delivery_test.go

// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package stream

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"os"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/ethereum/go-ethereum/node"
	"github.com/ethereum/go-ethereum/p2p"
	"github.com/ethereum/go-ethereum/p2p/enode"
	"github.com/ethereum/go-ethereum/p2p/protocols"
	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
	p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
	"github.com/ethereum/go-ethereum/swarm/log"
	"github.com/ethereum/go-ethereum/swarm/network"
	pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue"
	"github.com/ethereum/go-ethereum/swarm/network/simulation"
	"github.com/ethereum/go-ethereum/swarm/state"
	"github.com/ethereum/go-ethereum/swarm/storage"
	"github.com/ethereum/go-ethereum/swarm/testutil"
)

// Tests initializing a retrieve request
func TestStreamerRetrieveRequest(t *testing.T) {
	regOpts := &RegistryOptions{
		Retrieval: RetrievalClientOnly,
		Syncing:   SyncingDisabled,
	}
	tester, streamer, _, teardown, err := newStreamerTester(regOpts)
	defer teardown()
	if err != nil {
		t.Fatal(err)
	}

	node := tester.Nodes[0]

	ctx := context.Background()
	req := network.NewRequest(
		storage.Address(hash0[:]),
		true,
		&sync.Map{},
	)
	streamer.delivery.RequestFromPeers(ctx, req)

	stream := NewStream(swarmChunkServerStreamName, "", true)

	err = tester.TestExchanges(p2ptest.Exchange{
		Label: "RetrieveRequestMsg",
		Expects: []p2ptest.Expect{
			{ // start expecting a subscription for RETRIEVE_REQUEST due to `RetrievalClientOnly`
				Code: 4,
				Msg: &SubscribeMsg{
					Stream:   stream,
					History:  nil,
					Priority: Top,
				},
				Peer: node.ID(),
			},
			{ // expect a retrieve request message for the given hash
				Code: 5,
				Msg: &RetrieveRequestMsg{
					Addr:      hash0[:],
					SkipCheck: true,
				},
				Peer: node.ID(),
			},
		},
	})

	if err != nil {
		t.Fatalf("Expected no error, got %v", err)
	}
}

// Test requesting a chunk from a peer, then issuing an "empty" OfferedHashesMsg (no hashes available yet).
// Should time out, as the peer does not have the chunk (no syncing happened previously).
func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
	tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{
		Retrieval: RetrievalEnabled,
		Syncing:   SyncingDisabled, // do not sync
	})
	defer teardown()
	if err != nil {
		t.Fatal(err)
	}

	node := tester.Nodes[0]

	chunk := storage.NewChunk(storage.Address(hash0[:]), nil)

	peer := streamer.getPeer(node.ID())

	stream := NewStream(swarmChunkServerStreamName, "", true)
	// simulate pre-subscription to RETRIEVE_REQUEST stream on peer
	peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{
		Stream:   stream,
		History:  nil,
		Priority: Top,
	})

	// test the exchange
	err = tester.TestExchanges(p2ptest.Exchange{
		Expects: []p2ptest.Expect{
			{ // first expect a subscription to the RETRIEVE_REQUEST stream
				Code: 4,
				Msg: &SubscribeMsg{
					Stream:   stream,
					History:  nil,
					Priority: Top,
				},
				Peer: node.ID(),
			},
		},
	}, p2ptest.Exchange{
		Label: "RetrieveRequestMsg",
		Triggers: []p2ptest.Trigger{
			{ // then the actual RETRIEVE_REQUEST...
				Code: 5,
				Msg: &RetrieveRequestMsg{
					Addr: chunk.Address()[:],
				},
				Peer: node.ID(),
			},
		},
		Expects: []p2ptest.Expect{
			{ // ...to which the peer responds with offered hashes
				Code: 1,
				Msg: &OfferedHashesMsg{
					HandoverProof: nil,
					Hashes:        nil,
					From:          0,
					To:            0,
				},
				Peer: node.ID(),
			},
		},
	})

	// should fail with a timeout, as the peer we are requesting
	// the chunk from does not have the chunk
	expectedError := `exchange #1 "RetrieveRequestMsg": timed out`
	if err == nil || err.Error() != expectedError {
		t.Fatalf("Expected error %v, got %v", expectedError, err)
	}
}

// The upstream request server receives a retrieve request and responds with
// offered hashes, or with direct delivery if SkipCheck is set to true.
func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
	tester, streamer, localStore, teardown, err := newStreamerTester(&RegistryOptions{
		Retrieval: RetrievalEnabled,
		Syncing:   SyncingDisabled,
	})
	defer teardown()
	if err != nil {
		t.Fatal(err)
	}

	node := tester.Nodes[0]

	peer := streamer.getPeer(node.ID())

	stream := NewStream(swarmChunkServerStreamName, "", true)

	peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{
		Stream:   stream,
		History:  nil,
		Priority: Top,
	})

	hash := storage.Address(hash0[:])
	chunk := storage.NewChunk(hash, hash)
	err = localStore.Put(context.TODO(), chunk)
	if err != nil {
		t.Fatalf("Expected no err got %v", err)
	}

	err = tester.TestExchanges(p2ptest.Exchange{
		Expects: []p2ptest.Expect{
			{
				Code: 4,
				Msg: &SubscribeMsg{
					Stream:   stream,
					History:  nil,
					Priority: Top,
				},
				Peer: node.ID(),
			},
		},
	}, p2ptest.Exchange{
		Label: "RetrieveRequestMsg",
		Triggers: []p2ptest.Trigger{
			{
				Code: 5,
				Msg: &RetrieveRequestMsg{
					Addr: hash,
				},
				Peer: node.ID(),
			},
		},
		Expects: []p2ptest.Expect{
			{
				Code: 1,
				Msg: &OfferedHashesMsg{
					HandoverProof: &HandoverProof{
						Handover: &Handover{},
					},
					Hashes: hash,
					From:   0,
					// TODO: why is this 32???
					To:     32,
					Stream: stream,
				},
				Peer: node.ID(),
			},
		},
	})

	if err != nil {
		t.Fatal(err)
	}

	hash = storage.Address(hash1[:])
	chunk = storage.NewChunk(hash, hash1[:])
	err = localStore.Put(context.TODO(), chunk)
	if err != nil {
		t.Fatalf("Expected no err got %v", err)
	}

	err = tester.TestExchanges(p2ptest.Exchange{
		Label: "RetrieveRequestMsg",
		Triggers: []p2ptest.Trigger{
			{
				Code: 5,
				Msg: &RetrieveRequestMsg{
					Addr:      hash,
					SkipCheck: true,
				},
				Peer: node.ID(),
			},
		},
		Expects: []p2ptest.Expect{
			{
				Code: 6,
				Msg: &ChunkDeliveryMsg{
					Addr:  hash,
					SData: hash,
				},
				Peer: node.ID(),
			},
		},
	})

	if err != nil {
		t.Fatal(err)
	}
}

// if there is one peer in the Kademlia, RequestFromPeers should return it
func TestRequestFromPeers(t *testing.T) {
	dummyPeerID := enode.HexID("3431c3939e1ee2a6345e976a8234f9870152d64879f30bc272a074f6859e75e8")

	addr := network.RandomAddr()
	to := network.NewKademlia(addr.OAddr, network.NewKadParams())
	delivery := NewDelivery(to, nil)
	protocolsPeer := protocols.NewPeer(p2p.NewPeer(dummyPeerID, "dummy", []p2p.Cap{{Name: "stream"}}), nil, nil)
	peer := network.NewPeer(&network.BzzPeer{
		BzzAddr:   network.RandomAddr(),
		LightNode: false,
		Peer:      protocolsPeer,
	}, to)
	to.On(peer)
	r := NewRegistry(addr.ID(), delivery, nil, nil, nil, nil)

	// an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished
	sp := &Peer{
		Peer:     protocolsPeer,
		pq:       pq.New(int(PriorityQueue), PriorityQueueCap),
		streamer: r,
	}
	r.setPeer(sp)
	req := network.NewRequest(
		storage.Address(hash0[:]),
		true,
		&sync.Map{},
	)
	ctx := context.Background()
	id, _, err := delivery.RequestFromPeers(ctx, req)

	if err != nil {
		t.Fatal(err)
	}
	if *id != dummyPeerID {
		t.Fatalf("Expected peer id %v, got %v", dummyPeerID, id)
	}
}

// RequestFromPeers should not return light nodes
func TestRequestFromPeersWithLightNode(t *testing.T) {
	dummyPeerID := enode.HexID("3431c3939e1ee2a6345e976a8234f9870152d64879f30bc272a074f6859e75e8")

	addr := network.RandomAddr()
	to := network.NewKademlia(addr.OAddr, network.NewKadParams())
	delivery := NewDelivery(to, nil)

	protocolsPeer := protocols.NewPeer(p2p.NewPeer(dummyPeerID, "dummy", nil), nil, nil)
	// setting up a lightnode
	peer := network.NewPeer(&network.BzzPeer{
		BzzAddr:   network.RandomAddr(),
		LightNode: true,
		Peer:      protocolsPeer,
	}, to)
	to.On(peer)
	r := NewRegistry(addr.ID(), delivery, nil, nil, nil, nil)
	// an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished
	sp := &Peer{
		Peer:     protocolsPeer,
		pq:       pq.New(int(PriorityQueue), PriorityQueueCap),
		streamer: r,
	}
	r.setPeer(sp)

	req := network.NewRequest(
		storage.Address(hash0[:]),
		true,
		&sync.Map{},
	)

	ctx := context.Background()
	// making a request which should return with "no peer found"
	_, _, err := delivery.RequestFromPeers(ctx, req)

	expectedError := "no peer found"
	if err == nil || err.Error() != expectedError {
		t.Fatalf("expected '%v', got %v", expectedError, err)
	}
}
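
// TestStreamerDownstreamChunkDeliveryMsgExchange subscribes to a custom "foo"
// stream, triggers a ChunkDeliveryMsg from the peer, and verifies that the
// delivered chunk ends up in the local store.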
func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
	tester, streamer, localStore, teardown, err := newStreamerTester(&RegistryOptions{
		Retrieval: RetrievalDisabled,
		Syncing:   SyncingDisabled,
	})
	defer teardown()
	if err != nil {
		t.Fatal(err)
	}

	streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
		return &testClient{
			t: t,
		}, nil
	})

	node := tester.Nodes[0]

	// subscribe to custom stream
	stream := NewStream("foo", "", true)
	err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top)
	if err != nil {
		t.Fatalf("Expected no error, got %v", err)
	}

	chunkKey := hash0[:]
	chunkData := hash1[:]

	err = tester.TestExchanges(p2ptest.Exchange{
		Label: "Subscribe message",
		Expects: []p2ptest.Expect{
			{ // first expect subscription to the custom stream...
				Code: 4,
				Msg: &SubscribeMsg{
					Stream:   stream,
					History:  NewRange(5, 8),
					Priority: Top,
				},
				Peer: node.ID(),
			},
		},
	}, p2ptest.Exchange{
		Label: "ChunkDelivery message",
		Triggers: []p2ptest.Trigger{
			{ // ...then trigger a chunk delivery for the given chunk from peer in order for
				// the local node to get the chunk delivered
				Code: 6,
				Msg: &ChunkDeliveryMsg{
					Addr:  chunkKey,
					SData: chunkData,
				},
				Peer: node.ID(),
			},
		},
	})

	if err != nil {
		t.Fatalf("Expected no error, got %v", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// wait for the chunk to get stored
	storedChunk, err := localStore.Get(ctx, chunkKey)
	for err != nil {
		select {
		case <-ctx.Done():
			t.Fatalf("Chunk is not in localstore after timeout, err: %v", err)
		default:
		}
		storedChunk, err = localStore.Get(ctx, chunkKey)
		time.Sleep(50 * time.Millisecond)
	}

	if err != nil {
		t.Fatalf("Expected no error, got %v", err)
	}

	if !bytes.Equal(storedChunk.Data(), chunkData) {
		t.Fatal("Retrieved chunk has different data than original")
	}
}
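
// TestDeliveryFromNodes runs the delivery simulation for 2, 4, 8 and 16 nodes,
// each time both with and without SkipCheck (direct delivery instead of the
// offered-hashes round trip).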
func TestDeliveryFromNodes(t *testing.T) {
	testDeliveryFromNodes(t, 2, dataChunkCount, true)
	testDeliveryFromNodes(t, 2, dataChunkCount, false)
	testDeliveryFromNodes(t, 4, dataChunkCount, true)
	testDeliveryFromNodes(t, 4, dataChunkCount, false)
	testDeliveryFromNodes(t, 8, dataChunkCount, true)
	testDeliveryFromNodes(t, 8, dataChunkCount, false)
	testDeliveryFromNodes(t, 16, dataChunkCount, true)
	testDeliveryFromNodes(t, 16, dataChunkCount, false)
}
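
// testDeliveryFromNodes spins up a chain of simulated nodes, uploads a random
// file whose chunks are spread round-robin over the stores of all nodes except
// the pivot, and then checks that the pivot node can read the whole file back
// via its root hash, i.e. that the missing chunks are delivered to it.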
func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) {
	sim := simulation.New(map[string]simulation.ServiceFunc{
		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
			node := ctx.Config.Node()
			addr := network.NewAddr(node)
			store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
			if err != nil {
				return nil, nil, err
			}
			bucket.Store(bucketKeyStore, store)
			cleanup = func() {
				os.RemoveAll(datadir)
				store.Close()
			}
			localStore := store.(*storage.LocalStore)
			netStore, err := storage.NewNetStore(localStore, nil)
			if err != nil {
				return nil, nil, err
			}

			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
			delivery := NewDelivery(kad, netStore)
			netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New

			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
				SkipCheck: skipCheck,
				Syncing:   SyncingDisabled,
				Retrieval: RetrievalEnabled,
			}, nil)
			bucket.Store(bucketKeyRegistry, r)

			fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
			bucket.Store(bucketKeyFileStore, fileStore)

			return r, cleanup, nil
		},
	})
	defer sim.Close()

	log.Info("Adding nodes to simulation")
	_, err := sim.AddNodesAndConnectChain(nodes)
	if err != nil {
		t.Fatal(err)
	}

	log.Info("Starting simulation")
	ctx := context.Background()
	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
		nodeIDs := sim.UpNodeIDs()
		// determine the pivot node to be the first node of the simulation
		pivot := nodeIDs[0]

		// distribute chunks of a random file into Stores of nodes 1 to nodes
		// we will do this by creating a file store with an underlying round-robin store:
		// the file store will create a hash for the uploaded file, but every chunk will be
		// distributed to different nodes via round-robin scheduling
		log.Debug("Writing file to round-robin file store")
		// to do this, we create an array for chunkstores (length minus one, the pivot node)
		stores := make([]storage.ChunkStore, len(nodeIDs)-1)
		// we then need to get all stores from the sim...
		lStores := sim.NodesItems(bucketKeyStore)
		i := 0
		// ...iterate the buckets...
		for id, bucketVal := range lStores {
			// ...and remove the one which is the pivot node
			if id == pivot {
				continue
			}
			// the other ones are added to the array...
			stores[i] = bucketVal.(storage.ChunkStore)
			i++
		}
		// ...which then gets passed to the round-robin file store
		roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams())
		// now we can actually upload a (random) file to the round-robin store
		size := chunkCount * chunkSize
		log.Debug("Storing data to file store")
		fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
		// wait until all chunks stored
		if err != nil {
			return err
		}
		err = wait(ctx)
		if err != nil {
			return err
		}

		log.Debug("Waiting for kademlia")
		// TODO this does not seem to be correct usage of the function, as the simulation may have no kademlias
		if _, err := sim.WaitTillHealthy(ctx); err != nil {
			return err
		}

		// get the pivot node's filestore
		item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
		if !ok {
			return fmt.Errorf("No filestore")
		}
		pivotFileStore := item.(*storage.FileStore)
		log.Debug("Starting retrieval routine")
		retErrC := make(chan error)
		go func() {
			// start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
			// we must wait for the peer connections to have started before requesting
			n, err := readAll(pivotFileStore, fileHash)
			log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
			retErrC <- err
		}()

		log.Debug("Watching for disconnections")
		disconnections := sim.PeerEvents(
			context.Background(),
			sim.NodeIDs(),
			simulation.NewPeerEventsFilter().Drop(),
		)

		var disconnected atomic.Value
		go func() {
			for d := range disconnections {
				if d.Error != nil {
					log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
					disconnected.Store(true)
				}
			}
		}()
		defer func() {
			if err != nil {
				if yes, ok := disconnected.Load().(bool); ok && yes {
					err = errors.New("disconnect events received")
				}
			}
		}()

		// finally check that the pivot node gets all chunks via the root hash
		log.Debug("Check retrieval")
		success := true
		var total int64
		total, err = readAll(pivotFileStore, fileHash)
		if err != nil {
			return err
		}
		log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
		if err != nil || total != int64(size) {
			success = false
		}

		if !success {
			return fmt.Errorf("Test failed, chunks not available on all nodes")
		}
		if err := <-retErrC; err != nil {
			t.Fatalf("requesting chunks: %v", err)
		}
		log.Debug("Test terminated successfully")
		return nil
	})
	if result.Error != nil {
		t.Fatal(result.Error)
	}
}
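
// BenchmarkDeliveryFromNodesWithoutCheck measures chunk retrieval with
// SkipCheck enabled, over 2-16 nodes and 32-128 chunks.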
func BenchmarkDeliveryFromNodesWithoutCheck(b *testing.B) {
	for chunks := 32; chunks <= 128; chunks *= 2 {
		for i := 2; i < 32; i *= 2 {
			b.Run(
				fmt.Sprintf("nodes=%v,chunks=%v", i, chunks),
				func(b *testing.B) {
					benchmarkDeliveryFromNodes(b, i, chunks, true)
				},
			)
		}
	}
}
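
// BenchmarkDeliveryFromNodesWithCheck runs the same node/chunk matrix, but
// with SkipCheck disabled, so every delivery is preceded by the offered-hashes
// exchange.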
func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) {
	for chunks := 32; chunks <= 128; chunks *= 2 {
		for i := 2; i < 32; i *= 2 {
			b.Run(
				fmt.Sprintf("nodes=%v,chunks=%v", i, chunks),
				func(b *testing.B) {
					benchmarkDeliveryFromNodes(b, i, chunks, false)
				},
			)
		}
	}
}
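
// benchmarkDeliveryFromNodes uploads chunkCount random chunks to the last node
// of the chain with the timer stopped, then times concurrent netStore.Get
// calls for all hashes on the pivot node.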
func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck bool) {
	sim := simulation.New(map[string]simulation.ServiceFunc{
		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
			node := ctx.Config.Node()
			addr := network.NewAddr(node)
			store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
			if err != nil {
				return nil, nil, err
			}
			bucket.Store(bucketKeyStore, store)
			cleanup = func() {
				os.RemoveAll(datadir)
				store.Close()
			}
			localStore := store.(*storage.LocalStore)
			netStore, err := storage.NewNetStore(localStore, nil)
			if err != nil {
				return nil, nil, err
			}
			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
			delivery := NewDelivery(kad, netStore)
			netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New

			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
				SkipCheck:       skipCheck,
				Syncing:         SyncingDisabled,
				Retrieval:       RetrievalDisabled,
				SyncUpdateDelay: 0,
			}, nil)

			fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
			bucket.Store(bucketKeyFileStore, fileStore)

			return r, cleanup, nil
		},
	})
	defer sim.Close()

	log.Info("Initializing test config")
	_, err := sim.AddNodesAndConnectChain(nodes)
	if err != nil {
		b.Fatal(err)
	}

	ctx := context.Background()
	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
		nodeIDs := sim.UpNodeIDs()
		node := nodeIDs[len(nodeIDs)-1]

		item, ok := sim.NodeItem(node, bucketKeyFileStore)
		if !ok {
			b.Fatal("No filestore")
		}
		remoteFileStore := item.(*storage.FileStore)

		pivotNode := nodeIDs[0]
		item, ok = sim.NodeItem(pivotNode, bucketKeyNetStore)
		if !ok {
			b.Fatal("No netstore")
		}
		netStore := item.(*storage.NetStore)

		if _, err := sim.WaitTillHealthy(ctx); err != nil {
			return err
		}

		disconnections := sim.PeerEvents(
			context.Background(),
			sim.NodeIDs(),
			simulation.NewPeerEventsFilter().Drop(),
		)

		var disconnected atomic.Value
		go func() {
			for d := range disconnections {
				if d.Error != nil {
					log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
					disconnected.Store(true)
				}
			}
		}()
		defer func() {
			if err != nil {
				if yes, ok := disconnected.Load().(bool); ok && yes {
					err = errors.New("disconnect events received")
				}
			}
		}()
		// benchmark loop
		b.ResetTimer()
		b.StopTimer()
	Loop:
		for i := 0; i < b.N; i++ {
			// uploading chunkCount random chunks to the last node
			hashes := make([]storage.Address, chunkCount)
			for i := 0; i < chunkCount; i++ {
				// create actual size real chunks
				ctx := context.TODO()
				hash, wait, err := remoteFileStore.Store(ctx, testutil.RandomReader(i, chunkSize), int64(chunkSize), false)
				if err != nil {
					b.Fatalf("expected no error. got %v", err)
				}
				// wait until all chunks stored
				err = wait(ctx)
				if err != nil {
					b.Fatalf("expected no error. got %v", err)
				}
				// collect the hashes
				hashes[i] = hash
			}
			// now benchmark the actual retrieval
			// netstore.Get is called for each hash in a goroutine and errors are collected
			b.StartTimer()
			errs := make(chan error)
			for _, hash := range hashes {
				go func(h storage.Address) {
					_, err := netStore.Get(ctx, h)
					log.Warn("test check netstore get", "hash", h, "err", err)
					errs <- err
				}(hash)
			}
			// count and report retrieval errors
			// if there are misses then chunk timeout is too low for the distance and volume (?)
			var total, misses int
			for err := range errs {
				if err != nil {
					log.Warn(err.Error())
					misses++
				}
				total++
				if total == chunkCount {
					break
				}
			}
			b.StopTimer()

			if misses > 0 {
				err = fmt.Errorf("%v chunks not found out of %v", misses, total)
				break Loop
			}
		}
		if err != nil {
			b.Fatal(err)
		}
		return nil
	})
	if result.Error != nil {
		b.Fatal(result.Error)
	}
}