feed_upload_and_sync.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. package main
  2. import (
  3. "bytes"
  4. "crypto/md5"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "os"
  9. "os/exec"
  10. "strings"
  11. "sync"
  12. "time"
  13. "github.com/ethereum/go-ethereum/common/hexutil"
  14. "github.com/ethereum/go-ethereum/crypto"
  15. "github.com/ethereum/go-ethereum/log"
  16. "github.com/ethereum/go-ethereum/metrics"
  17. "github.com/ethereum/go-ethereum/swarm/storage/feed"
  18. "github.com/ethereum/go-ethereum/swarm/testutil"
  19. "github.com/pborman/uuid"
  20. cli "gopkg.in/urfave/cli.v1"
  21. )
  22. const (
  23. feedRandomDataLength = 8
  24. )
  25. func feedUploadAndSyncCmd(ctx *cli.Context) error {
  26. errc := make(chan error)
  27. go func() {
  28. errc <- feedUploadAndSync(ctx)
  29. }()
  30. select {
  31. case err := <-errc:
  32. if err != nil {
  33. metrics.GetOrRegisterCounter(fmt.Sprintf("%s.fail", commandName), nil).Inc(1)
  34. }
  35. return err
  36. case <-time.After(time.Duration(timeout) * time.Second):
  37. metrics.GetOrRegisterCounter(fmt.Sprintf("%s.timeout", commandName), nil).Inc(1)
  38. return fmt.Errorf("timeout after %v sec", timeout)
  39. }
  40. }
  41. func feedUploadAndSync(c *cli.Context) error {
  42. log.Info("generating and uploading feeds to " + httpEndpoint(hosts[0]) + " and syncing")
  43. // create a random private key to sign updates with and derive the address
  44. pkFile, err := ioutil.TempFile("", "swarm-feed-smoke-test")
  45. if err != nil {
  46. return err
  47. }
  48. defer pkFile.Close()
  49. defer os.Remove(pkFile.Name())
  50. privkeyHex := "0000000000000000000000000000000000000000000000000000000000001976"
  51. privKey, err := crypto.HexToECDSA(privkeyHex)
  52. if err != nil {
  53. return err
  54. }
  55. user := crypto.PubkeyToAddress(privKey.PublicKey)
  56. userHex := hexutil.Encode(user.Bytes())
  57. // save the private key to a file
  58. _, err = io.WriteString(pkFile, privkeyHex)
  59. if err != nil {
  60. return err
  61. }
  62. // keep hex strings for topic and subtopic
  63. var topicHex string
  64. var subTopicHex string
  65. // and create combination hex topics for bzz-feed retrieval
  66. // xor'ed with topic (zero-value topic if no topic)
  67. var subTopicOnlyHex string
  68. var mergedSubTopicHex string
  69. // generate random topic and subtopic and put a hex on them
  70. topicBytes, err := generateRandomData(feed.TopicLength)
  71. topicHex = hexutil.Encode(topicBytes)
  72. subTopicBytes, err := generateRandomData(8)
  73. subTopicHex = hexutil.Encode(subTopicBytes)
  74. if err != nil {
  75. return err
  76. }
  77. mergedSubTopic, err := feed.NewTopic(subTopicHex, topicBytes)
  78. if err != nil {
  79. return err
  80. }
  81. mergedSubTopicHex = hexutil.Encode(mergedSubTopic[:])
  82. subTopicOnlyBytes, err := feed.NewTopic(subTopicHex, nil)
  83. if err != nil {
  84. return err
  85. }
  86. subTopicOnlyHex = hexutil.Encode(subTopicOnlyBytes[:])
  87. // create feed manifest, topic only
  88. var out bytes.Buffer
  89. cmd := exec.Command("swarm", "--bzzapi", httpEndpoint(hosts[0]), "feed", "create", "--topic", topicHex, "--user", userHex)
  90. cmd.Stdout = &out
  91. log.Debug("create feed manifest topic cmd", "cmd", cmd)
  92. err = cmd.Run()
  93. if err != nil {
  94. return err
  95. }
  96. manifestWithTopic := strings.TrimRight(out.String(), string([]byte{0x0a}))
  97. if len(manifestWithTopic) != 64 {
  98. return fmt.Errorf("unknown feed create manifest hash format (topic): (%d) %s", len(out.String()), manifestWithTopic)
  99. }
  100. log.Debug("create topic feed", "manifest", manifestWithTopic)
  101. out.Reset()
  102. // create feed manifest, subtopic only
  103. cmd = exec.Command("swarm", "--bzzapi", httpEndpoint(hosts[0]), "feed", "create", "--name", subTopicHex, "--user", userHex)
  104. cmd.Stdout = &out
  105. log.Debug("create feed manifest subtopic cmd", "cmd", cmd)
  106. err = cmd.Run()
  107. if err != nil {
  108. return err
  109. }
  110. manifestWithSubTopic := strings.TrimRight(out.String(), string([]byte{0x0a}))
  111. if len(manifestWithSubTopic) != 64 {
  112. return fmt.Errorf("unknown feed create manifest hash format (subtopic): (%d) %s", len(out.String()), manifestWithSubTopic)
  113. }
  114. log.Debug("create subtopic feed", "manifest", manifestWithTopic)
  115. out.Reset()
  116. // create feed manifest, merged topic
  117. cmd = exec.Command("swarm", "--bzzapi", httpEndpoint(hosts[0]), "feed", "create", "--topic", topicHex, "--name", subTopicHex, "--user", userHex)
  118. cmd.Stdout = &out
  119. log.Debug("create feed manifest mergetopic cmd", "cmd", cmd)
  120. err = cmd.Run()
  121. if err != nil {
  122. log.Error(err.Error())
  123. return err
  124. }
  125. manifestWithMergedTopic := strings.TrimRight(out.String(), string([]byte{0x0a}))
  126. if len(manifestWithMergedTopic) != 64 {
  127. return fmt.Errorf("unknown feed create manifest hash format (mergedtopic): (%d) %s", len(out.String()), manifestWithMergedTopic)
  128. }
  129. log.Debug("create mergedtopic feed", "manifest", manifestWithMergedTopic)
  130. out.Reset()
  131. // create test data
  132. data, err := generateRandomData(feedRandomDataLength)
  133. if err != nil {
  134. return err
  135. }
  136. h := md5.New()
  137. h.Write(data)
  138. dataHash := h.Sum(nil)
  139. dataHex := hexutil.Encode(data)
  140. // update with topic
  141. cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--topic", topicHex, dataHex)
  142. cmd.Stdout = &out
  143. log.Debug("update feed manifest topic cmd", "cmd", cmd)
  144. err = cmd.Run()
  145. if err != nil {
  146. return err
  147. }
  148. log.Debug("feed update topic", "out", out)
  149. out.Reset()
  150. // update with subtopic
  151. cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--name", subTopicHex, dataHex)
  152. cmd.Stdout = &out
  153. log.Debug("update feed manifest subtopic cmd", "cmd", cmd)
  154. err = cmd.Run()
  155. if err != nil {
  156. return err
  157. }
  158. log.Debug("feed update subtopic", "out", out)
  159. out.Reset()
  160. // update with merged topic
  161. cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--topic", topicHex, "--name", subTopicHex, dataHex)
  162. cmd.Stdout = &out
  163. log.Debug("update feed manifest merged topic cmd", "cmd", cmd)
  164. err = cmd.Run()
  165. if err != nil {
  166. return err
  167. }
  168. log.Debug("feed update mergedtopic", "out", out)
  169. out.Reset()
  170. time.Sleep(3 * time.Second)
  171. // retrieve the data
  172. wg := sync.WaitGroup{}
  173. for _, host := range hosts {
  174. // raw retrieve, topic only
  175. for _, hex := range []string{topicHex, subTopicOnlyHex, mergedSubTopicHex} {
  176. wg.Add(1)
  177. ruid := uuid.New()[:8]
  178. go func(hex string, endpoint string, ruid string) {
  179. for {
  180. err := fetchFeed(hex, userHex, httpEndpoint(host), dataHash, ruid)
  181. if err != nil {
  182. continue
  183. }
  184. wg.Done()
  185. return
  186. }
  187. }(hex, httpEndpoint(host), ruid)
  188. }
  189. }
  190. wg.Wait()
  191. log.Info("all endpoints synced random data successfully")
  192. // upload test file
  193. log.Info("feed uploading to "+httpEndpoint(hosts[0])+" and syncing", "seed", seed)
  194. randomBytes := testutil.RandomBytes(seed, filesize*1000)
  195. hash, err := upload(randomBytes, httpEndpoint(hosts[0]))
  196. if err != nil {
  197. return err
  198. }
  199. hashBytes, err := hexutil.Decode("0x" + hash)
  200. if err != nil {
  201. return err
  202. }
  203. multihashHex := hexutil.Encode(hashBytes)
  204. fileHash := h.Sum(nil)
  205. log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fileHash))
  206. // update file with topic
  207. cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--topic", topicHex, multihashHex)
  208. cmd.Stdout = &out
  209. err = cmd.Run()
  210. if err != nil {
  211. return err
  212. }
  213. log.Debug("feed update topic", "out", out)
  214. out.Reset()
  215. // update file with subtopic
  216. cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--name", subTopicHex, multihashHex)
  217. cmd.Stdout = &out
  218. err = cmd.Run()
  219. if err != nil {
  220. return err
  221. }
  222. log.Debug("feed update subtopic", "out", out)
  223. out.Reset()
  224. // update file with merged topic
  225. cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--topic", topicHex, "--name", subTopicHex, multihashHex)
  226. cmd.Stdout = &out
  227. err = cmd.Run()
  228. if err != nil {
  229. return err
  230. }
  231. log.Debug("feed update mergedtopic", "out", out)
  232. out.Reset()
  233. time.Sleep(3 * time.Second)
  234. for _, host := range hosts {
  235. // manifest retrieve, topic only
  236. for _, url := range []string{manifestWithTopic, manifestWithSubTopic, manifestWithMergedTopic} {
  237. wg.Add(1)
  238. ruid := uuid.New()[:8]
  239. go func(url string, endpoint string, ruid string) {
  240. for {
  241. err := fetch(url, endpoint, fileHash, ruid)
  242. if err != nil {
  243. continue
  244. }
  245. wg.Done()
  246. return
  247. }
  248. }(url, httpEndpoint(host), ruid)
  249. }
  250. }
  251. wg.Wait()
  252. log.Info("all endpoints synced random file successfully")
  253. return nil
  254. }