delivery_test.go 18 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package stream
  17. import (
  18. "bytes"
  19. "context"
  20. crand "crypto/rand"
  21. "fmt"
  22. "io"
  23. "sync"
  24. "testing"
  25. "time"
  26. "github.com/ethereum/go-ethereum/common"
  27. "github.com/ethereum/go-ethereum/p2p/discover"
  28. "github.com/ethereum/go-ethereum/p2p/simulations"
  29. p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
  30. "github.com/ethereum/go-ethereum/rpc"
  31. "github.com/ethereum/go-ethereum/swarm/log"
  32. "github.com/ethereum/go-ethereum/swarm/network"
  33. streamTesting "github.com/ethereum/go-ethereum/swarm/network/stream/testing"
  34. "github.com/ethereum/go-ethereum/swarm/storage"
  35. )
  36. func TestStreamerRetrieveRequest(t *testing.T) {
  37. tester, streamer, _, teardown, err := newStreamerTester(t)
  38. defer teardown()
  39. if err != nil {
  40. t.Fatal(err)
  41. }
  42. peerID := tester.IDs[0]
  43. streamer.delivery.RequestFromPeers(hash0[:], true)
  44. err = tester.TestExchanges(p2ptest.Exchange{
  45. Label: "RetrieveRequestMsg",
  46. Expects: []p2ptest.Expect{
  47. {
  48. Code: 5,
  49. Msg: &RetrieveRequestMsg{
  50. Addr: hash0[:],
  51. SkipCheck: true,
  52. },
  53. Peer: peerID,
  54. },
  55. },
  56. })
  57. if err != nil {
  58. t.Fatalf("Expected no error, got %v", err)
  59. }
  60. }
  61. func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
  62. tester, streamer, _, teardown, err := newStreamerTester(t)
  63. defer teardown()
  64. if err != nil {
  65. t.Fatal(err)
  66. }
  67. peerID := tester.IDs[0]
  68. chunk := storage.NewChunk(storage.Address(hash0[:]), nil)
  69. peer := streamer.getPeer(peerID)
  70. peer.handleSubscribeMsg(&SubscribeMsg{
  71. Stream: NewStream(swarmChunkServerStreamName, "", false),
  72. History: nil,
  73. Priority: Top,
  74. })
  75. err = tester.TestExchanges(p2ptest.Exchange{
  76. Label: "RetrieveRequestMsg",
  77. Triggers: []p2ptest.Trigger{
  78. {
  79. Code: 5,
  80. Msg: &RetrieveRequestMsg{
  81. Addr: chunk.Addr[:],
  82. },
  83. Peer: peerID,
  84. },
  85. },
  86. Expects: []p2ptest.Expect{
  87. {
  88. Code: 1,
  89. Msg: &OfferedHashesMsg{
  90. HandoverProof: nil,
  91. Hashes: nil,
  92. From: 0,
  93. To: 0,
  94. },
  95. Peer: peerID,
  96. },
  97. },
  98. })
  99. expectedError := `exchange #0 "RetrieveRequestMsg": timed out`
  100. if err == nil || err.Error() != expectedError {
  101. t.Fatalf("Expected error %v, got %v", expectedError, err)
  102. }
  103. }
  104. // upstream request server receives a retrieve Request and responds with
  105. // offered hashes or delivery if skipHash is set to true
  106. func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
  107. tester, streamer, localStore, teardown, err := newStreamerTester(t)
  108. defer teardown()
  109. if err != nil {
  110. t.Fatal(err)
  111. }
  112. peerID := tester.IDs[0]
  113. peer := streamer.getPeer(peerID)
  114. stream := NewStream(swarmChunkServerStreamName, "", false)
  115. peer.handleSubscribeMsg(&SubscribeMsg{
  116. Stream: stream,
  117. History: nil,
  118. Priority: Top,
  119. })
  120. hash := storage.Address(hash0[:])
  121. chunk := storage.NewChunk(hash, nil)
  122. chunk.SData = hash
  123. localStore.Put(chunk)
  124. chunk.WaitToStore()
  125. err = tester.TestExchanges(p2ptest.Exchange{
  126. Label: "RetrieveRequestMsg",
  127. Triggers: []p2ptest.Trigger{
  128. {
  129. Code: 5,
  130. Msg: &RetrieveRequestMsg{
  131. Addr: hash,
  132. },
  133. Peer: peerID,
  134. },
  135. },
  136. Expects: []p2ptest.Expect{
  137. {
  138. Code: 1,
  139. Msg: &OfferedHashesMsg{
  140. HandoverProof: &HandoverProof{
  141. Handover: &Handover{},
  142. },
  143. Hashes: hash,
  144. From: 0,
  145. // TODO: why is this 32???
  146. To: 32,
  147. Stream: stream,
  148. },
  149. Peer: peerID,
  150. },
  151. },
  152. })
  153. if err != nil {
  154. t.Fatal(err)
  155. }
  156. hash = storage.Address(hash1[:])
  157. chunk = storage.NewChunk(hash, nil)
  158. chunk.SData = hash1[:]
  159. localStore.Put(chunk)
  160. chunk.WaitToStore()
  161. err = tester.TestExchanges(p2ptest.Exchange{
  162. Label: "RetrieveRequestMsg",
  163. Triggers: []p2ptest.Trigger{
  164. {
  165. Code: 5,
  166. Msg: &RetrieveRequestMsg{
  167. Addr: hash,
  168. SkipCheck: true,
  169. },
  170. Peer: peerID,
  171. },
  172. },
  173. Expects: []p2ptest.Expect{
  174. {
  175. Code: 6,
  176. Msg: &ChunkDeliveryMsg{
  177. Addr: hash,
  178. SData: hash,
  179. },
  180. Peer: peerID,
  181. },
  182. },
  183. })
  184. if err != nil {
  185. t.Fatal(err)
  186. }
  187. }
  188. func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
  189. tester, streamer, localStore, teardown, err := newStreamerTester(t)
  190. defer teardown()
  191. if err != nil {
  192. t.Fatal(err)
  193. }
  194. streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
  195. return &testClient{
  196. t: t,
  197. }, nil
  198. })
  199. peerID := tester.IDs[0]
  200. stream := NewStream("foo", "", true)
  201. err = streamer.Subscribe(peerID, stream, NewRange(5, 8), Top)
  202. if err != nil {
  203. t.Fatalf("Expected no error, got %v", err)
  204. }
  205. chunkKey := hash0[:]
  206. chunkData := hash1[:]
  207. chunk, created := localStore.GetOrCreateRequest(chunkKey)
  208. if !created {
  209. t.Fatal("chunk already exists")
  210. }
  211. select {
  212. case <-chunk.ReqC:
  213. t.Fatal("chunk is already received")
  214. default:
  215. }
  216. err = tester.TestExchanges(p2ptest.Exchange{
  217. Label: "Subscribe message",
  218. Expects: []p2ptest.Expect{
  219. {
  220. Code: 4,
  221. Msg: &SubscribeMsg{
  222. Stream: stream,
  223. History: NewRange(5, 8),
  224. Priority: Top,
  225. },
  226. Peer: peerID,
  227. },
  228. },
  229. },
  230. p2ptest.Exchange{
  231. Label: "ChunkDeliveryRequest message",
  232. Triggers: []p2ptest.Trigger{
  233. {
  234. Code: 6,
  235. Msg: &ChunkDeliveryMsg{
  236. Addr: chunkKey,
  237. SData: chunkData,
  238. },
  239. Peer: peerID,
  240. },
  241. },
  242. })
  243. if err != nil {
  244. t.Fatalf("Expected no error, got %v", err)
  245. }
  246. timeout := time.NewTimer(1 * time.Second)
  247. select {
  248. case <-timeout.C:
  249. t.Fatal("timeout receiving chunk")
  250. case <-chunk.ReqC:
  251. }
  252. storedChunk, err := localStore.Get(chunkKey)
  253. if err != nil {
  254. t.Fatalf("Expected no error, got %v", err)
  255. }
  256. if !bytes.Equal(storedChunk.SData, chunkData) {
  257. t.Fatal("Retrieved chunk has different data than original")
  258. }
  259. }
  260. func TestDeliveryFromNodes(t *testing.T) {
  261. testDeliveryFromNodes(t, 2, 1, dataChunkCount, true)
  262. testDeliveryFromNodes(t, 2, 1, dataChunkCount, false)
  263. testDeliveryFromNodes(t, 4, 1, dataChunkCount, true)
  264. testDeliveryFromNodes(t, 4, 1, dataChunkCount, false)
  265. testDeliveryFromNodes(t, 8, 1, dataChunkCount, true)
  266. testDeliveryFromNodes(t, 8, 1, dataChunkCount, false)
  267. testDeliveryFromNodes(t, 16, 1, dataChunkCount, true)
  268. testDeliveryFromNodes(t, 16, 1, dataChunkCount, false)
  269. }
  // testDeliveryFromNodes runs one delivery simulation and fails t if it does
