// Copyright 2018 The go-ethereum Authors
// This file is part of go-ethereum.
//
// go-ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// go-ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.

package main

import (
	"bytes"
	"context"
	"encoding/hex"
	"fmt"
	"io/ioutil"
	"math/rand"
	"os"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/metrics"
	"github.com/ethereum/go-ethereum/rpc"
	"github.com/ethereum/go-ethereum/swarm/chunk"
	"github.com/ethereum/go-ethereum/swarm/storage"
	"github.com/ethereum/go-ethereum/swarm/testutil"

	cli "gopkg.in/urfave/cli.v1"
)
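
// uploadAndSyncCmd generates a pseudo-random test file from the configured seed,
// uploads it to the cluster and waits for the upload-and-sync run to finish or
// for the configured timeout to expire, recording fail/timeout metrics and
// logging chunk presence across the cluster before returning.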
func uploadAndSyncCmd(ctx *cli.Context) error {
	// use input seed if it has been set
	if inputSeed != 0 {
		seed = inputSeed
	}

	randomBytes := testutil.RandomBytes(seed, filesize*1000)

	errc := make(chan error)

	go func() {
		errc <- uploadAndSync(ctx, randomBytes)
	}()

	var err error
	select {
	case err = <-errc:
		if err != nil {
			metrics.GetOrRegisterCounter(fmt.Sprintf("%s.fail", commandName), nil).Inc(1)
		}
	case <-time.After(time.Duration(timeout) * time.Second):
		metrics.GetOrRegisterCounter(fmt.Sprintf("%s.timeout", commandName), nil).Inc(1)

		err = fmt.Errorf("timeout after %v sec", timeout)
	}

	// trigger debug functionality on randomBytes
	e := trackChunks(randomBytes[:], true)
	if e != nil {
		log.Error(e.Error())
	}

	return err
}
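
// trackChunks queries every host in the cluster over RPC for the presence of all
// uploaded chunks, logs hosts with missing chunks, and, when submitMetrics is set,
// submits aggregate chunk-presence counters.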
func trackChunks(testData []byte, submitMetrics bool) error {
	addrs, err := getAllRefs(testData)
	if err != nil {
		return err
	}

	for i, ref := range addrs {
		log.Debug(fmt.Sprintf("ref %d", i), "ref", ref)
	}

	var globalYes, globalNo int
	var globalMu sync.Mutex
	var hasErr bool

	var wg sync.WaitGroup
	wg.Add(len(hosts))

	var mu sync.Mutex                    // mutex protecting the allHostChunks and bzzAddrs maps, and the hasErr flag
	allHostChunks := map[string]string{} // host->bitvector of presence for chunks
	bzzAddrs := map[string]string{}      // host->bzzAddr

	for _, host := range hosts {
		host := host
		go func() {
			defer wg.Done()

			httpHost := fmt.Sprintf("ws://%s:%d", host, 8546)

			ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
			defer cancel()

			rpcClient, err := rpc.DialContext(ctx, httpHost)
			if rpcClient != nil {
				defer rpcClient.Close()
			}
			if err != nil {
				log.Error("error dialing host", "err", err, "host", httpHost)
				mu.Lock()
				hasErr = true
				mu.Unlock()
				return
			}

			hostChunks, err := getChunksBitVectorFromHost(rpcClient, addrs)
			if err != nil {
				log.Error("error getting chunks bit vector from host", "err", err, "host", httpHost)
				mu.Lock()
				hasErr = true
				mu.Unlock()
				return
			}

			bzzAddr, err := getBzzAddrFromHost(rpcClient)
			if err != nil {
				log.Error("error getting bzz addrs from host", "err", err, "host", httpHost)
				mu.Lock()
				hasErr = true
				mu.Unlock()
				return
			}

			mu.Lock()
			allHostChunks[host] = hostChunks
			bzzAddrs[host] = bzzAddr
			mu.Unlock()

			yes, no := 0, 0
			for _, val := range hostChunks {
				if val == '1' {
					yes++
				} else {
					no++
				}
			}

			if no == 0 {
				log.Info("host reported to have all chunks", "host", host)
			}

			log.Debug("chunks", "chunks", hostChunks, "yes", yes, "no", no, "host", host)

			if submitMetrics {
				globalMu.Lock()
				globalYes += yes
				globalNo += no
				globalMu.Unlock()
			}
		}()
	}

	wg.Wait()

	checkChunksVsMostProxHosts(addrs, allHostChunks, bzzAddrs)

	if !hasErr && submitMetrics {
		// remove the chunks stored on the uploader node
		globalYes -= len(addrs)

		metrics.GetOrRegisterCounter("deployment.chunks.yes", nil).Inc(int64(globalYes))
		metrics.GetOrRegisterCounter("deployment.chunks.no", nil).Inc(int64(globalNo))
		metrics.GetOrRegisterCounter("deployment.chunks.refs", nil).Inc(int64(len(addrs)))
	}

	return nil
}

// getChunksBitVectorFromHost returns a bit vector of presence for a given slice of chunks from a given host
func getChunksBitVectorFromHost(client *rpc.Client, addrs []storage.Address) (string, error) {
	var hostChunks string

	err := client.Call(&hostChunks, "bzz_has", addrs)
	if err != nil {
		return "", err
	}

	return hostChunks, nil
}

// getBzzAddrFromHost returns the bzzAddr for a given host
func getBzzAddrFromHost(client *rpc.Client) (string, error) {
	var hive string

	err := client.Call(&hive, "bzz_hive")
	if err != nil {
		return "", err
	}

	// we make an ugly assumption about the output format of the hive.String() method:
	// the bzz address is expected to be the last token on the fourth line of the hive dump.
	// ideally we should replace this with an API call that returns the bzz addr for a given host,
	// but this also works for now (provided we don't change the hive.String() method, which we haven't in some time)
	ss := strings.Split(strings.Split(hive, "\n")[3], " ")
	return ss[len(ss)-1], nil
}

// checkChunksVsMostProxHosts checks:
// 1. whether a chunk has been found at fewer than two hosts. Considering our NN size, this should not happen.
// 2. whether a chunk is missing from its closest node. This should also not happen.
// Together with the --only-upload flag, we can run this smoke test and make sure that our syncing
// functionality is correct (without even trying to retrieve the content).
//
// addrs - a slice with all uploaded chunk refs
// allHostChunks - host->bit vector, showing which chunks are present on which hosts
// bzzAddrs - host->bzz address, used when determining the most proximate host for a given chunk
func checkChunksVsMostProxHosts(addrs []storage.Address, allHostChunks map[string]string, bzzAddrs map[string]string) {
	for k, v := range bzzAddrs {
		log.Trace("bzzAddr", "bzz", v, "host", k)
	}

	for i := range addrs {
		var foundAt int
		maxProx := -1
		var maxProxHost string
		for host := range allHostChunks {
			if allHostChunks[host][i] == '1' {
				foundAt++
			}

			ba, err := hex.DecodeString(bzzAddrs[host])
			if err != nil {
				panic(err)
			}

			// find the host whose bzz address is closest to this chunk
			prox := chunk.Proximity(addrs[i], ba)
			if prox > maxProx {
				maxProx = prox
				maxProxHost = host
			}
		}

		if allHostChunks[maxProxHost][i] == '0' {
			log.Error("chunk not found at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost])
		} else {
			log.Trace("chunk present at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost])
		}

		// if the chunk was found at fewer than two hosts
		if foundAt < 2 {
			log.Error("chunk found at less than two hosts", "foundAt", foundAt, "ref", addrs[i])
		}
	}
}
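
// getAllRefs chunks the test data through a throwaway local file store in a
// temporary directory and returns the addresses of all resulting chunks.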
func getAllRefs(testData []byte) (storage.AddressCollection, error) {
	datadir, err := ioutil.TempDir("", "chunk-debug")
	if err != nil {
		return nil, fmt.Errorf("unable to create temp dir: %v", err)
	}
	defer os.RemoveAll(datadir)

	fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32), chunk.NewTags())
	if err != nil {
		return nil, err
	}

	reader := bytes.NewReader(testData)
	return fileStore.GetAllReferences(context.Background(), reader, false)
}
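
// uploadAndSync uploads the test data to the first cluster host, optionally waits
// for the cluster to sync, and then retries fetching the content from a randomly
// chosen other host until the fetch succeeds.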
func uploadAndSync(c *cli.Context, randomBytes []byte) error {
	log.Info("uploading to "+httpEndpoint(hosts[0])+" and syncing", "seed", seed)

	t1 := time.Now()
	hash, err := upload(randomBytes, httpEndpoint(hosts[0]))
	if err != nil {
		log.Error(err.Error())
		return err
	}
	t2 := time.Since(t1)
	metrics.GetOrRegisterResettingTimer("upload-and-sync.upload-time", nil).Update(t2)

	fhash, err := digest(bytes.NewReader(randomBytes))
	if err != nil {
		log.Error(err.Error())
		return err
	}

	log.Info("uploaded successfully", "hash", hash, "took", t2, "digest", fmt.Sprintf("%x", fhash))

	// wait to sync and log chunks before the fetch attempt, only if syncDelay is set to true
	if syncDelay {
		waitToSync()

		log.Debug("chunks before fetch attempt", "hash", hash)

		err = trackChunks(randomBytes, false)
		if err != nil {
			log.Error(err.Error())
		}
	}

	if onlyUpload {
		log.Debug("only-upload is true, stopping test", "hash", hash)
		return nil
	}

	randIndex := 1 + rand.Intn(len(hosts)-1)

	for {
		start := time.Now()
		err := fetch(hash, httpEndpoint(hosts[randIndex]), fhash, "")
		if err != nil {
			time.Sleep(2 * time.Second)
			continue
		}
		ended := time.Since(start)

		metrics.GetOrRegisterResettingTimer("upload-and-sync.single.fetch-time", nil).Update(ended)
		log.Info("fetch successful", "took", ended, "endpoint", httpEndpoint(hosts[randIndex]))
		break
	}

	return nil
}
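
// isSyncing dials the given websocket endpoint and reports, via the bzz_isSyncing
// RPC call, whether the node is still syncing chunks.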
func isSyncing(wsHost string) (bool, error) {
	rpcClient, err := rpc.Dial(wsHost)
	if rpcClient != nil {
		defer rpcClient.Close()
	}
	if err != nil {
		log.Error("error dialing host", "err", err)
		return false, err
	}

	var isSyncing bool
	err = rpcClient.Call(&isSyncing, "bzz_isSyncing")
	if err != nil {
		log.Error("error calling host for isSyncing", "err", err)
		return false, err
	}

	log.Debug("isSyncing result", "host", wsHost, "isSyncing", isSyncing)

	return isSyncing, nil
}
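
// waitToSync polls all hosts every few seconds and blocks until none of them
// reports that it is still syncing, then records how long the wait took.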
func waitToSync() {
	t1 := time.Now()

	ns := uint64(1)

	for ns > 0 {
		time.Sleep(3 * time.Second)

		notSynced := uint64(0)
		var wg sync.WaitGroup
		wg.Add(len(hosts))
		for i := 0; i < len(hosts); i++ {
			go func(idx int) {
				stillSyncing, err := isSyncing(wsEndpoint(hosts[idx]))

				if stillSyncing || err != nil {
					atomic.AddUint64(&notSynced, 1)
				}
				wg.Done()
			}(i)
		}
		wg.Wait()

		ns = atomic.LoadUint64(&notSynced)
	}

	t2 := time.Since(t1)
	metrics.GetOrRegisterResettingTimer("upload-and-sync.single.wait-for-sync.deployment", nil).Update(t2)
}