streamer_test.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package stream
  17. import (
  18. "bytes"
  19. "context"
  20. "errors"
  21. "fmt"
  22. "strconv"
  23. "testing"
  24. "time"
  25. "github.com/ethereum/go-ethereum/common"
  26. "github.com/ethereum/go-ethereum/log"
  27. "github.com/ethereum/go-ethereum/p2p/enode"
  28. p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
  29. "github.com/ethereum/go-ethereum/swarm/network"
  30. "golang.org/x/crypto/sha3"
  31. )
  32. func TestStreamerSubscribe(t *testing.T) {
  33. tester, streamer, _, teardown, err := newStreamerTester(nil)
  34. defer teardown()
  35. if err != nil {
  36. t.Fatal(err)
  37. }
  38. stream := NewStream("foo", "", true)
  39. err = streamer.Subscribe(tester.Nodes[0].ID(), stream, NewRange(0, 0), Top)
  40. if err == nil || err.Error() != "stream foo not registered" {
  41. t.Fatalf("Expected error %v, got %v", "stream foo not registered", err)
  42. }
  43. }
  44. func TestStreamerRequestSubscription(t *testing.T) {
  45. tester, streamer, _, teardown, err := newStreamerTester(nil)
  46. defer teardown()
  47. if err != nil {
  48. t.Fatal(err)
  49. }
  50. stream := NewStream("foo", "", false)
  51. err = streamer.RequestSubscription(tester.Nodes[0].ID(), stream, &Range{}, Top)
  52. if err == nil || err.Error() != "stream foo not registered" {
  53. t.Fatalf("Expected error %v, got %v", "stream foo not registered", err)
  54. }
  55. }
  56. var (
  57. hash0 = sha3.Sum256([]byte{0})
  58. hash1 = sha3.Sum256([]byte{1})
  59. hash2 = sha3.Sum256([]byte{2})
  60. hashesTmp = append(hash0[:], hash1[:]...)
  61. hashes = append(hashesTmp, hash2[:]...)
  62. corruptHashes = append(hashes[:40])
  63. )
  64. type testClient struct {
  65. t string
  66. wait0 chan bool
  67. wait2 chan bool
  68. batchDone chan bool
  69. receivedHashes map[string][]byte
  70. }
  71. func newTestClient(t string) *testClient {
  72. return &testClient{
  73. t: t,
  74. wait0: make(chan bool),
  75. wait2: make(chan bool),
  76. batchDone: make(chan bool),
  77. receivedHashes: make(map[string][]byte),
  78. }
  79. }
  80. func (self *testClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
  81. self.receivedHashes[string(hash)] = hash
  82. if bytes.Equal(hash, hash0[:]) {
  83. return func(context.Context) error {
  84. <-self.wait0
  85. return nil
  86. }
  87. } else if bytes.Equal(hash, hash2[:]) {
  88. return func(context.Context) error {
  89. <-self.wait2
  90. return nil
  91. }
  92. }
  93. return nil
  94. }
  95. func (self *testClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) {
  96. close(self.batchDone)
  97. return nil
  98. }
  99. func (self *testClient) Close() {}
  100. type testServer struct {
  101. t string
  102. sessionIndex uint64
  103. }
  104. func newTestServer(t string, sessionIndex uint64) *testServer {
  105. return &testServer{
  106. t: t,
  107. sessionIndex: sessionIndex,
  108. }
  109. }
  110. func (s *testServer) SessionIndex() (uint64, error) {
  111. return s.sessionIndex, nil
  112. }
  113. func (self *testServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
  114. return make([]byte, HashSize), from + 1, to + 1, nil, nil
  115. }
  116. func (self *testServer) GetData(context.Context, []byte) ([]byte, error) {
  117. return nil, nil
  118. }
  119. func (self *testServer) Close() {
  120. }
  121. func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
  122. tester, streamer, _, teardown, err := newStreamerTester(nil)
  123. defer teardown()
  124. if err != nil {
  125. t.Fatal(err)
  126. }
  127. streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
  128. return newTestClient(t), nil
  129. })
  130. node := tester.Nodes[0]
  131. stream := NewStream("foo", "", true)
  132. err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top)
  133. if err != nil {
  134. t.Fatalf("Expected no error, got %v", err)
  135. }
  136. err = tester.TestExchanges(
  137. p2ptest.Exchange{
  138. Label: "Subscribe message",
  139. Expects: []p2ptest.Expect{
  140. {
  141. Code: 4,
  142. Msg: &SubscribeMsg{
  143. Stream: stream,
  144. History: NewRange(5, 8),
  145. Priority: Top,
  146. },
  147. Peer: node.ID(),
  148. },
  149. },
  150. },
  151. // trigger OfferedHashesMsg to actually create the client
  152. p2ptest.Exchange{
  153. Label: "OfferedHashes message",
  154. Triggers: []p2ptest.Trigger{
  155. {
  156. Code: 1,
  157. Msg: &OfferedHashesMsg{
  158. HandoverProof: &HandoverProof{
  159. Handover: &Handover{},
  160. },
  161. Hashes: hashes,
  162. From: 5,
  163. To: 8,
  164. Stream: stream,
  165. },
  166. Peer: node.ID(),
  167. },
  168. },
  169. Expects: []p2ptest.Expect{
  170. {
  171. Code: 2,
  172. Msg: &WantedHashesMsg{
  173. Stream: stream,
  174. Want: []byte{5},
  175. From: 9,
  176. To: 0,
  177. },
  178. Peer: node.ID(),
  179. },
  180. },
  181. },
  182. )
  183. if err != nil {
  184. t.Fatal(err)
  185. }
  186. err = streamer.Unsubscribe(node.ID(), stream)
  187. if err != nil {
  188. t.Fatalf("Expected no error, got %v", err)
  189. }
  190. err = tester.TestExchanges(p2ptest.Exchange{
  191. Label: "Unsubscribe message",
  192. Expects: []p2ptest.Expect{
  193. {
  194. Code: 0,
  195. Msg: &UnsubscribeMsg{
  196. Stream: stream,
  197. },
  198. Peer: node.ID(),
  199. },
  200. },
  201. })
  202. if err != nil {
  203. t.Fatal(err)
  204. }
  205. }
  206. func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
  207. tester, streamer, _, teardown, err := newStreamerTester(nil)
  208. defer teardown()
  209. if err != nil {
  210. t.Fatal(err)
  211. }
  212. stream := NewStream("foo", "", false)
  213. streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
  214. return newTestServer(t, 10), nil
  215. })
  216. node := tester.Nodes[0]
  217. err = tester.TestExchanges(p2ptest.Exchange{
  218. Label: "Subscribe message",
  219. Triggers: []p2ptest.Trigger{
  220. {
  221. Code: 4,
  222. Msg: &SubscribeMsg{
  223. Stream: stream,
  224. History: NewRange(5, 8),
  225. Priority: Top,
  226. },
  227. Peer: node.ID(),
  228. },
  229. },
  230. Expects: []p2ptest.Expect{
  231. {
  232. Code: 1,
  233. Msg: &OfferedHashesMsg{
  234. Stream: stream,
  235. HandoverProof: &HandoverProof{
  236. Handover: &Handover{},
  237. },
  238. Hashes: make([]byte, HashSize),
  239. From: 6,
  240. To: 9,
  241. },
  242. Peer: node.ID(),
  243. },
  244. },
  245. })
  246. if err != nil {
  247. t.Fatal(err)
  248. }
  249. err = tester.TestExchanges(p2ptest.Exchange{
  250. Label: "unsubscribe message",
  251. Triggers: []p2ptest.Trigger{
  252. {
  253. Code: 0,
  254. Msg: &UnsubscribeMsg{
  255. Stream: stream,
  256. },
  257. Peer: node.ID(),
  258. },
  259. },
  260. })
  261. if err != nil {
  262. t.Fatal(err)
  263. }
  264. }
  265. func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
  266. tester, streamer, _, teardown, err := newStreamerTester(nil)
  267. defer teardown()
  268. if err != nil {
  269. t.Fatal(err)
  270. }
  271. stream := NewStream("foo", "", true)
  272. streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
  273. return newTestServer(t, 0), nil
  274. })
  275. node := tester.Nodes[0]
  276. err = tester.TestExchanges(p2ptest.Exchange{
  277. Label: "Subscribe message",
  278. Triggers: []p2ptest.Trigger{
  279. {
  280. Code: 4,
  281. Msg: &SubscribeMsg{
  282. Stream: stream,
  283. Priority: Top,
  284. },
  285. Peer: node.ID(),
  286. },
  287. },
  288. Expects: []p2ptest.Expect{
  289. {
  290. Code: 1,
  291. Msg: &OfferedHashesMsg{
  292. Stream: stream,
  293. HandoverProof: &HandoverProof{
  294. Handover: &Handover{},
  295. },
  296. Hashes: make([]byte, HashSize),
  297. From: 1,
  298. To: 0,
  299. },
  300. Peer: node.ID(),
  301. },
  302. },
  303. })
  304. if err != nil {
  305. t.Fatal(err)
  306. }
  307. err = tester.TestExchanges(p2ptest.Exchange{
  308. Label: "unsubscribe message",
  309. Triggers: []p2ptest.Trigger{
  310. {
  311. Code: 0,
  312. Msg: &UnsubscribeMsg{
  313. Stream: stream,
  314. },
  315. Peer: node.ID(),
  316. },
  317. },
  318. })
  319. if err != nil {
  320. t.Fatal(err)
  321. }
  322. }
  323. func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
  324. tester, streamer, _, teardown, err := newStreamerTester(nil)
  325. defer teardown()
  326. if err != nil {
  327. t.Fatal(err)
  328. }
  329. streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
  330. return newTestServer(t, 0), nil
  331. })
  332. stream := NewStream("bar", "", true)
  333. node := tester.Nodes[0]
  334. err = tester.TestExchanges(p2ptest.Exchange{
  335. Label: "Subscribe message",
  336. Triggers: []p2ptest.Trigger{
  337. {
  338. Code: 4,
  339. Msg: &SubscribeMsg{
  340. Stream: stream,
  341. History: NewRange(5, 8),
  342. Priority: Top,
  343. },
  344. Peer: node.ID(),
  345. },
  346. },
  347. Expects: []p2ptest.Expect{
  348. {
  349. Code: 7,
  350. Msg: &SubscribeErrorMsg{
  351. Error: "stream bar not registered",
  352. },
  353. Peer: node.ID(),
  354. },
  355. },
  356. })
  357. if err != nil {
  358. t.Fatal(err)
  359. }
  360. }
  361. func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
  362. tester, streamer, _, teardown, err := newStreamerTester(nil)
  363. defer teardown()
  364. if err != nil {
  365. t.Fatal(err)
  366. }
  367. stream := NewStream("foo", "", true)
  368. streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
  369. return newTestServer(t, 10), nil
  370. })
  371. node := tester.Nodes[0]
  372. err = tester.TestExchanges(p2ptest.Exchange{
  373. Label: "Subscribe message",
  374. Triggers: []p2ptest.Trigger{
  375. {
  376. Code: 4,
  377. Msg: &SubscribeMsg{
  378. Stream: stream,
  379. History: NewRange(5, 8),
  380. Priority: Top,
  381. },
  382. Peer: node.ID(),
  383. },
  384. },
  385. Expects: []p2ptest.Expect{
  386. {
  387. Code: 1,
  388. Msg: &OfferedHashesMsg{
  389. Stream: NewStream("foo", "", false),
  390. HandoverProof: &HandoverProof{
  391. Handover: &Handover{},
  392. },
  393. Hashes: make([]byte, HashSize),
  394. From: 6,
  395. To: 9,
  396. },
  397. Peer: node.ID(),
  398. },
  399. {
  400. Code: 1,
  401. Msg: &OfferedHashesMsg{
  402. Stream: stream,
  403. HandoverProof: &HandoverProof{
  404. Handover: &Handover{},
  405. },
  406. From: 11,
  407. To: 0,
  408. Hashes: make([]byte, HashSize),
  409. },
  410. Peer: node.ID(),
  411. },
  412. },
  413. })
  414. if err != nil {
  415. t.Fatal(err)
  416. }
  417. }
  418. func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
  419. tester, streamer, _, teardown, err := newStreamerTester(nil)
  420. defer teardown()
  421. if err != nil {
  422. t.Fatal(err)
  423. }
  424. stream := NewStream("foo", "", true)
  425. var tc *testClient
  426. streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
  427. tc = newTestClient(t)
  428. return tc, nil
  429. })
  430. node := tester.Nodes[0]
  431. err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top)
  432. if err != nil {
  433. t.Fatalf("Expected no error, got %v", err)
  434. }
  435. err = tester.TestExchanges(p2ptest.Exchange{
  436. Label: "Subscribe message",
  437. Expects: []p2ptest.Expect{
  438. {
  439. Code: 4,
  440. Msg: &SubscribeMsg{
  441. Stream: stream,
  442. History: NewRange(5, 8),
  443. Priority: Top,
  444. },
  445. Peer: node.ID(),
  446. },
  447. },
  448. },
  449. p2ptest.Exchange{
  450. Label: "Corrupt offered hash message",
  451. Triggers: []p2ptest.Trigger{
  452. {
  453. Code: 1,
  454. Msg: &OfferedHashesMsg{
  455. HandoverProof: &HandoverProof{
  456. Handover: &Handover{},
  457. },
  458. Hashes: corruptHashes,
  459. From: 5,
  460. To: 8,
  461. Stream: stream,
  462. },
  463. Peer: node.ID(),
  464. },
  465. },
  466. })
  467. if err != nil {
  468. t.Fatal(err)
  469. }
  470. expectedError := errors.New("Message handler error: (msg code 1): error invalid hashes length (len: 40)")
  471. if err := tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: expectedError}); err != nil {
  472. t.Fatal(err)
  473. }
  474. }
  475. func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
  476. tester, streamer, _, teardown, err := newStreamerTester(nil)
  477. defer teardown()
  478. if err != nil {
  479. t.Fatal(err)
  480. }
  481. stream := NewStream("foo", "", true)
  482. var tc *testClient
  483. streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
  484. tc = newTestClient(t)
  485. return tc, nil
  486. })
  487. node := tester.Nodes[0]
  488. err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top)
  489. if err != nil {
  490. t.Fatalf("Expected no error, got %v", err)
  491. }
  492. err = tester.TestExchanges(p2ptest.Exchange{
  493. Label: "Subscribe message",
  494. Expects: []p2ptest.Expect{
  495. {
  496. Code: 4,
  497. Msg: &SubscribeMsg{
  498. Stream: stream,
  499. History: NewRange(5, 8),
  500. Priority: Top,
  501. },
  502. Peer: node.ID(),
  503. },
  504. },
  505. },
  506. p2ptest.Exchange{
  507. Label: "WantedHashes message",
  508. Triggers: []p2ptest.Trigger{
  509. {
  510. Code: 1,
  511. Msg: &OfferedHashesMsg{
  512. HandoverProof: &HandoverProof{
  513. Handover: &Handover{},
  514. },
  515. Hashes: hashes,
  516. From: 5,
  517. To: 8,
  518. Stream: stream,
  519. },
  520. Peer: node.ID(),
  521. },
  522. },
  523. Expects: []p2ptest.Expect{
  524. {
  525. Code: 2,
  526. Msg: &WantedHashesMsg{
  527. Stream: stream,
  528. Want: []byte{5},
  529. From: 9,
  530. To: 0,
  531. },
  532. Peer: node.ID(),
  533. },
  534. },
  535. })
  536. if err != nil {
  537. t.Fatal(err)
  538. }
  539. if len(tc.receivedHashes) != 3 {
  540. t.Fatalf("Expected number of received hashes %v, got %v", 3, len(tc.receivedHashes))
  541. }
  542. close(tc.wait0)
  543. timeout := time.NewTimer(100 * time.Millisecond)
  544. defer timeout.Stop()
  545. select {
  546. case <-tc.batchDone:
  547. t.Fatal("batch done early")
  548. case <-timeout.C:
  549. }
  550. close(tc.wait2)
  551. timeout2 := time.NewTimer(10000 * time.Millisecond)
  552. defer timeout2.Stop()
  553. select {
  554. case <-tc.batchDone:
  555. case <-timeout2.C:
  556. t.Fatal("timeout waiting batchdone call")
  557. }
  558. }
  559. func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
  560. tester, streamer, _, teardown, err := newStreamerTester(nil)
  561. defer teardown()
  562. if err != nil {
  563. t.Fatal(err)
  564. }
  565. streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
  566. return newTestServer(t, 10), nil
  567. })
  568. node := tester.Nodes[0]
  569. stream := NewStream("foo", "", true)
  570. err = streamer.RequestSubscription(node.ID(), stream, NewRange(5, 8), Top)
  571. if err != nil {
  572. t.Fatalf("Expected no error, got %v", err)
  573. }
  574. err = tester.TestExchanges(
  575. p2ptest.Exchange{
  576. Label: "RequestSubscription message",
  577. Expects: []p2ptest.Expect{
  578. {
  579. Code: 8,
  580. Msg: &RequestSubscriptionMsg{
  581. Stream: stream,
  582. History: NewRange(5, 8),
  583. Priority: Top,
  584. },
  585. Peer: node.ID(),
  586. },
  587. },
  588. },
  589. p2ptest.Exchange{
  590. Label: "Subscribe message",
  591. Triggers: []p2ptest.Trigger{
  592. {
  593. Code: 4,
  594. Msg: &SubscribeMsg{
  595. Stream: stream,
  596. History: NewRange(5, 8),
  597. Priority: Top,
  598. },
  599. Peer: node.ID(),
  600. },
  601. },
  602. Expects: []p2ptest.Expect{
  603. {
  604. Code: 1,
  605. Msg: &OfferedHashesMsg{
  606. Stream: NewStream("foo", "", false),
  607. HandoverProof: &HandoverProof{
  608. Handover: &Handover{},
  609. },
  610. Hashes: make([]byte, HashSize),
  611. From: 6,
  612. To: 9,
  613. },
  614. Peer: node.ID(),
  615. },
  616. {
  617. Code: 1,
  618. Msg: &OfferedHashesMsg{
  619. Stream: stream,
  620. HandoverProof: &HandoverProof{
  621. Handover: &Handover{},
  622. },
  623. From: 11,
  624. To: 0,
  625. Hashes: make([]byte, HashSize),
  626. },
  627. Peer: node.ID(),
  628. },
  629. },
  630. },
  631. )
  632. if err != nil {
  633. t.Fatal(err)
  634. }
  635. err = streamer.Quit(node.ID(), stream)
  636. if err != nil {
  637. t.Fatalf("Expected no error, got %v", err)
  638. }
  639. err = tester.TestExchanges(p2ptest.Exchange{
  640. Label: "Quit message",
  641. Expects: []p2ptest.Expect{
  642. {
  643. Code: 9,
  644. Msg: &QuitMsg{
  645. Stream: stream,
  646. },
  647. Peer: node.ID(),
  648. },
  649. },
  650. })
  651. if err != nil {
  652. t.Fatal(err)
  653. }
  654. historyStream := getHistoryStream(stream)
  655. err = streamer.Quit(node.ID(), historyStream)
  656. if err != nil {
  657. t.Fatalf("Expected no error, got %v", err)
  658. }
  659. err = tester.TestExchanges(p2ptest.Exchange{
  660. Label: "Quit message",
  661. Expects: []p2ptest.Expect{
  662. {
  663. Code: 9,
  664. Msg: &QuitMsg{
  665. Stream: historyStream,
  666. },
  667. Peer: node.ID(),
  668. },
  669. },
  670. })
  671. if err != nil {
  672. t.Fatal(err)
  673. }
  674. }
  675. // TestMaxPeerServersWithUnsubscribe creates a registry with a limited
  676. // number of stream servers, and performs a test with subscriptions and
  677. // unsubscriptions, checking if unsubscriptions will remove streams,
  678. // leaving place for new streams.
  679. func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
  680. var maxPeerServers = 6
  681. tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{
  682. Retrieval: RetrievalDisabled,
  683. Syncing: SyncingDisabled,
  684. MaxPeerServers: maxPeerServers,
  685. })
  686. defer teardown()
  687. if err != nil {
  688. t.Fatal(err)
  689. }
  690. streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
  691. return newTestServer(t, 0), nil
  692. })
  693. node := tester.Nodes[0]
  694. for i := 0; i < maxPeerServers+10; i++ {
  695. stream := NewStream("foo", strconv.Itoa(i), true)
  696. err = tester.TestExchanges(p2ptest.Exchange{
  697. Label: "Subscribe message",
  698. Triggers: []p2ptest.Trigger{
  699. {
  700. Code: 4,
  701. Msg: &SubscribeMsg{
  702. Stream: stream,
  703. Priority: Top,
  704. },
  705. Peer: node.ID(),
  706. },
  707. },
  708. Expects: []p2ptest.Expect{
  709. {
  710. Code: 1,
  711. Msg: &OfferedHashesMsg{
  712. Stream: stream,
  713. HandoverProof: &HandoverProof{
  714. Handover: &Handover{},
  715. },
  716. Hashes: make([]byte, HashSize),
  717. From: 1,
  718. To: 0,
  719. },
  720. Peer: node.ID(),
  721. },
  722. },
  723. })
  724. if err != nil {
  725. t.Fatal(err)
  726. }
  727. err = tester.TestExchanges(p2ptest.Exchange{
  728. Label: "unsubscribe message",
  729. Triggers: []p2ptest.Trigger{
  730. {
  731. Code: 0,
  732. Msg: &UnsubscribeMsg{
  733. Stream: stream,
  734. },
  735. Peer: node.ID(),
  736. },
  737. },
  738. })
  739. if err != nil {
  740. t.Fatal(err)
  741. }
  742. }
  743. }
  744. // TestMaxPeerServersWithoutUnsubscribe creates a registry with a limited
  745. // number of stream servers, and performs subscriptions to detect subscriptions
  746. // error message exchange.
  747. func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
  748. var maxPeerServers = 6
  749. tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{
  750. MaxPeerServers: maxPeerServers,
  751. })
  752. defer teardown()
  753. if err != nil {
  754. t.Fatal(err)
  755. }
  756. streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
  757. return newTestServer(t, 0), nil
  758. })
  759. node := tester.Nodes[0]
  760. for i := 0; i < maxPeerServers+10; i++ {
  761. stream := NewStream("foo", strconv.Itoa(i), true)
  762. if i >= maxPeerServers {
  763. err = tester.TestExchanges(p2ptest.Exchange{
  764. Label: "Subscribe message",
  765. Triggers: []p2ptest.Trigger{
  766. {
  767. Code: 4,
  768. Msg: &SubscribeMsg{
  769. Stream: stream,
  770. Priority: Top,
  771. },
  772. Peer: node.ID(),
  773. },
  774. },
  775. Expects: []p2ptest.Expect{
  776. {
  777. Code: 7,
  778. Msg: &SubscribeErrorMsg{
  779. Error: ErrMaxPeerServers.Error(),
  780. },
  781. Peer: node.ID(),
  782. },
  783. },
  784. })
  785. if err != nil {
  786. t.Fatal(err)
  787. }
  788. continue
  789. }
  790. err = tester.TestExchanges(p2ptest.Exchange{
  791. Label: "Subscribe message",
  792. Triggers: []p2ptest.Trigger{
  793. {
  794. Code: 4,
  795. Msg: &SubscribeMsg{
  796. Stream: stream,
  797. Priority: Top,
  798. },
  799. Peer: node.ID(),
  800. },
  801. },
  802. Expects: []p2ptest.Expect{
  803. {
  804. Code: 1,
  805. Msg: &OfferedHashesMsg{
  806. Stream: stream,
  807. HandoverProof: &HandoverProof{
  808. Handover: &Handover{},
  809. },
  810. Hashes: make([]byte, HashSize),
  811. From: 1,
  812. To: 0,
  813. },
  814. Peer: node.ID(),
  815. },
  816. },
  817. })
  818. if err != nil {
  819. t.Fatal(err)
  820. }
  821. }
  822. }
  823. //TestHasPriceImplementation is to check that the Registry has a
  824. //`Price` interface implementation
  825. func TestHasPriceImplementation(t *testing.T) {
  826. _, r, _, teardown, err := newStreamerTester(&RegistryOptions{
  827. Retrieval: RetrievalDisabled,
  828. Syncing: SyncingDisabled,
  829. })
  830. defer teardown()
  831. if err != nil {
  832. t.Fatal(err)
  833. }
  834. if r.prices == nil {
  835. t.Fatal("No prices implementation available for the stream protocol")
  836. }
  837. pricesInstance, ok := r.prices.(*StreamerPrices)
  838. if !ok {
  839. t.Fatal("`Registry` does not have the expected Prices instance")
  840. }
  841. price := pricesInstance.Price(&ChunkDeliveryMsgRetrieval{})
  842. if price == nil || price.Value == 0 || price.Value != pricesInstance.getChunkDeliveryMsgRetrievalPrice() {
  843. t.Fatal("No prices set for chunk delivery msg")
  844. }
  845. price = pricesInstance.Price(&RetrieveRequestMsg{})
  846. if price == nil || price.Value == 0 || price.Value != pricesInstance.getRetrieveRequestMsgPrice() {
  847. t.Fatal("No prices set for chunk delivery msg")
  848. }
  849. }
  850. /*
  851. TestRequestPeerSubscriptions is a unit test for stream's pull sync subscriptions.
  852. The test does:
  853. * assign each connected peer to a bin map
  854. * build up a known kademlia in advance
  855. * run the EachConn function, which returns supposed subscription bins
  856. * store all supposed bins per peer in a map
  857. * check that all peers have the expected subscriptions
  858. This kad table and its peers are copied from network.TestKademliaCase1,
  859. it represents an edge case but for the purpose of testing the
  860. syncing subscriptions it is just fine.
  861. Addresses used in this test are discovered as part of the simulation network
  862. in higher level tests for streaming. They were generated randomly.
  863. The resulting kademlia looks like this:
  864. =========================================================================
  865. Fri Dec 21 20:02:39 UTC 2018 KΛÐΞMLIΛ hive: queen's address: 7efef1
  866. population: 12 (12), MinProxBinSize: 2, MinBinSize: 2, MaxBinSize: 4
  867. 000 2 8196 835f | 2 8196 (0) 835f (0)
  868. 001 2 2690 28f0 | 2 2690 (0) 28f0 (0)
  869. 002 2 4d72 4a45 | 2 4d72 (0) 4a45 (0)
  870. 003 1 646e | 1 646e (0)
  871. 004 3 769c 76d1 7656 | 3 769c (0) 76d1 (0) 7656 (0)
  872. ============ DEPTH: 5 ==========================================
  873. 005 1 7a48 | 1 7a48 (0)
  874. 006 1 7cbd | 1 7cbd (0)
  875. 007 0 | 0
  876. 008 0 | 0
  877. 009 0 | 0
  878. 010 0 | 0
  879. 011 0 | 0
  880. 012 0 | 0
  881. 013 0 | 0
  882. 014 0 | 0
  883. 015 0 | 0
  884. =========================================================================
  885. */
  886. func TestRequestPeerSubscriptions(t *testing.T) {
  887. // the pivot address; this is the actual kademlia node
  888. pivotAddr := "7efef1c41d77f843ad167be95f6660567eb8a4a59f39240000cce2e0d65baf8e"
  889. // a map of bin number to addresses from the given kademlia
  890. binMap := make(map[int][]string)
  891. binMap[0] = []string{
  892. "835fbbf1d16ba7347b6e2fc552d6e982148d29c624ea20383850df3c810fa8fc",
  893. "81968a2d8fb39114342ee1da85254ec51e0608d7f0f6997c2a8354c260a71009",
  894. }
  895. binMap[1] = []string{
  896. "28f0bc1b44658548d6e05dd16d4c2fe77f1da5d48b6774bc4263b045725d0c19",
  897. "2690a910c33ee37b91eb6c4e0731d1d345e2dc3b46d308503a6e85bbc242c69e",
  898. }
  899. binMap[2] = []string{
  900. "4a45f1fc63e1a9cb9dfa44c98da2f3d20c2923e5d75ff60b2db9d1bdb0c54d51",
  901. "4d72a04ddeb851a68cd197ef9a92a3e2ff01fbbff638e64929dd1a9c2e150112",
  902. }
  903. binMap[3] = []string{
  904. "646e9540c84f6a2f9cf6585d45a4c219573b4fd1b64a3c9a1386fc5cf98c0d4d",
  905. }
  906. binMap[4] = []string{
  907. "7656caccdc79cd8d7ce66d415cc96a718e8271c62fb35746bfc2b49faf3eebf3",
  908. "76d1e83c71ca246d042e37ff1db181f2776265fbcfdc890ce230bfa617c9c2f0",
  909. "769ce86aa90b518b7ed382f9fdacfbed93574e18dc98fe6c342e4f9f409c2d5a",
  910. }
  911. binMap[5] = []string{
  912. "7a48f75f8ca60487ae42d6f92b785581b40b91f2da551ae73d5eae46640e02e8",
  913. }
  914. binMap[6] = []string{
  915. "7cbd42350bde8e18ae5b955b5450f8e2cef3419f92fbf5598160c60fd78619f0",
  916. }
  917. // create the pivot's kademlia
  918. addr := common.FromHex(pivotAddr)
  919. k := network.NewKademlia(addr, network.NewKadParams())
  920. // construct the peers and the kademlia
  921. for _, binaddrs := range binMap {
  922. for _, a := range binaddrs {
  923. addr := common.FromHex(a)
  924. k.On(network.NewPeer(&network.BzzPeer{BzzAddr: &network.BzzAddr{OAddr: addr}}, k))
  925. }
  926. }
  927. // TODO: check kad table is same
  928. // currently k.String() prints date so it will never be the same :)
  929. // --> implement JSON representation of kad table
  930. log.Debug(k.String())
  931. // simulate that we would do subscriptions: just store the bin numbers
  932. fakeSubscriptions := make(map[string][]int)
  933. //after the test, we need to reset the subscriptionFunc to the default
  934. defer func() { subscriptionFunc = doRequestSubscription }()
  935. // define the function which should run for each connection
  936. // instead of doing real subscriptions, we just store the bin numbers
  937. subscriptionFunc = func(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool {
  938. // get the peer ID
  939. peerstr := fmt.Sprintf("%x", p.Over())
  940. // create the array of bins per peer
  941. if _, ok := fakeSubscriptions[peerstr]; !ok {
  942. fakeSubscriptions[peerstr] = make([]int, 0)
  943. }
  944. // store the (fake) bin subscription
  945. log.Debug(fmt.Sprintf("Adding fake subscription for peer %s with bin %d", peerstr, bin))
  946. fakeSubscriptions[peerstr] = append(fakeSubscriptions[peerstr], int(bin))
  947. return true
  948. }
  949. // create just a simple Registry object in order to be able to call...
  950. r := &Registry{}
  951. r.requestPeerSubscriptions(k, nil)
  952. // calculate the kademlia depth
  953. kdepth := k.NeighbourhoodDepth()
  954. // now, check that all peers have the expected (fake) subscriptions
  955. // iterate the bin map
  956. for bin, peers := range binMap {
  957. // for every peer...
  958. for _, peer := range peers {
  959. // ...get its (fake) subscriptions
  960. fakeSubsForPeer := fakeSubscriptions[peer]
  961. // if the peer's bin is shallower than the kademlia depth...
  962. if bin < kdepth {
  963. // (iterate all (fake) subscriptions)
  964. for _, subbin := range fakeSubsForPeer {
  965. // ...only the peer's bin should be "subscribed"
  966. // (and thus have only one subscription)
  967. if subbin != bin || len(fakeSubsForPeer) != 1 {
  968. t.Fatalf("Did not get expected subscription for bin < depth; bin of peer %s: %d, subscription: %d", peer, bin, subbin)
  969. }
  970. }
  971. } else { //if the peer's bin is equal or higher than the kademlia depth...
  972. // (iterate all (fake) subscriptions)
  973. for i, subbin := range fakeSubsForPeer {
  974. // ...each bin from the peer's bin number up to k.MaxProxDisplay should be "subscribed"
  975. // as we start from depth we can use the iteration index to check
  976. if subbin != i+kdepth {
  977. t.Fatalf("Did not get expected subscription for bin > depth; bin of peer %s: %d, subscription: %d", peer, bin, subbin)
  978. }
  979. // the last "subscription" should be k.MaxProxDisplay
  980. if i == len(fakeSubsForPeer)-1 && subbin != k.MaxProxDisplay {
  981. t.Fatalf("Expected last subscription to be: %d, but is: %d", k.MaxProxDisplay, subbin)
  982. }
  983. }
  984. }
  985. }
  986. }
  987. // print some output
  988. for p, subs := range fakeSubscriptions {
  989. log.Debug(fmt.Sprintf("Peer %s has the following fake subscriptions: ", p))
  990. for _, bin := range subs {
  991. log.Debug(fmt.Sprintf("%d,", bin))
  992. }
  993. }
  994. }