// not succeed.
//
// A random file of chunkCount*chunkSize bytes is stored through a file store
// built on the stores of every simulated node except the first. The first
// node (the "pivot", sim.IDs[0]) then reads the file through a NetStore whose
// retrieve function calls Delivery.RequestFromPeers, and the simulation
// succeeds once the pivot reports having read all bytes of the file.
//
// Parameters:
//   - nodes: number of simulated nodes
//   - conns: connection level passed to the simulation config
//   - chunkCount: number of chunks in the uploaded file
//   - skipCheck: value assigned to defaultSkipCheck and passed to
//     RequestFromPeers
//
// The function overwrites several package-level variables (defaultSkipCheck,
// toAddr, createStoreFunc, stores, registries, deliveries, peerCount,
// waitPeerErrC).
// NOTE(review): because of that shared state it is presumably not safe to run
// in parallel with other tests of this package - confirm.
func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck bool) {
	defaultSkipCheck = skipCheck
	toAddr = network.NewAddrFromNodeID
	createStoreFunc = createTestLocalStorageFromSim
	conf := &streamTesting.RunConfig{
		Adapter:         *adapter,
		NodeCount:       nodes,
		ConnLevel:       conns,
		ToAddr:          toAddr,
		Services:        services,
		EnableMsgEvents: false,
	}

	sim, teardown, err := streamTesting.NewSimulation(conf)
	// rpcSubscriptionsWg counts the goroutines started in action that wait for
	// the disconnection watchers to finish; teardown only runs after all of
	// them are done.
	var rpcSubscriptionsWg sync.WaitGroup
	defer func() {
		rpcSubscriptionsWg.Wait()
		teardown()
	}()
	if err != nil {
		t.Fatal(err.Error())
	}
	// expose the simulation's stores by node id for createTestLocalStorageFromSim
	stores = make(map[discover.NodeID]storage.ChunkStore)
	for i, id := range sim.IDs {
		stores[id] = sim.Stores[i]
	}
	registries = make(map[discover.NodeID]*TestRegistry)
	deliveries = make(map[discover.NodeID]*Delivery)

	// the first and the last node in sim.IDs have one peer, all others two
	peerCount = func(id discover.NodeID) int {
		if sim.IDs[0] == id || sim.IDs[nodes-1] == id {
			return 1
		}
		return 2
	}

	// here we distribute the chunks of a random file into the stores of all
	// nodes except the pivot (sim.Stores[1:])
	rrFileStore := storage.NewFileStore(newRoundRobinStore(sim.Stores[1:]...), storage.NewFileStoreParams())
	size := chunkCount * chunkSize
	ctx := context.TODO()
	fileHash, wait, err := rrFileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false)
	if err != nil {
		t.Fatal(err.Error())
	}
	// wait until all chunks are stored
	err = wait(ctx)
	if err != nil {
		t.Fatal(err.Error())
	}
	// errc carries disconnection errors and the retrieval error to check;
	// it is buffered so the retrieval goroutine can report without blocking
	errc := make(chan error, 1)
	waitPeerErrC = make(chan error)
	quitC := make(chan struct{})
	defer close(quitC)

	action := func(ctx context.Context) error {
		// each node subscribes to its upstream neighbour's swarmChunkServerStreamName
		// stream; we need to wait until an asynchronous process registers the
		// peers in streamer.peers, which is used by Subscribe.
		// waitPeerErrC is a global error channel shared between this action and
		// the node service; one nil value is expected per node.
		i := 0
		for err := range waitPeerErrC {
			if err != nil {
				return fmt.Errorf("error waiting for peers: %s", err)
			}
			i++
			if i == nodes {
				break
			}
		}

		// every node except the last one in the chain subscribes to the swarm
		// chunk server stream of the next node (sim.IDs[j+1]), which responds
		// to chunk retrieve requests
		for j := 0; j < nodes-1; j++ {
			id := sim.IDs[j]
			err := sim.CallClient(id, func(client *rpc.Client) error {
				// disconnections of this node are reported on errc
				doneC, err := streamTesting.WatchDisconnections(id, client, errc, quitC)
				if err != nil {
					return err
				}
				rpcSubscriptionsWg.Add(1)
				go func() {
					<-doneC
					rpcSubscriptionsWg.Done()
				}()
				ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
				defer cancel()
				sid := sim.IDs[j+1]
				return client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream(swarmChunkServerStreamName, "", false), NewRange(0, 0), Top)
			})
			if err != nil {
				return err
			}
		}
		// create a retriever FileStore for the pivot node
		delivery := deliveries[sim.IDs[0]]
		retrieveFunc := func(chunk *storage.Chunk) error {
			return delivery.RequestFromPeers(chunk.Addr[:], skipCheck)
		}
		netStore := storage.NewNetStore(sim.Stores[0].(*storage.LocalStore), retrieveFunc)
		fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())

		go func() {
			// start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
			// we must wait for the peer connections to have started before requesting
			n, err := readAll(fileStore, fileHash)
			log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
			if err != nil {
				errc <- fmt.Errorf("requesting chunks action error: %v", err)
			}
		}()
		return nil
	}
	// check reports whether node id can read the complete file. A pending
	// error on errc or a done context aborts the check with that error; an
	// RPC failure or a short read only means "not yet" (false, nil).
	check := func(ctx context.Context, id discover.NodeID) (bool, error) {
		select {
		case err := <-errc:
			return false, err
		case <-ctx.Done():
			return false, ctx.Err()
		default:
		}
		var total int64
		err := sim.CallClient(id, func(client *rpc.Client) error {
			ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
			defer cancel()
			return client.CallContext(ctx, &total, "stream_readAll", common.BytesToHash(fileHash))
		})
		log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
		if err != nil || total != int64(size) {
			return false, nil
		}
		return true, nil
	}

	conf.Step = &simulations.Step{
		Action: action,
		// NOTE(review): presumably triggers the check for the pivot node every
		// 10ms until quitC is closed - confirm against streamTesting.Trigger
		Trigger: streamTesting.Trigger(10*time.Millisecond, quitC, sim.IDs[0]),
		// we are only testing the pivot node (net.Nodes[0])
		Expect: &simulations.Expectation{
			Nodes: sim.IDs[0:1],
			Check: check,
		},
	}
	startedAt := time.Now()
	// the whole simulation run is bounded by this timeout
	timeout := 300 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	result, err := sim.Run(ctx, conf)
	finishedAt := time.Now()
	if err != nil {
		t.Fatalf("Setting up simulation failed: %v", err)
	}
	if result.Error != nil {
		t.Fatalf("Simulation failed: %s", result.Error)
	}
	streamTesting.CheckResult(t, result, startedAt, finishedAt)
}
  419. func BenchmarkDeliveryFromNodesWithoutCheck(b *testing.B) {
  420. for chunks := 32; chunks <= 128; chunks *= 2 {
  421. for i := 2; i < 32; i *= 2 {
  422. b.Run(
  423. fmt.Sprintf("nodes=%v,chunks=%v", i, chunks),
  424. func(b *testing.B) {
  425. benchmarkDeliveryFromNodes(b, i, 1, chunks, true)
  426. },
  427. )
  428. }
  429. }
  430. }
  431. func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) {
  432. for chunks := 32; chunks <= 128; chunks *= 2 {
  433. for i := 2; i < 32; i *= 2 {
  434. b.Run(
  435. fmt.Sprintf("nodes=%v,chunks=%v", i, chunks),
  436. func(b *testing.B) {
  437. benchmarkDeliveryFromNodes(b, i, 1, chunks, false)
  438. },
  439. )
  440. }
  441. }
  442. }
  // benchmarkDeliveryFromNodes measures how long the pivot node (sim.IDs[0])
