// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package stream

import (
	"context"
	crand "crypto/rand"
	"fmt"
	"math/rand"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/p2p/discover"
	"github.com/ethereum/go-ethereum/p2p/simulations"
	"github.com/ethereum/go-ethereum/swarm/log"
	"github.com/ethereum/go-ethereum/swarm/network"
	streamTesting "github.com/ethereum/go-ethereum/swarm/network/stream/testing"
	"github.com/ethereum/go-ethereum/swarm/storage"
)

//constants for random file generation
const (
	minFileSize = 2
	maxFileSize = 40
)

func initRetrievalTest() {
	//global func to get overlay address from discover ID
	toAddr = func(id discover.NodeID) *network.BzzAddr {
		addr := network.NewAddrFromNodeID(id)
		return addr
	}
	//global func to create local store
	createStoreFunc = createTestLocalStorageForId
	//local stores
	stores = make(map[discover.NodeID]storage.ChunkStore)
	//data directories for each node and store
	datadirs = make(map[discover.NodeID]string)
	//deliveries for each node
	deliveries = make(map[discover.NodeID]*Delivery)
	//global retrieve func
	getRetrieveFunc = func(id discover.NodeID) func(chunk *storage.Chunk) error {
		return func(chunk *storage.Chunk) error {
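			//the retrieve func asks the node's peers for the chunk via its Delivery;
			//skipCheck is forwarded to RequestFromPeers (presumably to skip the offered/wanted-hashes round trip)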
			skipCheck := true
			return deliveries[id].RequestFromPeers(chunk.Addr[:], skipCheck)
		}
	}
	//registries, map of discover.NodeID to its streamer
	registries = make(map[discover.NodeID]*TestRegistry)
	//not needed for this test but required from common_test for NewStreamService
	waitPeerErrC = make(chan error)
	//also not needed for this test but required for NewStreamService
	peerCount = func(id discover.NodeID) int {
		if ids[0] == id || ids[len(ids)-1] == id {
			return 1
		}
		return 2
	}
}

//This test is a retrieval test for nodes.
//A configurable number of nodes can be
//provided to the test.
//Files are uploaded to nodes; other nodes then try to retrieve them.
//The number of nodes can also be provided via the command line.
func TestFileRetrieval(t *testing.T) {
	if *nodes != 0 {
		fileRetrievalTest(t, *nodes)
	} else {
		nodeCnt := []int{16}
		//if the `longrunning` flag has been provided,
		//run more test combinations
		if *longrunning {
			nodeCnt = append(nodeCnt, 32, 64, 128)
		}
		for _, n := range nodeCnt {
			fileRetrievalTest(t, n)
		}
	}
}

//This test is a retrieval test for nodes.
//One node is randomly selected to be the pivot node.
//A configurable number of chunks and nodes can be
//provided to the test; the given number of chunks is uploaded
//to the pivot node and the other nodes try to retrieve the chunk(s).
//The number of chunks and nodes can also be provided via the command line.
func TestRetrieval(t *testing.T) {
	//if nodes/chunks have been provided via commandline,
	//run the tests with these values
	if *nodes != 0 && *chunks != 0 {
		retrievalTest(t, *chunks, *nodes)
	} else {
		var nodeCnt []int
		var chnkCnt []int
		//if the `longrunning` flag has been provided,
		//run more test combinations
		if *longrunning {
			nodeCnt = []int{16, 32, 128}
			chnkCnt = []int{4, 32, 256}
		} else {
			//default test
			nodeCnt = []int{16}
			chnkCnt = []int{32}
		}
		for _, n := range nodeCnt {
			for _, c := range chnkCnt {
				retrievalTest(t, c, n)
			}
		}
	}
}

//Every test runs 3 times: live, history, and live AND history
func fileRetrievalTest(t *testing.T, nodeCount int) {
	//test live and NO history
	log.Info("Testing live and no history", "nodeCount", nodeCount)
	live = true
	history = false
	err := runFileRetrievalTest(nodeCount)
	if err != nil {
		t.Fatal(err)
	}
	//test history only
	log.Info("Testing history only", "nodeCount", nodeCount)
	live = false
	history = true
	err = runFileRetrievalTest(nodeCount)
	if err != nil {
		t.Fatal(err)
	}
	//finally test live and history
	log.Info("Testing live and history", "nodeCount", nodeCount)
	live = true
	err = runFileRetrievalTest(nodeCount)
	if err != nil {
		t.Fatal(err)
	}
}

//Every test runs 3 times: live, history, and live AND history
func retrievalTest(t *testing.T, chunkCount int, nodeCount int) {
	//test live and NO history
	log.Info("Testing live and no history", "chunkCount", chunkCount, "nodeCount", nodeCount)
	live = true
	history = false
	err := runRetrievalTest(chunkCount, nodeCount)
	if err != nil {
		t.Fatal(err)
	}
	//test history only
	log.Info("Testing history only", "chunkCount", chunkCount, "nodeCount", nodeCount)
	live = false
	history = true
	err = runRetrievalTest(chunkCount, nodeCount)
	if err != nil {
		t.Fatal(err)
	}
	//finally test live and history
	log.Info("Testing live and history", "chunkCount", chunkCount, "nodeCount", nodeCount)
	live = true
	err = runRetrievalTest(chunkCount, nodeCount)
	if err != nil {
		t.Fatal(err)
	}
}

/*
The upload depends on the global `live` and `history` variables:
If `live` is set, stream subscriptions are established first,
then files are uploaded to nodes.
If `history` is enabled, files are uploaded first, then subscriptions are built up.
The test loads a snapshot file to construct the swarm network,
assuming that the snapshot file identifies a healthy
kademlia network. Nevertheless a health check runs in the
simulation's `action` function.
The snapshot should have 'streamer' in its service list.
*/
func runFileRetrievalTest(nodeCount int) error {
	//for every run (live, history), init the variables
	initRetrievalTest()
	//the ids of the snapshot nodes, initiate only now as we need nodeCount
	ids = make([]discover.NodeID, nodeCount)
	//channel to check for disconnection errors
	disconnectC := make(chan error)
	//channel to close disconnection watcher routine
	quitC := make(chan struct{})
	//the test conf (using the same one as in `snapshot_sync_test`)
	conf = &synctestConfig{}
	//map of overlay address to discover ID
	conf.addrToIdMap = make(map[string]discover.NodeID)
	//array where the generated chunk hashes will be stored
	conf.hashes = make([]storage.Address, 0)
	//load nodes from the snapshot file
	net, err := initNetWithSnapshot(nodeCount)
	if err != nil {
		return err
	}
	var rpcSubscriptionsWg sync.WaitGroup
	//do cleanup after test is terminated
	defer func() {
		//shutdown the snapshot network
		net.Shutdown()
		//after the test, clean up local stores initialized with createLocalStoreForId
		localStoreCleanup()
		//finally clear all data directories
		datadirsCleanup()
	}()
	//get the nodes of the network
	nodes := net.GetNodes()
	//iterate over all nodes...
	for c := 0; c < len(nodes); c++ {
		//create an array of discovery node IDs
		ids[c] = nodes[c].ID()
		a := network.ToOverlayAddr(ids[c].Bytes())
		//append it to the array of all overlay addresses
		conf.addrs = append(conf.addrs, a)
		conf.addrToIdMap[string(a)] = ids[c]
	}
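	//the peer pot map holds, for each overlay address, the peers it is expected
	//to know and connect to; the Healthy check in `action` below compares against it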
	//needed for healthy call
	ppmap = network.NewPeerPotMap(testMinProxBinSize, conf.addrs)
	//an array for the random files
	var randomFiles []string
	//channel to signal when the upload has finished
	uploadFinished := make(chan struct{})
	//channel to trigger new node checks
	trigger := make(chan discover.NodeID)
	//simulation action
	action := func(ctx context.Context) error {
		//first run the health check on all nodes,
		//wait until nodes are all healthy
		ticker := time.NewTicker(200 * time.Millisecond)
		defer ticker.Stop()
		for range ticker.C {
			healthy := true
			for _, id := range ids {
				r := registries[id]
				//PeerPot for this node
				addr := common.Bytes2Hex(r.addr.OAddr)
				pp := ppmap[addr]
				//call Healthy RPC
				h := r.delivery.overlay.Healthy(pp)
				//print info
				log.Debug(r.delivery.overlay.String())
				log.Debug(fmt.Sprintf("IS HEALTHY: %t", h.GotNN && h.KnowNN && h.Full))
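				//only GotNN (connected to nearest neighbours) and Full are required to proceed;
				//KnowNN is logged above but not enforced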
				if !h.GotNN || !h.Full {
					healthy = false
					break
				}
			}
			if healthy {
				break
			}
		}
		if history {
			log.Info("Uploading for history")
			//If testing only history, we upload the chunk(s) first
			conf.hashes, randomFiles, err = uploadFilesToNodes(nodes)
			if err != nil {
				return err
			}
		}
		//variables needed to wait for all subscriptions to be established before uploading
		errc := make(chan error)
		//now setup and start event watching in order to know when we can upload
		ctx, watchCancel := context.WithTimeout(context.Background(), MaxTimeout*time.Second)
		defer watchCancel()
		log.Info("Setting up stream subscription")
		//We need two iterations, one to subscribe to the subscription events
		//(so we know when the setup phase is finished), and one to
		//actually run the stream subscriptions. We can't do it in the same iteration,
		//because while the first nodes in the loop are setting up subscriptions,
		//the latter ones have not subscribed to listen to peer events yet,
		//and then we would miss events.
		//first iteration: setup disconnection watcher and subscribe to peer events
		for j, id := range ids {
			log.Trace(fmt.Sprintf("Subscribe to subscription events: %d", j))
			client, err := net.GetNode(id).Client()
			if err != nil {
				return err
			}
			wsDoneC := watchSubscriptionEvents(ctx, id, client, errc, quitC)
			//if wsDoneC is nil, an error has occurred and has already been sent to the errc channel
			if wsDoneC == nil {
				continue
			}
			rpcSubscriptionsWg.Add(1)
			go func() {
				<-wsDoneC
				rpcSubscriptionsWg.Done()
			}()
			//watch for peers disconnecting
			wdDoneC, err := streamTesting.WatchDisconnections(id, client, disconnectC, quitC)
			if err != nil {
				return err
			}
			rpcSubscriptionsWg.Add(1)
			go func() {
				<-wdDoneC
				rpcSubscriptionsWg.Done()
			}()
		}
		//second iteration: start syncing and setup stream subscriptions
		for j, id := range ids {
			log.Trace(fmt.Sprintf("Start syncing and stream subscriptions: %d", j))
			client, err := net.GetNode(id).Client()
			if err != nil {
				return err
			}
			//start syncing!
			var cnt int
			err = client.CallContext(ctx, &cnt, "stream_startSyncing")
			if err != nil {
				return err
			}
			//increment the number of subscriptions we need to wait for
			//by the count returned from startSyncing (SYNC subscriptions)
			subscriptionCount += cnt
			//now also add the number of RETRIEVAL_REQUEST subscriptions
			for snid := range registries[id].peers {
				subscriptionCount++
				err = client.CallContext(ctx, nil, "stream_subscribeStream", snid, NewStream(swarmChunkServerStreamName, "", false), nil, Top)
				if err != nil {
					return err
				}
			}
		}
		//now wait until all expected subscriptions have been established;
		//`watchSubscriptionEvents` will write a `nil` value to errc
		//every time a `SubscriptionMsg` has been received
		for err := range errc {
			if err != nil {
				return err
			}
			//`nil` received, decrement count
			subscriptionCount--
			//all subscriptions received
			if subscriptionCount == 0 {
				break
			}
		}
		log.Info("Stream subscriptions successfully requested, action terminated")
		if live {
			//upload generated files to nodes
			var hashes []storage.Address
			var rfiles []string
			hashes, rfiles, err = uploadFilesToNodes(nodes)
			if err != nil {
				return err
			}
			conf.hashes = append(conf.hashes, hashes...)
			randomFiles = append(randomFiles, rfiles...)
			//signal to the trigger loop that the upload has finished
			uploadFinished <- struct{}{}
		}
		return nil
	}
	//check defines what will be checked during the test
	check := func(ctx context.Context, id discover.NodeID) (bool, error) {
		select {
		case <-ctx.Done():
			return false, ctx.Err()
		case e := <-disconnectC:
			log.Error(e.Error())
			return false, fmt.Errorf("Disconnect event detected, network unhealthy")
		default:
		}
		log.Trace(fmt.Sprintf("Checking node: %s", id))
		//if there is more than one chunk, the test only succeeds if all expected chunks are found
		allSuccess := true
		//check on the node's FileStore (netstore)
		fileStore := registries[id].fileStore
		//check all chunks
		for i, hash := range conf.hashes {
			reader, _ := fileStore.Retrieve(context.TODO(), hash)
			//check that we can read the file size and that it corresponds to the generated file size
			if s, err := reader.Size(nil); err != nil || s != int64(len(randomFiles[i])) {
				allSuccess = false
				log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id)
			} else {
				log.Debug(fmt.Sprintf("File with root hash %x successfully retrieved", hash))
			}
		}
		return allSuccess, nil
	}
	//for each tick, run the checks on all nodes
	timingTicker := time.NewTicker(5 * time.Second)
	defer timingTicker.Stop()
	go func() {
		//for live upload, we should wait for uploads to have finished
		//before starting to trigger the checks, due to file size
		if live {
			<-uploadFinished
		}
		for range timingTicker.C {
			for i := 0; i < len(ids); i++ {
				log.Trace(fmt.Sprintf("triggering step %d, id %s", i, ids[i]))
				trigger <- ids[i]
			}
		}
	}()
	log.Info("Starting simulation run...")
	timeout := MaxTimeout * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	//run the simulation
	result := simulations.NewSimulation(net).Run(ctx, &simulations.Step{
		Action:  action,
		Trigger: trigger,
		Expect: &simulations.Expectation{
			Nodes: ids,
			Check: check,
		},
	})
	if result.Error != nil {
		return result.Error
	}
	return nil
}

/*
The test generates the given number of chunks.
The upload depends on the global `live` and `history` variables:
If `live` is set, stream subscriptions are established first, then
the upload to a random node takes place.
If `history` is enabled, the upload happens first, then subscriptions are built up.
The test loads a snapshot file to construct the swarm network,
assuming that the snapshot file identifies a healthy
kademlia network. Nevertheless a health check runs in the
simulation's `action` function.
The snapshot should have 'streamer' in its service list.
*/
func runRetrievalTest(chunkCount int, nodeCount int) error {
	//for every run (live, history), init the variables
	initRetrievalTest()
	//the ids of the snapshot nodes, initiate only now as we need nodeCount
	ids = make([]discover.NodeID, nodeCount)
	//channel to check for disconnection errors
	disconnectC := make(chan error)
	//channel to close disconnection watcher routine
	quitC := make(chan struct{})
	//the test conf (using the same one as in `snapshot_sync_test`)
	conf = &synctestConfig{}
	//map of overlay address to discover ID
	conf.addrToIdMap = make(map[string]discover.NodeID)
	//array where the generated chunk hashes will be stored
	conf.hashes = make([]storage.Address, 0)
	//load nodes from the snapshot file
	net, err := initNetWithSnapshot(nodeCount)
	if err != nil {
		return err
	}
	var rpcSubscriptionsWg sync.WaitGroup
	//do cleanup after test is terminated
	defer func() {
		//shutdown the snapshot network
		net.Shutdown()
		//after the test, clean up local stores initialized with createLocalStoreForId
		localStoreCleanup()
		//finally clear all data directories
		datadirsCleanup()
	}()
	//get the nodes of the network
	nodes := net.GetNodes()
	//select one index at random...
	idx := rand.Intn(len(nodes))
	//...and get the node at that index
	//this is the node selected for upload
	uploadNode := nodes[idx]
	//iterate over all nodes...
	for c := 0; c < len(nodes); c++ {
		//create an array of discovery node IDs
		ids[c] = nodes[c].ID()
		a := network.ToOverlayAddr(ids[c].Bytes())
		//append it to the array of all overlay addresses
		conf.addrs = append(conf.addrs, a)
		conf.addrToIdMap[string(a)] = ids[c]
	}
	//needed for healthy call
	ppmap = network.NewPeerPotMap(testMinProxBinSize, conf.addrs)
	trigger := make(chan discover.NodeID)
	//simulation action
	action := func(ctx context.Context) error {
		//first run the health check on all nodes,
		//wait until nodes are all healthy
		ticker := time.NewTicker(200 * time.Millisecond)
		defer ticker.Stop()
		for range ticker.C {
			healthy := true
			for _, id := range ids {
				r := registries[id]
				//PeerPot for this node
				addr := common.Bytes2Hex(network.ToOverlayAddr(id.Bytes()))
				pp := ppmap[addr]
				//call Healthy RPC
				h := r.delivery.overlay.Healthy(pp)
				//print info
				log.Debug(r.delivery.overlay.String())
				log.Debug(fmt.Sprintf("IS HEALTHY: %t", h.GotNN && h.KnowNN && h.Full))
				if !h.GotNN || !h.Full {
					healthy = false
					break
				}
			}
			if healthy {
				break
			}
		}
		if history {
			log.Info("Uploading for history")
			//If testing only history, we upload the chunk(s) first
			conf.hashes, err = uploadFileToSingleNodeStore(uploadNode.ID(), chunkCount)
			if err != nil {
				return err
			}
		}
		//variables needed to wait for all subscriptions to be established before uploading
		errc := make(chan error)
		//now setup and start event watching in order to know when we can upload
		ctx, watchCancel := context.WithTimeout(context.Background(), MaxTimeout*time.Second)
		defer watchCancel()
		log.Info("Setting up stream subscription")
		//We need two iterations, one to subscribe to the subscription events
		//(so we know when the setup phase is finished), and one to
		//actually run the stream subscriptions. We can't do it in the same iteration,
		//because while the first nodes in the loop are setting up subscriptions,
		//the latter ones have not subscribed to listen to peer events yet,
		//and then we would miss events.
		//first iteration: setup disconnection watcher and subscribe to peer events
		for j, id := range ids {
			log.Trace(fmt.Sprintf("Subscribe to subscription events: %d", j))
			client, err := net.GetNode(id).Client()
			if err != nil {
				return err
			}
			//check for `SubscribeMsg` events to know when the setup phase is complete
			wsDoneC := watchSubscriptionEvents(ctx, id, client, errc, quitC)
			//if wsDoneC is nil, an error has occurred and has already been sent to the errc channel
			if wsDoneC == nil {
				continue
			}
			rpcSubscriptionsWg.Add(1)
			go func() {
				<-wsDoneC
				rpcSubscriptionsWg.Done()
			}()
			//watch for peers disconnecting
			wdDoneC, err := streamTesting.WatchDisconnections(id, client, disconnectC, quitC)
			if err != nil {
				return err
			}
			rpcSubscriptionsWg.Add(1)
			go func() {
				<-wdDoneC
				rpcSubscriptionsWg.Done()
			}()
		}
		//second iteration: start syncing and setup stream subscriptions
		for j, id := range ids {
			log.Trace(fmt.Sprintf("Start syncing and stream subscriptions: %d", j))
			client, err := net.GetNode(id).Client()
			if err != nil {
				return err
			}
			//start syncing!
			var cnt int
			err = client.CallContext(ctx, &cnt, "stream_startSyncing")
			if err != nil {
				return err
			}
			//increment the number of subscriptions we need to wait for
			//by the count returned from startSyncing (SYNC subscriptions)
			subscriptionCount += cnt
			//now also add the number of RETRIEVAL_REQUEST subscriptions
			for snid := range registries[id].peers {
				subscriptionCount++
				err = client.CallContext(ctx, nil, "stream_subscribeStream", snid, NewStream(swarmChunkServerStreamName, "", false), nil, Top)
				if err != nil {
					return err
				}
			}
		}
		//now wait until all expected subscriptions have been established;
		//`watchSubscriptionEvents` will write a `nil` value to errc
		//every time a `SubscriptionMsg` has been received
		for err := range errc {
			if err != nil {
				return err
			}
			//`nil` received, decrement count
			subscriptionCount--
			//all subscriptions received
			if subscriptionCount == 0 {
				break
			}
		}
		log.Info("Stream subscriptions successfully requested, action terminated")
		if live {
			//now upload the chunks to the selected random single node
			chnks, err := uploadFileToSingleNodeStore(uploadNode.ID(), chunkCount)
			if err != nil {
				return err
			}
			conf.hashes = append(conf.hashes, chnks...)
		}
		return nil
	}
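	//expected size of each uploaded chunk; uploadFileToSingleNodeStore is assumed
	//to store chunks of exactly DefaultChunkSize, which the check below relies on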
	chunkSize := storage.DefaultChunkSize
	//check defines what will be checked during the test
	check := func(ctx context.Context, id discover.NodeID) (bool, error) {
		//don't check the uploader node
		if id == uploadNode.ID() {
			return true, nil
		}
		select {
		case <-ctx.Done():
			return false, ctx.Err()
		case e := <-disconnectC:
			log.Error(e.Error())
			return false, fmt.Errorf("Disconnect event detected, network unhealthy")
		default:
		}
		log.Trace(fmt.Sprintf("Checking node: %s", id))
		//if there is more than one chunk, the test only succeeds if all expected chunks are found
		allSuccess := true
		//check on the node's FileStore (netstore)
		fileStore := registries[id].fileStore
		//check all chunks
		for _, chnk := range conf.hashes {
			reader, _ := fileStore.Retrieve(context.TODO(), chnk)
			//assuming that reading the Size of the chunk is enough to know we found it
			if s, err := reader.Size(nil); err != nil || s != chunkSize {
				allSuccess = false
				log.Warn("Retrieve error", "err", err, "chunk", chnk, "nodeId", id)
			} else {
				log.Debug(fmt.Sprintf("Chunk %x found", chnk))
			}
		}
		return allSuccess, nil
	}
	//for each tick, run the checks on all nodes
	timingTicker := time.NewTicker(5 * time.Second)
	defer timingTicker.Stop()
	go func() {
		for range timingTicker.C {
			for i := 0; i < len(ids); i++ {
				log.Trace(fmt.Sprintf("triggering step %d, id %s", i, ids[i]))
				trigger <- ids[i]
			}
		}
	}()
	log.Info("Starting simulation run...")
	timeout := MaxTimeout * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	//run the simulation
	result := simulations.NewSimulation(net).Run(ctx, &simulations.Step{
		Action:  action,
		Trigger: trigger,
		Expect: &simulations.Expectation{
			Nodes: ids,
			Check: check,
		},
	})
	if result.Error != nil {
		return result.Error
	}
	return nil
}

//upload generated files to nodes
//every node gets one file uploaded
func uploadFilesToNodes(nodes []*simulations.Node) ([]storage.Address, []string, error) {
	nodeCnt := len(nodes)
	log.Debug(fmt.Sprintf("Uploading %d files to nodes", nodeCnt))
	//array holding generated files
	rfiles := make([]string, nodeCnt)
	//array holding the root hashes of the files
	rootAddrs := make([]storage.Address, nodeCnt)
	var err error
	//for every node, generate a file and upload
	for i, n := range nodes {
		id := n.ID()
		fileStore := registries[id].fileStore
		//generate a file
		rfiles[i], err = generateRandomFile()
		if err != nil {
			return nil, nil, err
		}
		//store it (upload it) on the FileStore
		ctx := context.TODO()
		rk, wait, err := fileStore.Store(ctx, strings.NewReader(rfiles[i]), int64(len(rfiles[i])), false)
		log.Debug("Uploaded random string file to node")
		if err != nil {
			return nil, nil, err
		}
		err = wait(ctx)
		if err != nil {
			return nil, nil, err
		}
		rootAddrs[i] = rk
	}
	return rootAddrs, rfiles, nil
}

//generate a random file (string)
func generateRandomFile() (string, error) {
	//generate a random file size between minFileSize and maxFileSize
	fileSize := rand.Intn(maxFileSize-minFileSize) + minFileSize
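	//fileSize is in kB; the buffer allocated below is fileSize*1024 bytes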
	log.Debug(fmt.Sprintf("Generated file with filesize %d kB", fileSize))
	b := make([]byte, fileSize*1024)
	_, err := crand.Read(b)
	if err != nil {
		log.Error("Error generating random file.", "err", err)
		return "", err
	}
	return string(b), nil
}