feed_upload_and_sync.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. package main
  2. import (
  3. "bytes"
  4. "crypto/md5"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "net/http"
  9. "os"
  10. "os/exec"
  11. "strings"
  12. "sync"
  13. "time"
  14. "github.com/pborman/uuid"
  15. "github.com/ethereum/go-ethereum/common/hexutil"
  16. "github.com/ethereum/go-ethereum/crypto"
  17. "github.com/ethereum/go-ethereum/log"
  18. "github.com/ethereum/go-ethereum/swarm/multihash"
  19. "github.com/ethereum/go-ethereum/swarm/storage/feed"
  20. colorable "github.com/mattn/go-colorable"
  21. cli "gopkg.in/urfave/cli.v1"
  22. )
  // feedRandomDataLength is the size argument passed to generateRandomData when
  // building the small payload that the first round of feed updates publishes.
  const feedRandomDataLength = 8
  26. // TODO: retrieve with manifest + extract repeating code
  27. func cliFeedUploadAndSync(c *cli.Context) error {
  28. log.Root().SetHandler(log.CallerFileHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true)))))
  29. defer func(now time.Time) { log.Info("total time", "time", time.Since(now), "size (kb)", filesize) }(time.Now())
  30. generateEndpoints(scheme, cluster, from, to)
  31. log.Info("generating and uploading MRUs to " + endpoints[0] + " and syncing")
  32. // create a random private key to sign updates with and derive the address
  33. pkFile, err := ioutil.TempFile("", "swarm-feed-smoke-test")
  34. if err != nil {
  35. return err
  36. }
  37. defer pkFile.Close()
  38. defer os.Remove(pkFile.Name())
  39. privkeyHex := "0000000000000000000000000000000000000000000000000000000000001976"
  40. privKey, err := crypto.HexToECDSA(privkeyHex)
  41. if err != nil {
  42. return err
  43. }
  44. user := crypto.PubkeyToAddress(privKey.PublicKey)
  45. userHex := hexutil.Encode(user.Bytes())
  46. // save the private key to a file
  47. _, err = io.WriteString(pkFile, privkeyHex)
  48. if err != nil {
  49. return err
  50. }
  51. // keep hex strings for topic and subtopic
  52. var topicHex string
  53. var subTopicHex string
  54. // and create combination hex topics for bzz-feed retrieval
  55. // xor'ed with topic (zero-value topic if no topic)
  56. var subTopicOnlyHex string
  57. var mergedSubTopicHex string
  58. // generate random topic and subtopic and put a hex on them
  59. topicBytes, err := generateRandomData(feed.TopicLength)
  60. topicHex = hexutil.Encode(topicBytes)
  61. subTopicBytes, err := generateRandomData(8)
  62. subTopicHex = hexutil.Encode(subTopicBytes)
  63. if err != nil {
  64. return err
  65. }
  66. mergedSubTopic, err := feed.NewTopic(subTopicHex, topicBytes)
  67. if err != nil {
  68. return err
  69. }
  70. mergedSubTopicHex = hexutil.Encode(mergedSubTopic[:])
  71. subTopicOnlyBytes, err := feed.NewTopic(subTopicHex, nil)
  72. if err != nil {
  73. return err
  74. }
  75. subTopicOnlyHex = hexutil.Encode(subTopicOnlyBytes[:])
  76. // create feed manifest, topic only
  77. var out bytes.Buffer
  78. cmd := exec.Command("swarm", "--bzzapi", endpoints[0], "feed", "create", "--topic", topicHex, "--user", userHex)
  79. cmd.Stdout = &out
  80. log.Debug("create feed manifest topic cmd", "cmd", cmd)
  81. err = cmd.Run()
  82. if err != nil {
  83. return err
  84. }
  85. manifestWithTopic := strings.TrimRight(out.String(), string([]byte{0x0a}))
  86. if len(manifestWithTopic) != 64 {
  87. return fmt.Errorf("unknown feed create manifest hash format (topic): (%d) %s", len(out.String()), manifestWithTopic)
  88. }
  89. log.Debug("create topic feed", "manifest", manifestWithTopic)
  90. out.Reset()
  91. // create feed manifest, subtopic only
  92. cmd = exec.Command("swarm", "--bzzapi", endpoints[0], "feed", "create", "--name", subTopicHex, "--user", userHex)
  93. cmd.Stdout = &out
  94. log.Debug("create feed manifest subtopic cmd", "cmd", cmd)
  95. err = cmd.Run()
  96. if err != nil {
  97. return err
  98. }
  99. manifestWithSubTopic := strings.TrimRight(out.String(), string([]byte{0x0a}))
  100. if len(manifestWithSubTopic) != 64 {
  101. return fmt.Errorf("unknown feed create manifest hash format (subtopic): (%d) %s", len(out.String()), manifestWithSubTopic)
  102. }
  103. log.Debug("create subtopic feed", "manifest", manifestWithTopic)
  104. out.Reset()
  105. // create feed manifest, merged topic
  106. cmd = exec.Command("swarm", "--bzzapi", endpoints[0], "feed", "create", "--topic", topicHex, "--name", subTopicHex, "--user", userHex)
  107. cmd.Stdout = &out
  108. log.Debug("create feed manifest mergetopic cmd", "cmd", cmd)
  109. err = cmd.Run()
  110. if err != nil {
  111. log.Error(err.Error())
  112. return err
  113. }
  114. manifestWithMergedTopic := strings.TrimRight(out.String(), string([]byte{0x0a}))
  115. if len(manifestWithMergedTopic) != 64 {
  116. return fmt.Errorf("unknown feed create manifest hash format (mergedtopic): (%d) %s", len(out.String()), manifestWithMergedTopic)
  117. }
  118. log.Debug("create mergedtopic feed", "manifest", manifestWithMergedTopic)
  119. out.Reset()
  120. // create test data
  121. data, err := generateRandomData(feedRandomDataLength)
  122. if err != nil {
  123. return err
  124. }
  125. h := md5.New()
  126. h.Write(data)
  127. dataHash := h.Sum(nil)
  128. dataHex := hexutil.Encode(data)
  129. // update with topic
  130. cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--topic", topicHex, dataHex)
  131. cmd.Stdout = &out
  132. log.Debug("update feed manifest topic cmd", "cmd", cmd)
  133. err = cmd.Run()
  134. if err != nil {
  135. return err
  136. }
  137. log.Debug("feed update topic", "out", out)
  138. out.Reset()
  139. // update with subtopic
  140. cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--name", subTopicHex, dataHex)
  141. cmd.Stdout = &out
  142. log.Debug("update feed manifest subtopic cmd", "cmd", cmd)
  143. err = cmd.Run()
  144. if err != nil {
  145. return err
  146. }
  147. log.Debug("feed update subtopic", "out", out)
  148. out.Reset()
  149. // update with merged topic
  150. cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--topic", topicHex, "--name", subTopicHex, dataHex)
  151. cmd.Stdout = &out
  152. log.Debug("update feed manifest merged topic cmd", "cmd", cmd)
  153. err = cmd.Run()
  154. if err != nil {
  155. return err
  156. }
  157. log.Debug("feed update mergedtopic", "out", out)
  158. out.Reset()
  159. time.Sleep(3 * time.Second)
  160. // retrieve the data
  161. wg := sync.WaitGroup{}
  162. for _, endpoint := range endpoints {
  163. // raw retrieve, topic only
  164. for _, hex := range []string{topicHex, subTopicOnlyHex, mergedSubTopicHex} {
  165. wg.Add(1)
  166. ruid := uuid.New()[:8]
  167. go func(endpoint string, ruid string) {
  168. for {
  169. err := fetchFeed(hex, userHex, endpoint, dataHash, ruid)
  170. if err != nil {
  171. continue
  172. }
  173. wg.Done()
  174. return
  175. }
  176. }(endpoint, ruid)
  177. }
  178. }
  179. wg.Wait()
  180. log.Info("all endpoints synced random data successfully")
  181. // upload test file
  182. log.Info("uploading to " + endpoints[0] + " and syncing")
  183. f, cleanup := generateRandomFile(filesize * 1000)
  184. defer cleanup()
  185. hash, err := upload(f, endpoints[0])
  186. if err != nil {
  187. return err
  188. }
  189. hashBytes, err := hexutil.Decode("0x" + hash)
  190. if err != nil {
  191. return err
  192. }
  193. multihashHex := hexutil.Encode(multihash.ToMultihash(hashBytes))
  194. fileHash, err := digest(f)
  195. if err != nil {
  196. return err
  197. }
  198. log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fileHash))
  199. // update file with topic
  200. cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--topic", topicHex, multihashHex)
  201. cmd.Stdout = &out
  202. err = cmd.Run()
  203. if err != nil {
  204. return err
  205. }
  206. log.Debug("feed update topic", "out", out)
  207. out.Reset()
  208. // update file with subtopic
  209. cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--name", subTopicHex, multihashHex)
  210. cmd.Stdout = &out
  211. err = cmd.Run()
  212. if err != nil {
  213. return err
  214. }
  215. log.Debug("feed update subtopic", "out", out)
  216. out.Reset()
  217. // update file with merged topic
  218. cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--topic", topicHex, "--name", subTopicHex, multihashHex)
  219. cmd.Stdout = &out
  220. err = cmd.Run()
  221. if err != nil {
  222. return err
  223. }
  224. log.Debug("feed update mergedtopic", "out", out)
  225. out.Reset()
  226. time.Sleep(3 * time.Second)
  227. for _, endpoint := range endpoints {
  228. // manifest retrieve, topic only
  229. for _, url := range []string{manifestWithTopic, manifestWithSubTopic, manifestWithMergedTopic} {
  230. wg.Add(1)
  231. ruid := uuid.New()[:8]
  232. go func(endpoint string, ruid string) {
  233. for {
  234. err := fetch(url, endpoint, fileHash, ruid)
  235. if err != nil {
  236. continue
  237. }
  238. wg.Done()
  239. return
  240. }
  241. }(endpoint, ruid)
  242. }
  243. }
  244. wg.Wait()
  245. log.Info("all endpoints synced random file successfully")
  246. return nil
  247. }
  248. func fetchFeed(topic string, user string, endpoint string, original []byte, ruid string) error {
  249. log.Trace("sleeping", "ruid", ruid)
  250. time.Sleep(3 * time.Second)
  251. log.Trace("http get request (feed)", "ruid", ruid, "api", endpoint, "topic", topic, "user", user)
  252. res, err := http.Get(endpoint + "/bzz-feed:/?topic=" + topic + "&user=" + user)
  253. if err != nil {
  254. return err
  255. }
  256. log.Trace("http get response (feed)", "ruid", ruid, "api", endpoint, "topic", topic, "user", user, "code", res.StatusCode, "len", res.ContentLength)
  257. if res.StatusCode != 200 {
  258. return fmt.Errorf("expected status code %d, got %v (ruid %v)", 200, res.StatusCode, ruid)
  259. }
  260. defer res.Body.Close()
  261. rdigest, err := digest(res.Body)
  262. if err != nil {
  263. log.Warn(err.Error(), "ruid", ruid)
  264. return err
  265. }
  266. if !bytes.Equal(rdigest, original) {
  267. err := fmt.Errorf("downloaded imported file md5=%x is not the same as the generated one=%x", rdigest, original)
  268. log.Warn(err.Error(), "ruid", ruid)
  269. return err
  270. }
  271. log.Trace("downloaded file matches random file", "ruid", ruid, "len", res.ContentLength)
  272. return nil
  273. }