// takes to retrieve chunkCount chunks that were uploaded to the last node
// (sim.Stores[nodes-1]).
//
// The simulation runs in a background goroutine. Its action sets up the
// stream subscriptions and reports on initC; its check is only triggered once
// the benchmark loop is over. In every iteration the chunks are uploaded with
// the timer stopped, and only the concurrent netStore.Get calls are timed.
// The benchmark fails if a disconnection error is received, if any chunk
// could not be retrieved, or if sim.Run returns an error.
//
// Parameters:
//   - nodes: number of simulated nodes
//   - conns: connection level passed to the simulation config
//   - chunkCount: number of chunks uploaded and retrieved per iteration
//   - skipCheck: value assigned to defaultSkipCheck and passed to
//     RequestFromPeers
//
// The function overwrites several package-level variables (defaultSkipCheck,
// toAddr, createStoreFunc, registries, stores, deliveries, peerCount,
// waitPeerErrC).
func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skipCheck bool) {
	defaultSkipCheck = skipCheck
	toAddr = network.NewAddrFromNodeID
	createStoreFunc = createTestLocalStorageFromSim
	registries = make(map[discover.NodeID]*TestRegistry)

	// the whole simulation run is bounded by this timeout
	timeout := 300 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	conf := &streamTesting.RunConfig{
		Adapter:         *adapter,
		NodeCount:       nodes,
		ConnLevel:       conns,
		ToAddr:          toAddr,
		Services:        services,
		EnableMsgEvents: false,
	}
	sim, teardown, err := streamTesting.NewSimulation(conf)
	// rpcSubscriptionsWg counts the goroutines started in action that wait for
	// the disconnection watchers to finish; teardown only runs after all of
	// them are done.
	var rpcSubscriptionsWg sync.WaitGroup
	defer func() {
		rpcSubscriptionsWg.Wait()
		teardown()
	}()
	if err != nil {
		b.Fatal(err.Error())
	}

	// expose the simulation's stores by node id for createTestLocalStorageFromSim
	stores = make(map[discover.NodeID]storage.ChunkStore)
	deliveries = make(map[discover.NodeID]*Delivery)
	for i, id := range sim.IDs {
		stores[id] = sim.Stores[i]
	}

	// the first and the last node in sim.IDs have one peer, all others two
	peerCount = func(id discover.NodeID) int {
		if sim.IDs[0] == id || sim.IDs[nodes-1] == id {
			return 1
		}
		return 2
	}
	// wait channel for the peer connections of all nodes to be set up
	waitPeerErrC = make(chan error)

	// create a FileStore for the last node in the chain, which we are going to write to
	remoteFileStore := storage.NewFileStore(sim.Stores[nodes-1], storage.NewFileStoreParams())

	// disconnectC reports node disconnections, quitC is closed when sim.Run
	// returns, and initC signals that the action call has completed the
	// simulation initialisation
	disconnectC := make(chan error)
	quitC := make(chan struct{})

	initC := make(chan error)

	action := func(ctx context.Context) error {
		// each node subscribes to its upstream neighbour's swarmChunkServerStreamName
		// stream; we need to wait until an asynchronous process registers the
		// peers in streamer.peers, which is used by Subscribe.
		// waitPeerErrC is a global error channel shared between this action and
		// the node service; one nil value is expected per node.
		i := 0
		for err := range waitPeerErrC {
			if err != nil {
				return fmt.Errorf("error waiting for peers: %s", err)
			}
			i++
			if i == nodes {
				break
			}
		}
		var err error
		// each node except the last one subscribes to the upstream swarm chunk server stream
		// which responds to chunk retrieve requests
		for j := 0; j < nodes-1; j++ {
			id := sim.IDs[j]
			err = sim.CallClient(id, func(client *rpc.Client) error {
				// disconnections of this node are reported on disconnectC
				doneC, err := streamTesting.WatchDisconnections(id, client, disconnectC, quitC)
				if err != nil {
					return err
				}
				rpcSubscriptionsWg.Add(1)
				go func() {
					<-doneC
					rpcSubscriptionsWg.Done()
				}()
				ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
				defer cancel()
				sid := sim.IDs[j+1] // the upstream peer's id
				return client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream(swarmChunkServerStreamName, "", false), NewRange(0, 0), Top)
			})
			if err != nil {
				break
			}
		}
		// hand the subscription result (nil on success) to the benchmark
		// goroutine; the action itself always returns nil once peers are up
		initC <- err
		return nil
	}

	// the check function is only triggered when the benchmark finishes
	trigger := make(chan discover.NodeID)
	// the check always succeeds: retrieval is verified in the benchmark loop
	check := func(ctx context.Context, id discover.NodeID) (_ bool, err error) {
		return true, nil
	}

	conf.Step = &simulations.Step{
		Action:  action,
		Trigger: trigger,
		// we are only testing the pivot node (net.Nodes[0])
		Expect: &simulations.Expectation{
			Nodes: sim.IDs[0:1],
			Check: check,
		},
	}

	// run the simulation in the background
	errc := make(chan error)
	go func() {
		_, err := sim.Run(ctx, conf)
		close(quitC)
		errc <- err
	}()

	// wait for simulation action to complete stream subscriptions
	err = <-initC
	if err != nil {
		b.Fatalf("simulation failed to initialise. expected no error. got %v", err)
	}

	// create a retriever FileStore for the pivot node
	// by now deliveries are set for each node by the streamer service
	delivery := deliveries[sim.IDs[0]]
	retrieveFunc := func(chunk *storage.Chunk) error {
		return delivery.RequestFromPeers(chunk.Addr[:], skipCheck)
	}
	netStore := storage.NewNetStore(sim.Stores[0].(*storage.LocalStore), retrieveFunc)

	// benchmark loop
	// the timer is reset and kept stopped so that only the retrieval phase of
	// each iteration is measured
	b.ResetTimer()
	b.StopTimer()
