// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package stream
import (
	"bytes"
	"context"
	"fmt"
	"os"
	"sync"
	"testing"
	"time"

	"github.com/ethereum/go-ethereum/node"
	"github.com/ethereum/go-ethereum/p2p"
	"github.com/ethereum/go-ethereum/p2p/enode"
	"github.com/ethereum/go-ethereum/p2p/protocols"
	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
	p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
	"github.com/ethereum/go-ethereum/swarm/log"
	"github.com/ethereum/go-ethereum/swarm/network"
	pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue"
	"github.com/ethereum/go-ethereum/swarm/network/simulation"
	"github.com/ethereum/go-ethereum/swarm/state"
	"github.com/ethereum/go-ethereum/swarm/storage"
	"github.com/ethereum/go-ethereum/swarm/testutil"
)
// Tests initializing a retrieve request
func TestStreamerRetrieveRequest(t *testing.T) {
	regOpts := &RegistryOptions{
		Retrieval: RetrievalClientOnly,
		Syncing:   SyncingDisabled,
	}
	tester, streamer, _, teardown, err := newStreamerTester(t, regOpts)
	defer teardown()
	if err != nil {
		t.Fatal(err)
	}

	node := tester.Nodes[0]

	ctx := context.Background()
	req := network.NewRequest(
		storage.Address(hash0[:]),
		true,
		&sync.Map{},
	)
	streamer.delivery.RequestFromPeers(ctx, req)

	stream := NewStream(swarmChunkServerStreamName, "", true)

	err = tester.TestExchanges(p2ptest.Exchange{
		Label: "RetrieveRequestMsg",
		Expects: []p2ptest.Expect{
			{ // start expecting a subscription for RETRIEVE_REQUEST due to `RetrievalClientOnly`
				Code: 4,
				Msg: &SubscribeMsg{
					Stream:   stream,
					History:  nil,
					Priority: Top,
				},
				Peer: node.ID(),
			},
			{ // expect a retrieve request message for the given hash
				Code: 5,
				Msg: &RetrieveRequestMsg{
					Addr:      hash0[:],
					SkipCheck: true,
				},
				Peer: node.ID(),
			},
		},
	})

	if err != nil {
		t.Fatalf("Expected no error, got %v", err)
	}
}

// Test requesting a chunk from a peer, then issuing an "empty" OfferedHashesMsg (no hashes available yet).
// Should time out as the peer does not have the chunk (no syncing happened previously).
func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
	tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
		Retrieval: RetrievalEnabled,
		Syncing:   SyncingDisabled, // do no syncing
	})
	defer teardown()
	if err != nil {
		t.Fatal(err)
	}

	node := tester.Nodes[0]

	chunk := storage.NewChunk(storage.Address(hash0[:]), nil)

	peer := streamer.getPeer(node.ID())

	stream := NewStream(swarmChunkServerStreamName, "", true)
	// simulate pre-subscription to RETRIEVE_REQUEST stream on peer
	peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{
		Stream:   stream,
		History:  nil,
		Priority: Top,
	})

	// test the exchange
	err = tester.TestExchanges(p2ptest.Exchange{
		Expects: []p2ptest.Expect{
			{ // first expect a subscription to the RETRIEVE_REQUEST stream
				Code: 4,
				Msg: &SubscribeMsg{
					Stream:   stream,
					History:  nil,
					Priority: Top,
				},
				Peer: node.ID(),
			},
		},
	}, p2ptest.Exchange{
		Label: "RetrieveRequestMsg",
		Triggers: []p2ptest.Trigger{
			{ // then the actual RETRIEVE_REQUEST....
				Code: 5,
				Msg: &RetrieveRequestMsg{
					Addr: chunk.Address()[:],
				},
				Peer: node.ID(),
			},
		},
		Expects: []p2ptest.Expect{
			{ // to which the peer responds with offered hashes
				Code: 1,
				Msg: &OfferedHashesMsg{
					HandoverProof: nil,
					Hashes:        nil,
					From:          0,
					To:            0,
				},
				Peer: node.ID(),
			},
		},
	})

	// should fail with a timeout as the peer we are requesting
	// the chunk from does not have the chunk
	expectedError := `exchange #1 "RetrieveRequestMsg": timed out`
	if err == nil || err.Error() != expectedError {
		t.Fatalf("Expected error %v, got %v", expectedError, err)
	}
}

// The upstream request server receives a retrieve request and responds with
// offered hashes, or with a direct delivery if SkipCheck is set to true.
func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
	tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{
		Retrieval: RetrievalEnabled,
		Syncing:   SyncingDisabled,
	})
	defer teardown()
	if err != nil {
		t.Fatal(err)
	}

	node := tester.Nodes[0]

	peer := streamer.getPeer(node.ID())

	stream := NewStream(swarmChunkServerStreamName, "", true)

	peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{
		Stream:   stream,
		History:  nil,
		Priority: Top,
	})

	hash := storage.Address(hash0[:])
	chunk := storage.NewChunk(hash, hash)
	err = localStore.Put(context.TODO(), chunk)
	if err != nil {
		t.Fatalf("Expected no err got %v", err)
	}

	err = tester.TestExchanges(p2ptest.Exchange{
		Expects: []p2ptest.Expect{
			{
				Code: 4,
				Msg: &SubscribeMsg{
					Stream:   stream,
					History:  nil,
					Priority: Top,
				},
				Peer: node.ID(),
			},
		},
	}, p2ptest.Exchange{
		Label: "RetrieveRequestMsg",
		Triggers: []p2ptest.Trigger{
			{
				Code: 5,
				Msg: &RetrieveRequestMsg{
					Addr: hash,
				},
				Peer: node.ID(),
			},
		},
		Expects: []p2ptest.Expect{
			{
				Code: 1,
				Msg: &OfferedHashesMsg{
					HandoverProof: &HandoverProof{
						Handover: &Handover{},
					},
					Hashes: hash,
					From:   0,
					// TODO: why is this 32???
					To:     32,
					Stream: stream,
				},
				Peer: node.ID(),
			},
		},
	})

	if err != nil {
		t.Fatal(err)
	}

	hash = storage.Address(hash1[:])
	chunk = storage.NewChunk(hash, hash1[:])
	err = localStore.Put(context.TODO(), chunk)
	if err != nil {
		t.Fatalf("Expected no err got %v", err)
	}

	err = tester.TestExchanges(p2ptest.Exchange{
		Label: "RetrieveRequestMsg",
		Triggers: []p2ptest.Trigger{
			{
				Code: 5,
				Msg: &RetrieveRequestMsg{
					Addr:      hash,
					SkipCheck: true,
				},
				Peer: node.ID(),
			},
		},
		Expects: []p2ptest.Expect{
			{
				Code: 6,
				Msg: &ChunkDeliveryMsg{
					Addr:  hash,
					SData: hash,
				},
				Peer: node.ID(),
			},
		},
	})

	if err != nil {
		t.Fatal(err)
	}
}

// If there is one peer in the Kademlia, RequestFromPeers should return it.
func TestRequestFromPeers(t *testing.T) {
	dummyPeerID := enode.HexID("3431c3939e1ee2a6345e976a8234f9870152d64879f30bc272a074f6859e75e8")

	addr := network.RandomAddr()
	to := network.NewKademlia(addr.OAddr, network.NewKadParams())
	delivery := NewDelivery(to, nil)
	protocolsPeer := protocols.NewPeer(p2p.NewPeer(dummyPeerID, "dummy", nil), nil, nil)
	peer := network.NewPeer(&network.BzzPeer{
		BzzAddr:   network.RandomAddr(),
		LightNode: false,
		Peer:      protocolsPeer,
	}, to)
	to.On(peer)
	r := NewRegistry(addr.ID(), delivery, nil, nil, nil, nil)

	// an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished
	sp := &Peer{
		Peer:     protocolsPeer,
		pq:       pq.New(int(PriorityQueue), PriorityQueueCap),
		streamer: r,
	}
	r.setPeer(sp)

	req := network.NewRequest(
		storage.Address(hash0[:]),
		true,
		&sync.Map{},
	)
	ctx := context.Background()
	id, _, err := delivery.RequestFromPeers(ctx, req)

	if err != nil {
		t.Fatal(err)
	}
	if *id != dummyPeerID {
		t.Fatalf("Expected peer id %v, got %v", dummyPeerID, id)
	}
}

// RequestFromPeers should not return light nodes.
func TestRequestFromPeersWithLightNode(t *testing.T) {
	dummyPeerID := enode.HexID("3431c3939e1ee2a6345e976a8234f9870152d64879f30bc272a074f6859e75e8")

	addr := network.RandomAddr()
	to := network.NewKademlia(addr.OAddr, network.NewKadParams())
	delivery := NewDelivery(to, nil)

	protocolsPeer := protocols.NewPeer(p2p.NewPeer(dummyPeerID, "dummy", nil), nil, nil)
	// setting up a lightnode
	peer := network.NewPeer(&network.BzzPeer{
		BzzAddr:   network.RandomAddr(),
		LightNode: true,
		Peer:      protocolsPeer,
	}, to)
	to.On(peer)
	r := NewRegistry(addr.ID(), delivery, nil, nil, nil, nil)
	// an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished
	sp := &Peer{
		Peer:     protocolsPeer,
		pq:       pq.New(int(PriorityQueue), PriorityQueueCap),
		streamer: r,
	}
	r.setPeer(sp)

	req := network.NewRequest(
		storage.Address(hash0[:]),
		true,
		&sync.Map{},
	)

	ctx := context.Background()
	// making a request which should return with "no peer found"
	_, _, err := delivery.RequestFromPeers(ctx, req)

	expectedError := "no peer found"
	if err == nil || err.Error() != expectedError {
		t.Fatalf("expected '%v', got %v", expectedError, err)
	}
}

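// TestStreamerDownstreamChunkDeliveryMsgExchange registers a custom "foo"
// stream, subscribes to it, triggers a ChunkDeliveryMsg from the peer and
// verifies that the delivered chunk ends up in the local store.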
func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
	tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{
		Retrieval: RetrievalDisabled,
		Syncing:   SyncingDisabled,
	})
	defer teardown()
	if err != nil {
		t.Fatal(err)
	}

	streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
		return &testClient{
			t: t,
		}, nil
	})

	node := tester.Nodes[0]

	// subscribe to custom stream
	stream := NewStream("foo", "", true)
	err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top)
	if err != nil {
		t.Fatalf("Expected no error, got %v", err)
	}

	chunkKey := hash0[:]
	chunkData := hash1[:]

	err = tester.TestExchanges(p2ptest.Exchange{
		Label: "Subscribe message",
		Expects: []p2ptest.Expect{
			{ // first expect subscription to the custom stream...
				Code: 4,
				Msg: &SubscribeMsg{
					Stream:   stream,
					History:  NewRange(5, 8),
					Priority: Top,
				},
				Peer: node.ID(),
			},
		},
	},
		p2ptest.Exchange{
			Label: "ChunkDelivery message",
			Triggers: []p2ptest.Trigger{
				{ // ...then trigger a chunk delivery for the given chunk from the peer
					// in order for the local node to get the chunk delivered
					Code: 6,
					Msg: &ChunkDeliveryMsg{
						Addr:  chunkKey,
						SData: chunkData,
					},
					Peer: node.ID(),
				},
			},
		})

	if err != nil {
		t.Fatalf("Expected no error, got %v", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// wait for the chunk to get stored
	storedChunk, err := localStore.Get(ctx, chunkKey)
	for err != nil {
		select {
		case <-ctx.Done():
			t.Fatalf("Chunk is not in localstore after timeout, err: %v", err)
		default:
		}
		storedChunk, err = localStore.Get(ctx, chunkKey)
		time.Sleep(50 * time.Millisecond)
	}

	if err != nil {
		t.Fatalf("Expected no error, got %v", err)
	}

	if !bytes.Equal(storedChunk.Data(), chunkData) {
		t.Fatal("Retrieved chunk has different data than original")
	}
}

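// TestDeliveryFromNodes runs the delivery simulation for 2, 4, 8 and 16 nodes,
// each with skipCheck both enabled and disabled.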
func TestDeliveryFromNodes(t *testing.T) {
	testDeliveryFromNodes(t, 2, dataChunkCount, true)
	testDeliveryFromNodes(t, 2, dataChunkCount, false)
	testDeliveryFromNodes(t, 4, dataChunkCount, true)
	testDeliveryFromNodes(t, 4, dataChunkCount, false)
	testDeliveryFromNodes(t, 8, dataChunkCount, true)
	testDeliveryFromNodes(t, 8, dataChunkCount, false)
	testDeliveryFromNodes(t, 16, dataChunkCount, true)
	testDeliveryFromNodes(t, 16, dataChunkCount, false)
}

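// testDeliveryFromNodes sets up a simulation of `nodes` nodes connected in a
// chain, uploads a random file whose chunks are spread round-robin over all
// non-pivot nodes, and checks that the pivot node can retrieve the complete
// file via its root hash.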
func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) {
	sim := simulation.New(map[string]simulation.ServiceFunc{
		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
			node := ctx.Config.Node()
			addr := network.NewAddr(node)
			store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
			if err != nil {
				return nil, nil, err
			}
			bucket.Store(bucketKeyStore, store)
			cleanup = func() {
				os.RemoveAll(datadir)
				store.Close()
			}
			localStore := store.(*storage.LocalStore)
			netStore, err := storage.NewNetStore(localStore, nil)
			if err != nil {
				return nil, nil, err
			}

			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
			delivery := NewDelivery(kad, netStore)
			netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New

			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
				SkipCheck: skipCheck,
				Syncing:   SyncingDisabled,
				Retrieval: RetrievalEnabled,
			}, nil)
			bucket.Store(bucketKeyRegistry, r)

			fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
			bucket.Store(bucketKeyFileStore, fileStore)

			return r, cleanup, nil
		},
	})
	defer sim.Close()

	log.Info("Adding nodes to simulation")
	_, err := sim.AddNodesAndConnectChain(nodes)
	if err != nil {
		t.Fatal(err)
	}

	log.Info("Starting simulation")
	ctx := context.Background()
	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
		nodeIDs := sim.UpNodeIDs()
		// determine the pivot node to be the first node of the simulation
		pivot := nodeIDs[0]

		// distribute chunks of a random file into Stores of nodes 1 to nodes
		// we will do this by creating a file store with an underlying round-robin store:
		// the file store will create a hash for the uploaded file, but every chunk will be
		// distributed to different nodes via round-robin scheduling
		log.Debug("Writing file to round-robin file store")
		// to do this, we create an array for chunkstores (length minus one, the pivot node)
		stores := make([]storage.ChunkStore, len(nodeIDs)-1)
		// we then need to get all stores from the sim....
		lStores := sim.NodesItems(bucketKeyStore)
		i := 0
		// ...iterate the buckets...
		for id, bucketVal := range lStores {
			// ...and remove the one which is the pivot node
			if id == pivot {
				continue
			}
			// the other ones are added to the array...
			stores[i] = bucketVal.(storage.ChunkStore)
			i++
		}
		// ...which then gets passed to the round-robin file store
		roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams())
		// now we can actually upload a (random) file to the round-robin store
		size := chunkCount * chunkSize
		log.Debug("Storing data to file store")
		fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
		// wait until all chunks stored
		if err != nil {
			return err
		}
		err = wait(ctx)
		if err != nil {
			return err
		}

		log.Debug("Waiting for kademlia")
		// TODO this does not seem to be correct usage of the function, as the simulation may have no kademlias
		if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
			return err
		}

		// get the pivot node's filestore
		item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
		if !ok {
			return fmt.Errorf("No filestore")
		}
		pivotFileStore := item.(*storage.FileStore)
		log.Debug("Starting retrieval routine")
		go func() {
			// start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
			// we must wait for the peer connections to have started before requesting
			n, err := readAll(pivotFileStore, fileHash)
			log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
			if err != nil {
				t.Fatalf("requesting chunks action error: %v", err)
			}
		}()

		log.Debug("Watching for disconnections")
		disconnections := sim.PeerEvents(
			context.Background(),
			sim.NodeIDs(),
			simulation.NewPeerEventsFilter().Drop(),
		)

		go func() {
			for d := range disconnections {
				if d.Error != nil {
					log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
					t.Fatal(d.Error)
				}
			}
		}()

		// finally check that the pivot node gets all chunks via the root hash
		log.Debug("Check retrieval")
		success := true
		var total int64
		total, err = readAll(pivotFileStore, fileHash)
		if err != nil {
			return err
		}
		log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
		if err != nil || total != int64(size) {
			success = false
		}

		if !success {
			return fmt.Errorf("Test failed, chunks not available on all nodes")
		}
		log.Debug("Test terminated successfully")
		return nil
	})
	if result.Error != nil {
		t.Fatal(result.Error)
	}
}

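// BenchmarkDeliveryFromNodesWithoutCheck benchmarks retrieval with SkipCheck
// enabled (chunks are delivered directly, without the offered-hashes round
// trip) for 2 to 16 nodes and 32 to 128 chunks.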
func BenchmarkDeliveryFromNodesWithoutCheck(b *testing.B) {
	for chunks := 32; chunks <= 128; chunks *= 2 {
		for i := 2; i < 32; i *= 2 {
			b.Run(
				fmt.Sprintf("nodes=%v,chunks=%v", i, chunks),
				func(b *testing.B) {
					benchmarkDeliveryFromNodes(b, i, chunks, true)
				},
			)
		}
	}
}

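// BenchmarkDeliveryFromNodesWithCheck runs the same benchmark with SkipCheck
// disabled, so deliveries go through the offered-hashes exchange.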
func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) {
	for chunks := 32; chunks <= 128; chunks *= 2 {
		for i := 2; i < 32; i *= 2 {
			b.Run(
				fmt.Sprintf("nodes=%v,chunks=%v", i, chunks),
				func(b *testing.B) {
					benchmarkDeliveryFromNodes(b, i, chunks, false)
				},
			)
		}
	}
}

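// benchmarkDeliveryFromNodes uploads chunkCount random chunks to the last node
// of a simulated chain and measures how long the pivot (first) node takes to
// fetch all of them through its NetStore.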
func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck bool) {
	sim := simulation.New(map[string]simulation.ServiceFunc{
		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
			node := ctx.Config.Node()
			addr := network.NewAddr(node)
			store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
			if err != nil {
				return nil, nil, err
			}
			bucket.Store(bucketKeyStore, store)
			cleanup = func() {
				os.RemoveAll(datadir)
				store.Close()
			}
			localStore := store.(*storage.LocalStore)
			netStore, err := storage.NewNetStore(localStore, nil)
			if err != nil {
				return nil, nil, err
			}
			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
			delivery := NewDelivery(kad, netStore)
			netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New

			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
				SkipCheck:       skipCheck,
				Syncing:         SyncingDisabled,
				Retrieval:       RetrievalDisabled,
				SyncUpdateDelay: 0,
			}, nil)

			fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
			bucket.Store(bucketKeyFileStore, fileStore)

			return r, cleanup, nil
		},
	})
	defer sim.Close()

	log.Info("Initializing test config")
	_, err := sim.AddNodesAndConnectChain(nodes)
	if err != nil {
		b.Fatal(err)
	}

	ctx := context.Background()
	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
		nodeIDs := sim.UpNodeIDs()
		node := nodeIDs[len(nodeIDs)-1]

		item, ok := sim.NodeItem(node, bucketKeyFileStore)
		if !ok {
			b.Fatal("No filestore")
		}
		remoteFileStore := item.(*storage.FileStore)

		pivotNode := nodeIDs[0]
		item, ok = sim.NodeItem(pivotNode, bucketKeyNetStore)
		if !ok {
			b.Fatal("No netstore")
		}
		netStore := item.(*storage.NetStore)

		if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
			return err
		}

		disconnections := sim.PeerEvents(
			context.Background(),
			sim.NodeIDs(),
			simulation.NewPeerEventsFilter().Drop(),
		)

		go func() {
			for d := range disconnections {
				if d.Error != nil {
					log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
					b.Fatal(d.Error)
				}
			}
		}()
		// benchmark loop
		b.ResetTimer()
		b.StopTimer()
	Loop:
		for i := 0; i < b.N; i++ {
			// uploading chunkCount random chunks to the last node
			hashes := make([]storage.Address, chunkCount)
			for i := 0; i < chunkCount; i++ {
				// create actual size real chunks
				ctx := context.TODO()
				hash, wait, err := remoteFileStore.Store(ctx, testutil.RandomReader(i, chunkSize), int64(chunkSize), false)
				if err != nil {
					b.Fatalf("expected no error. got %v", err)
				}
				// wait until all chunks stored
				err = wait(ctx)
				if err != nil {
					b.Fatalf("expected no error. got %v", err)
				}
				// collect the hashes
				hashes[i] = hash
			}
			// now benchmark the actual retrieval
			// netstore.Get is called for each hash in a goroutine and errors are collected
			b.StartTimer()
			errs := make(chan error)
			for _, hash := range hashes {
				go func(h storage.Address) {
					_, err := netStore.Get(ctx, h)
					log.Warn("test check netstore get", "hash", h, "err", err)
					errs <- err
				}(hash)
			}
			// count and report retrieval errors
			// if there are misses then chunk timeout is too low for the distance and volume (?)
			var total, misses int
			for err := range errs {
				if err != nil {
					log.Warn(err.Error())
					misses++
				}
				total++
				if total == chunkCount {
					break
				}
			}
			b.StopTimer()

			if misses > 0 {
				err = fmt.Errorf("%v chunks not found out of %v", misses, total)
				break Loop
			}
		}
		if err != nil {
			b.Fatal(err)
		}
		return nil
	})
	if result.Error != nil {
		b.Fatal(result.Error)
	}
}