upload_and_sync.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of go-ethereum.
  3. //
  4. // go-ethereum is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // go-ethereum is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU General Public License
  15. // along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
  16. package main
  17. import (
  18. "bytes"
  19. "context"
  20. "fmt"
  21. "io/ioutil"
  22. "math/rand"
  23. "os"
  24. "strings"
  25. "sync"
  26. "time"
  27. "github.com/ethereum/go-ethereum/log"
  28. "github.com/ethereum/go-ethereum/metrics"
  29. "github.com/ethereum/go-ethereum/rpc"
  30. "github.com/ethereum/go-ethereum/swarm/api"
  31. "github.com/ethereum/go-ethereum/swarm/storage"
  32. "github.com/ethereum/go-ethereum/swarm/testutil"
  33. "github.com/pborman/uuid"
  34. cli "gopkg.in/urfave/cli.v1"
  35. )
  36. func uploadAndSyncCmd(ctx *cli.Context, tuid string) error {
  37. // use input seed if it has been set
  38. if inputSeed != 0 {
  39. seed = inputSeed
  40. }
  41. randomBytes := testutil.RandomBytes(seed, filesize*1000)
  42. errc := make(chan error)
  43. go func() {
  44. errc <- uploadAndSync(ctx, randomBytes, tuid)
  45. }()
  46. var err error
  47. select {
  48. case err = <-errc:
  49. if err != nil {
  50. metrics.GetOrRegisterCounter(fmt.Sprintf("%s.fail", commandName), nil).Inc(1)
  51. }
  52. case <-time.After(time.Duration(timeout) * time.Second):
  53. metrics.GetOrRegisterCounter(fmt.Sprintf("%s.timeout", commandName), nil).Inc(1)
  54. err = fmt.Errorf("timeout after %v sec", timeout)
  55. }
  56. // trigger debug functionality on randomBytes
  57. e := trackChunks(randomBytes[:])
  58. if e != nil {
  59. log.Error(e.Error())
  60. }
  61. return err
  62. }
  63. func trackChunks(testData []byte) error {
  64. addrs, err := getAllRefs(testData)
  65. if err != nil {
  66. return err
  67. }
  68. for i, ref := range addrs {
  69. log.Trace(fmt.Sprintf("ref %d", i), "ref", ref)
  70. }
  71. for _, host := range hosts {
  72. httpHost := fmt.Sprintf("ws://%s:%d", host, 8546)
  73. hostChunks := []string{}
  74. rpcClient, err := rpc.Dial(httpHost)
  75. if err != nil {
  76. log.Error("error dialing host", "err", err, "host", httpHost)
  77. continue
  78. }
  79. var hasInfo []api.HasInfo
  80. err = rpcClient.Call(&hasInfo, "bzz_has", addrs)
  81. if err != nil {
  82. log.Error("error calling rpc client", "err", err, "host", httpHost)
  83. continue
  84. }
  85. count := 0
  86. for _, info := range hasInfo {
  87. if info.Has {
  88. hostChunks = append(hostChunks, "1")
  89. } else {
  90. hostChunks = append(hostChunks, "0")
  91. count++
  92. }
  93. }
  94. if count == 0 {
  95. log.Info("host reported to have all chunks", "host", host)
  96. }
  97. log.Trace("chunks", "chunks", strings.Join(hostChunks, ""), "host", host)
  98. }
  99. return nil
  100. }
  101. func getAllRefs(testData []byte) (storage.AddressCollection, error) {
  102. datadir, err := ioutil.TempDir("", "chunk-debug")
  103. if err != nil {
  104. return nil, fmt.Errorf("unable to create temp dir: %v", err)
  105. }
  106. defer os.RemoveAll(datadir)
  107. fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32))
  108. if err != nil {
  109. return nil, err
  110. }
  111. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(trackTimeout)*time.Second)
  112. defer cancel()
  113. reader := bytes.NewReader(testData)
  114. return fileStore.GetAllReferences(ctx, reader, false)
  115. }
  116. func uploadAndSync(c *cli.Context, randomBytes []byte, tuid string) error {
  117. log.Info("uploading to "+httpEndpoint(hosts[0])+" and syncing", "tuid", tuid, "seed", seed)
  118. t1 := time.Now()
  119. hash, err := upload(randomBytes, httpEndpoint(hosts[0]))
  120. if err != nil {
  121. log.Error(err.Error())
  122. return err
  123. }
  124. t2 := time.Since(t1)
  125. metrics.GetOrRegisterResettingTimer("upload-and-sync.upload-time", nil).Update(t2)
  126. fhash, err := digest(bytes.NewReader(randomBytes))
  127. if err != nil {
  128. log.Error(err.Error())
  129. return err
  130. }
  131. log.Info("uploaded successfully", "tuid", tuid, "hash", hash, "took", t2, "digest", fmt.Sprintf("%x", fhash))
  132. time.Sleep(time.Duration(syncDelay) * time.Second)
  133. wg := sync.WaitGroup{}
  134. if single {
  135. randIndex := 1 + rand.Intn(len(hosts)-1)
  136. ruid := uuid.New()[:8]
  137. wg.Add(1)
  138. go func(endpoint string, ruid string) {
  139. for {
  140. start := time.Now()
  141. err := fetch(hash, endpoint, fhash, ruid, tuid)
  142. if err != nil {
  143. continue
  144. }
  145. ended := time.Since(start)
  146. metrics.GetOrRegisterResettingTimer("upload-and-sync.single.fetch-time", nil).Update(ended)
  147. log.Info("fetch successful", "tuid", tuid, "ruid", ruid, "took", ended, "endpoint", endpoint)
  148. wg.Done()
  149. return
  150. }
  151. }(httpEndpoint(hosts[randIndex]), ruid)
  152. } else {
  153. for _, endpoint := range hosts[1:] {
  154. ruid := uuid.New()[:8]
  155. wg.Add(1)
  156. go func(endpoint string, ruid string) {
  157. for {
  158. start := time.Now()
  159. err := fetch(hash, endpoint, fhash, ruid, tuid)
  160. if err != nil {
  161. continue
  162. }
  163. ended := time.Since(start)
  164. metrics.GetOrRegisterResettingTimer("upload-and-sync.each.fetch-time", nil).Update(ended)
  165. log.Info("fetch successful", "tuid", tuid, "ruid", ruid, "took", ended, "endpoint", endpoint)
  166. wg.Done()
  167. return
  168. }
  169. }(httpEndpoint(endpoint), ruid)
  170. }
  171. }
  172. wg.Wait()
  173. log.Info("all hosts synced random file successfully")
  174. return nil
  175. }