Loop:
	for i := 0; i < b.N; i++ {
		// uploading chunkCount random chunks to the last node
		hashes := make([]storage.Address, chunkCount)
		for i := 0; i < chunkCount; i++ {
			// create actual size real chunks
			ctx := context.TODO()
			hash, wait, err := remoteFileStore.Store(ctx, io.LimitReader(crand.Reader, int64(chunkSize)), int64(chunkSize), false)
			if err != nil {
				b.Fatalf("expected no error. got %v", err)
			}
			// wait until all chunks stored
			err = wait(ctx)
			if err != nil {
				b.Fatalf("expected no error. got %v", err)
			}
			// collect the hashes
			hashes[i] = hash
		}
		// now benchmark the actual retrieval
		// netstore.Get is called for each hash in a goroutine and errors are collected
		b.StartTimer()
		errs := make(chan error)
		for _, hash := range hashes {
			go func(h storage.Address) {
				_, err := netStore.Get(h)
				log.Warn("test check netstore get", "hash", h, "err", err)
				errs <- err
			}(hash)
		}
		// count and report retrieval errors
		// if there are misses then chunk timeout is too low for the distance and volume (?)
		var total, misses int
		for err := range errs {
			if err != nil {
				log.Warn(err.Error())
				misses++
			}
			total++
			// errs is never closed; stop after one result per requested chunk
			if total == chunkCount {
				break
			}
		}
		b.StopTimer()

		// non-blocking check for a disconnection reported during this iteration
		select {
		case err = <-disconnectC:
			if err != nil {
				break Loop
			}
		default:
		}

		if misses > 0 {
			err = fmt.Errorf("%v chunk not found out of %v", misses, total)
			break Loop
		}
	}

	// benchmark over, trigger the check function to conclude the simulation
	// (unless sim.Run has already returned and closed quitC)
	select {
	case <-quitC:
	case trigger <- sim.IDs[0]:
	}
	// always collect the result of sim.Run; if the loop already produced an
	// error, a sim.Run error is reported in addition to it
	if err == nil {
		err = <-errc
	} else {
		if e := <-errc; e != nil {
			b.Errorf("sim.Run function error: %v", e)
		}
	}

	// fail on an error from the benchmark loop or, if there was none, from sim.Run
	if err != nil {
		b.Fatalf("expected no error. got %v", err)
	}
}
  638. func createTestLocalStorageFromSim(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, error) {
  639. return stores[id], nil
  640. }