snapshot_sync_test.go

// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package stream

import (
	"context"
	crand "crypto/rand"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"io/ioutil"
	"math/rand"
	"os"
	"sync"
	"testing"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/p2p"
	"github.com/ethereum/go-ethereum/p2p/discover"
	"github.com/ethereum/go-ethereum/p2p/simulations"
	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
	"github.com/ethereum/go-ethereum/rpc"
	"github.com/ethereum/go-ethereum/swarm/network"
	streamTesting "github.com/ethereum/go-ethereum/swarm/network/stream/testing"
	"github.com/ethereum/go-ethereum/swarm/pot"
	"github.com/ethereum/go-ethereum/swarm/storage"
)

const testMinProxBinSize = 2
const MaxTimeout = 600 //test and simulation timeout in seconds

var (
	pof = pot.DefaultPof(256)

	conf     *synctestConfig
	ids      []discover.NodeID
	datadirs map[discover.NodeID]string
	ppmap    map[string]*network.PeerPot

	live    bool
	history bool

	longrunning = flag.Bool("longrunning", false, "run long-running tests")
)

type synctestConfig struct {
	addrs            [][]byte                   //overlay addresses of all simulation nodes
	hashes           []storage.Address          //hashes of the uploaded chunks
	idToChunksMap    map[discover.NodeID][]int  //map of node ID to indexes of chunks expected at that node
	chunksToNodesMap map[string][]int           //map of chunk hash to indexes of nodes expected to store it
	addrToIdMap      map[string]discover.NodeID //map of overlay address to node ID
}

func init() {
	rand.Seed(time.Now().Unix())
}

//common_test needs to initialize the test in an init() func
//in order for adapters to register the NewStreamerService;
//that service depends on some global variables,
//so we need to initialize them first, which is done here in initSyncTest().
func initSyncTest() {
	//assign the toAddr func so NewStreamerService can build the addr
	toAddr = func(id discover.NodeID) *network.BzzAddr {
		addr := network.NewAddrFromNodeID(id)
		return addr
	}
	//global func to create local store
	if *useMockStore {
		createStoreFunc = createMockStore
	} else {
		createStoreFunc = createTestLocalStorageForId
	}
	//local stores
	stores = make(map[discover.NodeID]storage.ChunkStore)
	//data directories for each node and store
	datadirs = make(map[discover.NodeID]string)
	//deliveries for each node
	deliveries = make(map[discover.NodeID]*Delivery)
	//registries, map of discover.NodeID to its streamer
	registries = make(map[discover.NodeID]*TestRegistry)
	//not needed for this test but required from common_test for NewStreamService
	waitPeerErrC = make(chan error)
	//also not needed for this test but required for NewStreamService
	peerCount = func(id discover.NodeID) int {
		if ids[0] == id || ids[len(ids)-1] == id {
			return 1
		}
		return 2
	}
	if *useMockStore {
		createGlobalStore()
	}
}

//This test is a syncing test for nodes.
//One node is randomly selected to be the pivot node.
//A configurable number of chunks and nodes can be
//provided to the test; the chunks are uploaded
//to the pivot node, and we check that the nodes get the chunks
//they are expected to store based on the syncing protocol.
//The number of chunks and nodes can also be provided via the command line.
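//An example invocation (a sketch; it assumes the -nodes and -chunks flags are
//registered under these names in common_test.go of this package, next to the
//-longrunning flag defined above):
//
//  go test ./swarm/network/stream -run TestSyncing -nodes 16 -chunks 8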
func TestSyncing(t *testing.T) {
	//if nodes/chunks have been provided via the command line,
	//run the tests with these values
	if *nodes != 0 && *chunks != 0 {
		log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes))
		testSyncing(t, *chunks, *nodes)
	} else {
		var nodeCnt []int
		var chnkCnt []int
		//if the `longrunning` flag has been provided,
		//run more test combinations
		if *longrunning {
			chnkCnt = []int{1, 8, 32, 256, 1024}
			nodeCnt = []int{16, 32, 64, 128, 256}
		} else {
			//default test
			chnkCnt = []int{4, 32}
			nodeCnt = []int{32, 16}
		}
		for _, chnk := range chnkCnt {
			for _, n := range nodeCnt {
				log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", chnk, n))
				testSyncing(t, chnk, n)
			}
		}
	}
}

//Run the tests.
//Every test runs 3 times: live, history, and live AND history.
func testSyncing(t *testing.T, chunkCount int, nodeCount int) {
	//test live and NO history
	log.Info("Testing live and no history")
	live = true
	history = false
	err := runSyncTest(chunkCount, nodeCount, live, history)
	if err != nil {
		t.Fatal(err)
	}
	//test history only
	log.Info("Testing history only")
	live = false
	history = true
	err = runSyncTest(chunkCount, nodeCount, live, history)
	if err != nil {
		t.Fatal(err)
	}
	//finally test live and history
	//(history is still true from the previous run, so this covers live AND history)
	log.Info("Testing live and history")
	live = true
	err = runSyncTest(chunkCount, nodeCount, live, history)
	if err != nil {
		t.Fatal(err)
	}
}

/*
The test generates the given number of chunks.

The upload is done depending on the global `live` and `history` variables:
if `live` is set, stream subscriptions are established first, then
the chunks are uploaded to a random node;
if `history` is enabled, the upload happens first, then the subscriptions are built up.

For every chunk generated, the nearest node addresses
are identified, and we verify that the nodes closest to the
chunk addresses actually do have the chunks in their local stores.

The test loads a snapshot file to construct the swarm network,
assuming that the snapshot file identifies a healthy
kademlia network. The snapshot should have 'streamer' in its service list.

For every test run, a series of three tests will be executed:
- a LIVE test first, where subscriptions are established first,
  then a file (random chunks) is uploaded
- a HISTORY test, where the file is uploaded first, and then
  the subscriptions are established
- a crude LIVE AND HISTORY test last, where (different) chunks
  are uploaded twice, once before and once after subscriptions
*/
func runSyncTest(chunkCount int, nodeCount int, live bool, history bool) error {
	initSyncTest()
	//the ids of the snapshot nodes, initialized only now as we need nodeCount
	ids = make([]discover.NodeID, nodeCount)
	//initialize the test struct
	conf = &synctestConfig{}
	//map of discover ID to indexes of chunks expected at that ID
	conf.idToChunksMap = make(map[discover.NodeID][]int)
	//map of overlay address to discover ID
	conf.addrToIdMap = make(map[string]discover.NodeID)
	//array where the generated chunk hashes will be stored
	conf.hashes = make([]storage.Address, 0)
	//channel to trigger node checks in the simulation
	trigger := make(chan discover.NodeID)
	//channel to check for disconnection errors
	disconnectC := make(chan error)
	//channel to close disconnection watcher routine
	quitC := make(chan struct{})

	//load nodes from the snapshot file
	net, err := initNetWithSnapshot(nodeCount)
	if err != nil {
		return err
	}
	var rpcSubscriptionsWg sync.WaitGroup
	//do cleanup after test is terminated
	defer func() {
		//close quitC channel to signal all goroutines to clean up
		//before calling simulation network shutdown.
		close(quitC)
		//wait for all rpc subscriptions to unsubscribe
		rpcSubscriptionsWg.Wait()
		//shutdown the snapshot network
		net.Shutdown()
		//after the test, clean up local stores initialized with createLocalStoreForId
		localStoreCleanup()
		//finally clear all data directories
		datadirsCleanup()
	}()

	//get the nodes of the network
	nodes := net.GetNodes()
	//select one index at random...
	idx := rand.Intn(len(nodes))
	//...and get the node at that index
	//this is the node selected for upload
	node := nodes[idx]

	log.Info("Initializing test config")
	//iterate over all nodes...
	for c := 0; c < len(nodes); c++ {
		//create an array of discovery node IDs
		ids[c] = nodes[c].ID()
		//get the kademlia overlay address from this ID
		a := network.ToOverlayAddr(ids[c].Bytes())
		//append it to the array of all overlay addresses
		conf.addrs = append(conf.addrs, a)
		//the proximity calculation is on overlay addr,
		//the p2p/simulations check func triggers on discover.NodeID,
		//so we need to know which overlay addr maps to which nodeID
		conf.addrToIdMap[string(a)] = ids[c]
	}
	log.Info("Test config successfully initialized")

	//only needed for healthy call when debugging
	ppmap = network.NewPeerPotMap(testMinProxBinSize, conf.addrs)

	//define the action to be performed before the test checks: start syncing
	action := func(ctx context.Context) error {
		//first run the health check on all nodes,
		//wait until nodes are all healthy
		ticker := time.NewTicker(200 * time.Millisecond)
		defer ticker.Stop()
		for range ticker.C {
			healthy := true
			for _, id := range ids {
				r := registries[id]
				//PeerPot for this node
				addr := common.Bytes2Hex(network.ToOverlayAddr(id.Bytes()))
				pp := ppmap[addr]
				//call Healthy RPC
				h := r.delivery.overlay.Healthy(pp)
				//print info
				log.Debug(r.delivery.overlay.String())
				log.Debug(fmt.Sprintf("IS HEALTHY: %t", h.GotNN && h.KnowNN && h.Full))
				if !h.GotNN || !h.Full {
					healthy = false
					break
				}
			}
			if healthy {
				break
			}
		}

		if history {
			log.Info("Uploading for history")
			//if testing history, we upload the chunk(s) first
			chunks, err := uploadFileToSingleNodeStore(node.ID(), chunkCount)
			if err != nil {
				return err
			}
			conf.hashes = append(conf.hashes, chunks...)
			//finally map chunks to the closest addresses
			mapKeysToNodes(conf)
		}

		//variables needed to wait for all subscriptions to be established before uploading
		errc := make(chan error)
		//now set up and start event watching in order to know when we can upload
		ctx, watchCancel := context.WithTimeout(context.Background(), MaxTimeout*time.Second)
		defer watchCancel()

		log.Info("Setting up stream subscription")
		//We need two iterations: one to subscribe to the subscription events
		//(so we know when the setup phase is finished), and one to
		//actually run the stream subscriptions. We can't do it in the same iteration,
		//because while the first nodes in the loop are setting up subscriptions,
		//the later ones have not yet subscribed to listen to peer events,
		//and then we would miss events.

		//first iteration: set up disconnection watcher and subscribe to peer events
		for j, id := range ids {
			log.Trace(fmt.Sprintf("Subscribe to subscription events: %d", j))
			client, err := net.GetNode(id).Client()
			if err != nil {
				return err
			}
			wsDoneC := watchSubscriptionEvents(ctx, id, client, errc, quitC)
			//if wsDoneC is nil, an error happened and has already been sent to the errc channel
			if wsDoneC == nil {
				continue
			}
			rpcSubscriptionsWg.Add(1)
			go func() {
				<-wsDoneC
				rpcSubscriptionsWg.Done()
			}()

			//watch for peers disconnecting
			wdDoneC, err := streamTesting.WatchDisconnections(id, client, disconnectC, quitC)
			if err != nil {
				return err
			}
			rpcSubscriptionsWg.Add(1)
			go func() {
				<-wdDoneC
				rpcSubscriptionsWg.Done()
			}()
		}

		//second iteration: start syncing
		for j, id := range ids {
			log.Trace(fmt.Sprintf("Start syncing subscriptions: %d", j))
			client, err := net.GetNode(id).Client()
			if err != nil {
				return err
			}
			//start syncing!
			var cnt int
			err = client.CallContext(ctx, &cnt, "stream_startSyncing")
			if err != nil {
				return err
			}
			//increment the number of subscriptions we need to wait for
			//by the count returned from startSyncing (SYNC subscriptions)
			subscriptionCount += cnt
		}

		//now wait until the expected number of subscriptions has been established;
		//`watchSubscriptionEvents` writes a `nil` value to errc for each one
		for err := range errc {
			if err != nil {
				return err
			}
			//`nil` received, decrement count
			subscriptionCount--
			//all subscriptions received
			if subscriptionCount == 0 {
				break
			}
		}

		log.Info("Stream subscriptions successfully requested")
		if live {
			//now upload the chunks to the selected random single node
			hashes, err := uploadFileToSingleNodeStore(node.ID(), chunkCount)
			if err != nil {
				return err
			}
			conf.hashes = append(conf.hashes, hashes...)
			//finally map chunks to the closest addresses
			log.Debug(fmt.Sprintf("Uploaded chunks for live syncing: %v", conf.hashes))
			mapKeysToNodes(conf)
			log.Info(fmt.Sprintf("Uploaded %d chunks to random single node", chunkCount))
		}
		log.Info("Action terminated")

		return nil
	}

	//check defines what will be checked during the test
	check := func(ctx context.Context, id discover.NodeID) (bool, error) {
		select {
		case <-ctx.Done():
			return false, ctx.Err()
		case e := <-disconnectC:
			log.Error(e.Error())
			return false, fmt.Errorf("Disconnect event detected, network unhealthy")
		default:
		}
		log.Trace(fmt.Sprintf("Checking node: %s", id))
		//select the local store for the given node;
		//if there is more than one chunk, the check only succeeds if all expected chunks are found
		allSuccess := true
		//all the chunk indexes which are supposed to be found for this node
		localChunks := conf.idToChunksMap[id]
		//for each expected chunk, check if it is in the local store
		for _, ch := range localChunks {
			//get the real chunk by the index in the index array
			chunk := conf.hashes[ch]
			log.Trace(fmt.Sprintf("node has chunk: %s:", chunk))
			//check if the expected chunk is indeed in the localstore
			var err error
			if *useMockStore {
				if globalStore == nil {
					return false, fmt.Errorf("Something went wrong; using mockStore enabled but globalStore is nil")
				}
				//use the globalStore if the mockStore should be used; in that case,
				//the complete localStore stack is bypassed for getting the chunk
				_, err = globalStore.Get(common.BytesToAddress(id.Bytes()), chunk)
			} else {
				//use the actual localstore
				lstore := stores[id]
				_, err = lstore.Get(context.TODO(), chunk)
			}
			if err != nil {
				log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
				allSuccess = false
			} else {
				log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
			}
		}
		return allSuccess, nil
	}

	//for each tick, run the checks on all nodes
	timingTicker := time.NewTicker(time.Second * 1)
	defer timingTicker.Stop()
	go func() {
		for range timingTicker.C {
			for i := 0; i < len(ids); i++ {
				log.Trace(fmt.Sprintf("triggering step %d, id %s", i, ids[i]))
				trigger <- ids[i]
			}
		}
	}()

	log.Info("Starting simulation run...")
	timeout := MaxTimeout * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	//run the simulation
	result := simulations.NewSimulation(net).Run(ctx, &simulations.Step{
		Action:  action,
		Trigger: trigger,
		Expect: &simulations.Expectation{
			Nodes: ids,
			Check: check,
		},
	})
	if result.Error != nil {
		return result.Error
	}
	log.Info("Simulation terminated")
	return nil
}

//the server func to start syncing
//issues `RequestSubscriptionMsg` to peers, based on po, by iterating over
//the kademlia's `EachBin` function.
//returns the number of subscriptions requested
func (r *TestRegistry) StartSyncing(ctx context.Context) (int, error) {
	var err error

	if log.Lvl(*loglevel) == log.LvlDebug {
		//PeerPot for this node
		addr := common.Bytes2Hex(r.addr.OAddr)
		pp := ppmap[addr]
		//call Healthy RPC
		h := r.delivery.overlay.Healthy(pp)
		//print info
		log.Debug(r.delivery.overlay.String())
		log.Debug(fmt.Sprintf("IS HEALTHY: %t", h.GotNN && h.KnowNN && h.Full))
	}

	kad, ok := r.delivery.overlay.(*network.Kademlia)
	if !ok {
		return 0, fmt.Errorf("Not a Kademlia!")
	}

	subCnt := 0
	//iterate over each bin and request the needed subscriptions for that bin
	kad.EachBin(r.addr.Over(), pof, 0, func(conn network.OverlayConn, po int) bool {
		//identify the peer and the proximity order of the bin we want to subscribe to
		log.Debug(fmt.Sprintf("Requesting subscription by: registry %s from peer %s for bin: %d", r.addr.ID(), conf.addrToIdMap[string(conn.Address())], po))
		var histRange *Range
		if history {
			histRange = &Range{}
		}

		subCnt++
		err = r.RequestSubscription(conf.addrToIdMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), live), histRange, Top)
		if err != nil {
			log.Error(fmt.Sprintf("Error in RequestSubscription! %v", err))
			return false
		}
		return true
	})
	return subCnt, nil
}

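//Note: in this test StartSyncing is not called directly but invoked remotely by the
//simulation's action func via `client.CallContext(ctx, &cnt, "stream_startSyncing")`,
//which presumes that the TestRegistry API is exposed over RPC under the "stream"
//namespace (that registration is assumed to live in common_test.go of this package).
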
//map chunk keys to addresses which are responsible
func mapKeysToNodes(conf *synctestConfig) {
	kmap := make(map[string][]int)
	nodemap := make(map[string][]int)
	//build a pot of the nodes' overlay addresses
	np := pot.NewPot(nil, 0)
	indexmap := make(map[string]int)
	for i, a := range conf.addrs {
		indexmap[string(a)] = i
		np, _, _ = pot.Add(np, a, pof)
	}
	//for each chunk hash, run EachNeighbour on the address pot to identify the closest nodes
	log.Trace(fmt.Sprintf("Generated hash chunk(s): %v", conf.hashes))
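	//EachNeighbour below walks the node addresses in order of decreasing proximity to
	//the chunk hash: it first collects the testMinProxBinSize nearest nodes, then keeps
	//adding further nodes as long as they share the proximity order of the last one
	//collected, and stops at the first node with a lower po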
	for i := 0; i < len(conf.hashes); i++ {
		pl := 256 //highest possible proximity
		var nns []int
		np.EachNeighbour([]byte(conf.hashes[i]), pof, func(val pot.Val, po int) bool {
			a := val.([]byte)
			if pl < 256 && pl != po {
				return false
			}
			if pl == 256 || pl == po {
				log.Trace(fmt.Sprintf("appending %s", conf.addrToIdMap[string(a)]))
				nns = append(nns, indexmap[string(a)])
				nodemap[string(a)] = append(nodemap[string(a)], i)
			}
			if pl == 256 && len(nns) >= testMinProxBinSize {
				//testMinProxBinSize has been reached at this po, so save it;
				//we will add all other nodes at the same po
				pl = po
			}
			return true
		})
		kmap[string(conf.hashes[i])] = nns
	}
	for addr, chunks := range nodemap {
		//this selects which chunks are expected to be found with the given node
		conf.idToChunksMap[conf.addrToIdMap[addr]] = chunks
	}
	log.Debug(fmt.Sprintf("Map of expected chunks by ID: %v", conf.idToChunksMap))
	conf.chunksToNodesMap = kmap
}

//upload a file (chunks) to a single local node store
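//since each generated file is exactly chunkSize bytes of random data, each call to
//fileStore.Store is expected (assuming the default chunker parameters used in
//common_test) to produce a single chunk whose root address is the chunk address
//itself, so the returned root addresses can be treated as the uploaded chunk hashes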
func uploadFileToSingleNodeStore(id discover.NodeID, chunkCount int) ([]storage.Address, error) {
	log.Debug(fmt.Sprintf("Uploading to node id: %s", id))
	lstore := stores[id]
	size := chunkSize
	fileStore := storage.NewFileStore(lstore, storage.NewFileStoreParams())
	var rootAddrs []storage.Address
	for i := 0; i < chunkCount; i++ {
		ctx := context.TODO()
		rk, wait, err := fileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false)
		if err != nil {
			return nil, err
		}
		err = wait(ctx)
		if err != nil {
			return nil, err
		}
		rootAddrs = append(rootAddrs, rk)
	}

	return rootAddrs, nil
}

//initialize a network from a snapshot
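//the snapshot is read from testing/snapshot_<nodeCount>.json relative to this package
//(e.g. testing/snapshot_16.json); the files are assumed to have been generated with
//the p2p/simulations snapshot facility and to describe an already-healthy kademlia
//with 'streamer' in every node's service list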
func initNetWithSnapshot(nodeCount int) (*simulations.Network, error) {
	var a adapters.NodeAdapter
	//add the streamer service to the node adapter
	if *adapter == "exec" {
		dirname, err := ioutil.TempDir(".", "")
		if err != nil {
			return nil, err
		}
		a = adapters.NewExecAdapter(dirname)
	} else if *adapter == "tcp" {
		a = adapters.NewTCPAdapter(services)
	} else if *adapter == "sim" {
		a = adapters.NewSimAdapter(services)
	}

	log.Info("Setting up Snapshot network")

	net := simulations.NewNetwork(a, &simulations.NetworkConfig{
		ID:             "0",
		DefaultService: "streamer",
	})

	f, err := os.Open(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
	if err != nil {
		return nil, err
	}
	defer f.Close()
	jsonbyte, err := ioutil.ReadAll(f)
	if err != nil {
		return nil, err
	}
	var snap simulations.Snapshot
	err = json.Unmarshal(jsonbyte, &snap)
	if err != nil {
		return nil, err
	}

	//the snapshot probably has the property EnableMsgEvents not set;
	//just in case, set it to true!
	//(we need this to wait for messages before uploading)
	for _, n := range snap.Nodes {
		n.Node.Config.EnableMsgEvents = true
	}

	log.Info("Waiting for p2p connections to be established...")

	//now we can load the snapshot
	err = net.Load(&snap)
	if err != nil {
		return nil, err
	}
	log.Info("Snapshot loaded")
	return net, nil
}

//we want to wait for subscriptions to be established before uploading to test
//that live syncing is working correctly
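//watchSubscriptionEvents subscribes to admin_peerEvents on the given node and writes
//a nil value to errc each time a received "stream" protocol message with code 4
//(expected to be the stream protocol's SubscribeMsg) is observed; on failure or
//context cancellation it sends an error to errc instead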
func watchSubscriptionEvents(ctx context.Context, id discover.NodeID, client *rpc.Client, errc chan error, quitC chan struct{}) (doneC <-chan struct{}) {
	events := make(chan *p2p.PeerEvent)
	sub, err := client.Subscribe(context.Background(), "admin", events, "peerEvents")
	if err != nil {
		log.Error(err.Error())
		errc <- fmt.Errorf("error getting peer events for node %v: %s", id, err)
		return
	}
	c := make(chan struct{})
	go func() {
		defer func() {
			log.Trace("watch subscription events: unsubscribe", "id", id)
			sub.Unsubscribe()
			close(c)
		}()
		for {
			select {
			case <-quitC:
				return
			case <-ctx.Done():
				select {
				case errc <- ctx.Err():
				case <-quitC:
				}
				return
			case e := <-events:
				//just catch SubscribeMsg
				if e.Type == p2p.PeerEventTypeMsgRecv && e.Protocol == "stream" && e.MsgCode != nil && *e.MsgCode == 4 {
					errc <- nil
				}
			case err := <-sub.Err():
				if err != nil {
					select {
					case errc <- fmt.Errorf("error getting peer events for node %v: %v", id, err):
					case <-quitC:
					}
					return
				}
			}
		}
	}()
	return c
}

//create a local store for the given node
func createTestLocalStorageForId(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, error) {
	var datadir string
	var err error
	datadir, err = ioutil.TempDir("", fmt.Sprintf("syncer-test-%s", id.TerminalString()))
	if err != nil {
		return nil, err
	}
	datadirs[id] = datadir
	var store storage.ChunkStore
	params := storage.NewDefaultLocalStoreParams()
	params.ChunkDbPath = datadir
	params.BaseKey = addr.Over()
	store, err = storage.NewTestLocalStoreForAddr(params)
	if err != nil {
		return nil, err
	}
	return store, nil
}
  657. }