delivery_test.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package stream
  17. import (
  18. "bytes"
  19. "context"
  20. crand "crypto/rand"
  21. "fmt"
  22. "io"
  23. "sync"
  24. "testing"
  25. "time"
  26. "github.com/ethereum/go-ethereum/common"
  27. "github.com/ethereum/go-ethereum/p2p/discover"
  28. "github.com/ethereum/go-ethereum/p2p/simulations"
  29. p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
  30. "github.com/ethereum/go-ethereum/rpc"
  31. "github.com/ethereum/go-ethereum/swarm/log"
  32. "github.com/ethereum/go-ethereum/swarm/network"
  33. streamTesting "github.com/ethereum/go-ethereum/swarm/network/stream/testing"
  34. "github.com/ethereum/go-ethereum/swarm/storage"
  35. )
  36. func TestStreamerRetrieveRequest(t *testing.T) {
  37. tester, streamer, _, teardown, err := newStreamerTester(t)
  38. defer teardown()
  39. if err != nil {
  40. t.Fatal(err)
  41. }
  42. peerID := tester.IDs[0]
  43. streamer.delivery.RequestFromPeers(hash0[:], true)
  44. err = tester.TestExchanges(p2ptest.Exchange{
  45. Label: "RetrieveRequestMsg",
  46. Expects: []p2ptest.Expect{
  47. {
  48. Code: 5,
  49. Msg: &RetrieveRequestMsg{
  50. Addr: hash0[:],
  51. SkipCheck: true,
  52. },
  53. Peer: peerID,
  54. },
  55. },
  56. })
  57. if err != nil {
  58. t.Fatalf("Expected no error, got %v", err)
  59. }
  60. }
  61. func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
  62. tester, streamer, _, teardown, err := newStreamerTester(t)
  63. defer teardown()
  64. if err != nil {
  65. t.Fatal(err)
  66. }
  67. peerID := tester.IDs[0]
  68. chunk := storage.NewChunk(storage.Address(hash0[:]), nil)
  69. peer := streamer.getPeer(peerID)
  70. peer.handleSubscribeMsg(&SubscribeMsg{
  71. Stream: NewStream(swarmChunkServerStreamName, "", false),
  72. History: nil,
  73. Priority: Top,
  74. })
  75. err = tester.TestExchanges(p2ptest.Exchange{
  76. Label: "RetrieveRequestMsg",
  77. Triggers: []p2ptest.Trigger{
  78. {
  79. Code: 5,
  80. Msg: &RetrieveRequestMsg{
  81. Addr: chunk.Addr[:],
  82. },
  83. Peer: peerID,
  84. },
  85. },
  86. Expects: []p2ptest.Expect{
  87. {
  88. Code: 1,
  89. Msg: &OfferedHashesMsg{
  90. HandoverProof: nil,
  91. Hashes: nil,
  92. From: 0,
  93. To: 0,
  94. },
  95. Peer: peerID,
  96. },
  97. },
  98. })
  99. expectedError := `exchange #0 "RetrieveRequestMsg": timed out`
  100. if err == nil || err.Error() != expectedError {
  101. t.Fatalf("Expected error %v, got %v", expectedError, err)
  102. }
  103. }
  104. // upstream request server receives a retrieve Request and responds with
  105. // offered hashes or delivery if skipHash is set to true
  106. func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
  107. tester, streamer, localStore, teardown, err := newStreamerTester(t)
  108. defer teardown()
  109. if err != nil {
  110. t.Fatal(err)
  111. }
  112. peerID := tester.IDs[0]
  113. peer := streamer.getPeer(peerID)
  114. stream := NewStream(swarmChunkServerStreamName, "", false)
  115. peer.handleSubscribeMsg(&SubscribeMsg{
  116. Stream: stream,
  117. History: nil,
  118. Priority: Top,
  119. })
  120. hash := storage.Address(hash0[:])
  121. chunk := storage.NewChunk(hash, nil)
  122. chunk.SData = hash
  123. localStore.Put(chunk)
  124. chunk.WaitToStore()
  125. err = tester.TestExchanges(p2ptest.Exchange{
  126. Label: "RetrieveRequestMsg",
  127. Triggers: []p2ptest.Trigger{
  128. {
  129. Code: 5,
  130. Msg: &RetrieveRequestMsg{
  131. Addr: hash,
  132. },
  133. Peer: peerID,
  134. },
  135. },
  136. Expects: []p2ptest.Expect{
  137. {
  138. Code: 1,
  139. Msg: &OfferedHashesMsg{
  140. HandoverProof: &HandoverProof{
  141. Handover: &Handover{},
  142. },
  143. Hashes: hash,
  144. From: 0,
  145. // TODO: why is this 32???
  146. To: 32,
  147. Stream: stream,
  148. },
  149. Peer: peerID,
  150. },
  151. },
  152. })
  153. if err != nil {
  154. t.Fatal(err)
  155. }
  156. hash = storage.Address(hash1[:])
  157. chunk = storage.NewChunk(hash, nil)
  158. chunk.SData = hash1[:]
  159. localStore.Put(chunk)
  160. chunk.WaitToStore()
  161. err = tester.TestExchanges(p2ptest.Exchange{
  162. Label: "RetrieveRequestMsg",
  163. Triggers: []p2ptest.Trigger{
  164. {
  165. Code: 5,
  166. Msg: &RetrieveRequestMsg{
  167. Addr: hash,
  168. SkipCheck: true,
  169. },
  170. Peer: peerID,
  171. },
  172. },
  173. Expects: []p2ptest.Expect{
  174. {
  175. Code: 6,
  176. Msg: &ChunkDeliveryMsg{
  177. Addr: hash,
  178. SData: hash,
  179. },
  180. Peer: peerID,
  181. },
  182. },
  183. })
  184. if err != nil {
  185. t.Fatal(err)
  186. }
  187. }
  188. func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
  189. tester, streamer, localStore, teardown, err := newStreamerTester(t)
  190. defer teardown()
  191. if err != nil {
  192. t.Fatal(err)
  193. }
  194. streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
  195. return &testClient{
  196. t: t,
  197. }, nil
  198. })
  199. peerID := tester.IDs[0]
  200. stream := NewStream("foo", "", true)
  201. err = streamer.Subscribe(peerID, stream, NewRange(5, 8), Top)
  202. if err != nil {
  203. t.Fatalf("Expected no error, got %v", err)
  204. }
  205. chunkKey := hash0[:]
  206. chunkData := hash1[:]
  207. chunk, created := localStore.GetOrCreateRequest(chunkKey)
  208. if !created {
  209. t.Fatal("chunk already exists")
  210. }
  211. select {
  212. case <-chunk.ReqC:
  213. t.Fatal("chunk is already received")
  214. default:
  215. }
  216. err = tester.TestExchanges(p2ptest.Exchange{
  217. Label: "Subscribe message",
  218. Expects: []p2ptest.Expect{
  219. {
  220. Code: 4,
  221. Msg: &SubscribeMsg{
  222. Stream: stream,
  223. History: NewRange(5, 8),
  224. Priority: Top,
  225. },
  226. Peer: peerID,
  227. },
  228. },
  229. },
  230. p2ptest.Exchange{
  231. Label: "ChunkDeliveryRequest message",
  232. Triggers: []p2ptest.Trigger{
  233. {
  234. Code: 6,
  235. Msg: &ChunkDeliveryMsg{
  236. Addr: chunkKey,
  237. SData: chunkData,
  238. },
  239. Peer: peerID,
  240. },
  241. },
  242. })
  243. if err != nil {
  244. t.Fatalf("Expected no error, got %v", err)
  245. }
  246. timeout := time.NewTimer(1 * time.Second)
  247. select {
  248. case <-timeout.C:
  249. t.Fatal("timeout receiving chunk")
  250. case <-chunk.ReqC:
  251. }
  252. storedChunk, err := localStore.Get(chunkKey)
  253. if err != nil {
  254. t.Fatalf("Expected no error, got %v", err)
  255. }
  256. if !bytes.Equal(storedChunk.SData, chunkData) {
  257. t.Fatal("Retrieved chunk has different data than original")
  258. }
  259. }
  260. func TestDeliveryFromNodes(t *testing.T) {
  261. testDeliveryFromNodes(t, 2, 1, dataChunkCount, true)
  262. testDeliveryFromNodes(t, 2, 1, dataChunkCount, false)
  263. testDeliveryFromNodes(t, 4, 1, dataChunkCount, true)
  264. testDeliveryFromNodes(t, 4, 1, dataChunkCount, false)
  265. testDeliveryFromNodes(t, 8, 1, dataChunkCount, true)
  266. testDeliveryFromNodes(t, 8, 1, dataChunkCount, false)
  267. testDeliveryFromNodes(t, 16, 1, dataChunkCount, true)
  268. testDeliveryFromNodes(t, 16, 1, dataChunkCount, false)
  269. }
  270. func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck bool) {
  271. defaultSkipCheck = skipCheck
  272. toAddr = network.NewAddrFromNodeID
  273. createStoreFunc = createTestLocalStorageFromSim
  274. conf := &streamTesting.RunConfig{
  275. Adapter: *adapter,
  276. NodeCount: nodes,
  277. ConnLevel: conns,
  278. ToAddr: toAddr,
  279. Services: services,
  280. EnableMsgEvents: false,
  281. }
  282. sim, teardown, err := streamTesting.NewSimulation(conf)
  283. var rpcSubscriptionsWg sync.WaitGroup
  284. defer func() {
  285. rpcSubscriptionsWg.Wait()
  286. teardown()
  287. }()
  288. if err != nil {
  289. t.Fatal(err.Error())
  290. }
  291. stores = make(map[discover.NodeID]storage.ChunkStore)
  292. for i, id := range sim.IDs {
  293. stores[id] = sim.Stores[i]
  294. }
  295. registries = make(map[discover.NodeID]*TestRegistry)
  296. deliveries = make(map[discover.NodeID]*Delivery)
  297. peerCount = func(id discover.NodeID) int {
  298. if sim.IDs[0] == id || sim.IDs[nodes-1] == id {
  299. return 1
  300. }
  301. return 2
  302. }
  303. // here we distribute chunks of a random file into Stores of nodes 1 to nodes
  304. rrFileStore := storage.NewFileStore(newRoundRobinStore(sim.Stores[1:]...), storage.NewFileStoreParams())
  305. size := chunkCount * chunkSize
  306. fileHash, wait, err := rrFileStore.Store(io.LimitReader(crand.Reader, int64(size)), int64(size), false)
  307. // wait until all chunks stored
  308. wait()
  309. if err != nil {
  310. t.Fatal(err.Error())
  311. }
  312. errc := make(chan error, 1)
  313. waitPeerErrC = make(chan error)
  314. quitC := make(chan struct{})
  315. defer close(quitC)
  316. action := func(ctx context.Context) error {
  317. // each node Subscribes to each other's swarmChunkServerStreamName
  318. // need to wait till an aynchronous process registers the peers in streamer.peers
  319. // that is used by Subscribe
  320. // using a global err channel to share betweem action and node service
  321. i := 0
  322. for err := range waitPeerErrC {
  323. if err != nil {
  324. return fmt.Errorf("error waiting for peers: %s", err)
  325. }
  326. i++
  327. if i == nodes {
  328. break
  329. }
  330. }
  331. // each node subscribes to the upstream swarm chunk server stream
  332. // which responds to chunk retrieve requests all but the last node in the chain does not
  333. for j := 0; j < nodes-1; j++ {
  334. id := sim.IDs[j]
  335. err := sim.CallClient(id, func(client *rpc.Client) error {
  336. doneC, err := streamTesting.WatchDisconnections(id, client, errc, quitC)
  337. if err != nil {
  338. return err
  339. }
  340. rpcSubscriptionsWg.Add(1)
  341. go func() {
  342. <-doneC
  343. rpcSubscriptionsWg.Done()
  344. }()
  345. ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
  346. defer cancel()
  347. sid := sim.IDs[j+1]
  348. return client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream(swarmChunkServerStreamName, "", false), NewRange(0, 0), Top)
  349. })
  350. if err != nil {
  351. return err
  352. }
  353. }
  354. // create a retriever FileStore for the pivot node
  355. delivery := deliveries[sim.IDs[0]]
  356. retrieveFunc := func(chunk *storage.Chunk) error {
  357. return delivery.RequestFromPeers(chunk.Addr[:], skipCheck)
  358. }
  359. netStore := storage.NewNetStore(sim.Stores[0].(*storage.LocalStore), retrieveFunc)
  360. fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
  361. go func() {
  362. // start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
  363. // we must wait for the peer connections to have started before requesting
  364. n, err := readAll(fileStore, fileHash)
  365. log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
  366. if err != nil {
  367. errc <- fmt.Errorf("requesting chunks action error: %v", err)
  368. }
  369. }()
  370. return nil
  371. }
  372. check := func(ctx context.Context, id discover.NodeID) (bool, error) {
  373. select {
  374. case err := <-errc:
  375. return false, err
  376. case <-ctx.Done():
  377. return false, ctx.Err()
  378. default:
  379. }
  380. var total int64
  381. err := sim.CallClient(id, func(client *rpc.Client) error {
  382. ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
  383. defer cancel()
  384. return client.CallContext(ctx, &total, "stream_readAll", common.BytesToHash(fileHash))
  385. })
  386. log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
  387. if err != nil || total != int64(size) {
  388. return false, nil
  389. }
  390. return true, nil
  391. }
  392. conf.Step = &simulations.Step{
  393. Action: action,
  394. Trigger: streamTesting.Trigger(10*time.Millisecond, quitC, sim.IDs[0]),
  395. // we are only testing the pivot node (net.Nodes[0])
  396. Expect: &simulations.Expectation{
  397. Nodes: sim.IDs[0:1],
  398. Check: check,
  399. },
  400. }
  401. startedAt := time.Now()
  402. timeout := 300 * time.Second
  403. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  404. defer cancel()
  405. result, err := sim.Run(ctx, conf)
  406. finishedAt := time.Now()
  407. if err != nil {
  408. t.Fatalf("Setting up simulation failed: %v", err)
  409. }
  410. if result.Error != nil {
  411. t.Fatalf("Simulation failed: %s", result.Error)
  412. }
  413. streamTesting.CheckResult(t, result, startedAt, finishedAt)
  414. }
  415. func BenchmarkDeliveryFromNodesWithoutCheck(b *testing.B) {
  416. for chunks := 32; chunks <= 128; chunks *= 2 {
  417. for i := 2; i < 32; i *= 2 {
  418. b.Run(
  419. fmt.Sprintf("nodes=%v,chunks=%v", i, chunks),
  420. func(b *testing.B) {
  421. benchmarkDeliveryFromNodes(b, i, 1, chunks, true)
  422. },
  423. )
  424. }
  425. }
  426. }
  427. func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) {
  428. for chunks := 32; chunks <= 128; chunks *= 2 {
  429. for i := 2; i < 32; i *= 2 {
  430. b.Run(
  431. fmt.Sprintf("nodes=%v,chunks=%v", i, chunks),
  432. func(b *testing.B) {
  433. benchmarkDeliveryFromNodes(b, i, 1, chunks, false)
  434. },
  435. )
  436. }
  437. }
  438. }
  439. func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skipCheck bool) {
  440. defaultSkipCheck = skipCheck
  441. toAddr = network.NewAddrFromNodeID
  442. createStoreFunc = createTestLocalStorageFromSim
  443. registries = make(map[discover.NodeID]*TestRegistry)
  444. timeout := 300 * time.Second
  445. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  446. defer cancel()
  447. conf := &streamTesting.RunConfig{
  448. Adapter: *adapter,
  449. NodeCount: nodes,
  450. ConnLevel: conns,
  451. ToAddr: toAddr,
  452. Services: services,
  453. EnableMsgEvents: false,
  454. }
  455. sim, teardown, err := streamTesting.NewSimulation(conf)
  456. var rpcSubscriptionsWg sync.WaitGroup
  457. defer func() {
  458. rpcSubscriptionsWg.Wait()
  459. teardown()
  460. }()
  461. if err != nil {
  462. b.Fatal(err.Error())
  463. }
  464. stores = make(map[discover.NodeID]storage.ChunkStore)
  465. deliveries = make(map[discover.NodeID]*Delivery)
  466. for i, id := range sim.IDs {
  467. stores[id] = sim.Stores[i]
  468. }
  469. peerCount = func(id discover.NodeID) int {
  470. if sim.IDs[0] == id || sim.IDs[nodes-1] == id {
  471. return 1
  472. }
  473. return 2
  474. }
  475. // wait channel for all nodes all peer connections to set up
  476. waitPeerErrC = make(chan error)
  477. // create a FileStore for the last node in the chain which we are gonna write to
  478. remoteFileStore := storage.NewFileStore(sim.Stores[nodes-1], storage.NewFileStoreParams())
  479. // channel to signal simulation initialisation with action call complete
  480. // or node disconnections
  481. disconnectC := make(chan error)
  482. quitC := make(chan struct{})
  483. initC := make(chan error)
  484. action := func(ctx context.Context) error {
  485. // each node Subscribes to each other's swarmChunkServerStreamName
  486. // need to wait till an aynchronous process registers the peers in streamer.peers
  487. // that is used by Subscribe
  488. // waitPeerErrC using a global err channel to share betweem action and node service
  489. i := 0
  490. for err := range waitPeerErrC {
  491. if err != nil {
  492. return fmt.Errorf("error waiting for peers: %s", err)
  493. }
  494. i++
  495. if i == nodes {
  496. break
  497. }
  498. }
  499. var err error
  500. // each node except the last one subscribes to the upstream swarm chunk server stream
  501. // which responds to chunk retrieve requests
  502. for j := 0; j < nodes-1; j++ {
  503. id := sim.IDs[j]
  504. err = sim.CallClient(id, func(client *rpc.Client) error {
  505. doneC, err := streamTesting.WatchDisconnections(id, client, disconnectC, quitC)
  506. if err != nil {
  507. return err
  508. }
  509. rpcSubscriptionsWg.Add(1)
  510. go func() {
  511. <-doneC
  512. rpcSubscriptionsWg.Done()
  513. }()
  514. ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
  515. defer cancel()
  516. sid := sim.IDs[j+1] // the upstream peer's id
  517. return client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream(swarmChunkServerStreamName, "", false), NewRange(0, 0), Top)
  518. })
  519. if err != nil {
  520. break
  521. }
  522. }
  523. initC <- err
  524. return nil
  525. }
  526. // the check function is only triggered when the benchmark finishes
  527. trigger := make(chan discover.NodeID)
  528. check := func(ctx context.Context, id discover.NodeID) (_ bool, err error) {
  529. return true, nil
  530. }
  531. conf.Step = &simulations.Step{
  532. Action: action,
  533. Trigger: trigger,
  534. // we are only testing the pivot node (net.Nodes[0])
  535. Expect: &simulations.Expectation{
  536. Nodes: sim.IDs[0:1],
  537. Check: check,
  538. },
  539. }
  540. // run the simulation in the background
  541. errc := make(chan error)
  542. go func() {
  543. _, err := sim.Run(ctx, conf)
  544. close(quitC)
  545. errc <- err
  546. }()
  547. // wait for simulation action to complete stream subscriptions
  548. err = <-initC
  549. if err != nil {
  550. b.Fatalf("simulation failed to initialise. expected no error. got %v", err)
  551. }
  552. // create a retriever FileStore for the pivot node
  553. // by now deliveries are set for each node by the streamer service
  554. delivery := deliveries[sim.IDs[0]]
  555. retrieveFunc := func(chunk *storage.Chunk) error {
  556. return delivery.RequestFromPeers(chunk.Addr[:], skipCheck)
  557. }
  558. netStore := storage.NewNetStore(sim.Stores[0].(*storage.LocalStore), retrieveFunc)
  559. // benchmark loop
  560. b.ResetTimer()
  561. b.StopTimer()
  562. Loop:
  563. for i := 0; i < b.N; i++ {
  564. // uploading chunkCount random chunks to the last node
  565. hashes := make([]storage.Address, chunkCount)
  566. for i := 0; i < chunkCount; i++ {
  567. // create actual size real chunks
  568. hash, wait, err := remoteFileStore.Store(io.LimitReader(crand.Reader, int64(chunkSize)), int64(chunkSize), false)
  569. // wait until all chunks stored
  570. wait()
  571. if err != nil {
  572. b.Fatalf("expected no error. got %v", err)
  573. }
  574. // collect the hashes
  575. hashes[i] = hash
  576. }
  577. // now benchmark the actual retrieval
  578. // netstore.Get is called for each hash in a go routine and errors are collected
  579. b.StartTimer()
  580. errs := make(chan error)
  581. for _, hash := range hashes {
  582. go func(h storage.Address) {
  583. _, err := netStore.Get(h)
  584. log.Warn("test check netstore get", "hash", h, "err", err)
  585. errs <- err
  586. }(hash)
  587. }
  588. // count and report retrieval errors
  589. // if there are misses then chunk timeout is too low for the distance and volume (?)
  590. var total, misses int
  591. for err := range errs {
  592. if err != nil {
  593. log.Warn(err.Error())
  594. misses++
  595. }
  596. total++
  597. if total == chunkCount {
  598. break
  599. }
  600. }
  601. b.StopTimer()
  602. select {
  603. case err = <-disconnectC:
  604. if err != nil {
  605. break Loop
  606. }
  607. default:
  608. }
  609. if misses > 0 {
  610. err = fmt.Errorf("%v chunk not found out of %v", misses, total)
  611. break Loop
  612. }
  613. }
  614. select {
  615. case <-quitC:
  616. case trigger <- sim.IDs[0]:
  617. }
  618. if err == nil {
  619. err = <-errc
  620. } else {
  621. if e := <-errc; e != nil {
  622. b.Errorf("sim.Run function error: %v", e)
  623. }
  624. }
  625. // benchmark over, trigger the check function to conclude the simulation
  626. if err != nil {
  627. b.Fatalf("expected no error. got %v", err)
  628. }
  629. }
  630. func createTestLocalStorageFromSim(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, error) {
  631. return stores[id], nil
  632. }