streamer_test.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package stream
  17. import (
  18. "bytes"
  19. "context"
  20. "errors"
  21. "strconv"
  22. "testing"
  23. "time"
  24. p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
  25. "golang.org/x/crypto/sha3"
  26. )
  27. func TestStreamerSubscribe(t *testing.T) {
  28. tester, streamer, _, teardown, err := newStreamerTester(t, nil)
  29. defer teardown()
  30. if err != nil {
  31. t.Fatal(err)
  32. }
  33. stream := NewStream("foo", "", true)
  34. err = streamer.Subscribe(tester.Nodes[0].ID(), stream, NewRange(0, 0), Top)
  35. if err == nil || err.Error() != "stream foo not registered" {
  36. t.Fatalf("Expected error %v, got %v", "stream foo not registered", err)
  37. }
  38. }
  39. func TestStreamerRequestSubscription(t *testing.T) {
  40. tester, streamer, _, teardown, err := newStreamerTester(t, nil)
  41. defer teardown()
  42. if err != nil {
  43. t.Fatal(err)
  44. }
  45. stream := NewStream("foo", "", false)
  46. err = streamer.RequestSubscription(tester.Nodes[0].ID(), stream, &Range{}, Top)
  47. if err == nil || err.Error() != "stream foo not registered" {
  48. t.Fatalf("Expected error %v, got %v", "stream foo not registered", err)
  49. }
  50. }
  51. var (
  52. hash0 = sha3.Sum256([]byte{0})
  53. hash1 = sha3.Sum256([]byte{1})
  54. hash2 = sha3.Sum256([]byte{2})
  55. hashesTmp = append(hash0[:], hash1[:]...)
  56. hashes = append(hashesTmp, hash2[:]...)
  57. corruptHashes = append(hashes[:40])
  58. )
  59. type testClient struct {
  60. t string
  61. wait0 chan bool
  62. wait2 chan bool
  63. batchDone chan bool
  64. receivedHashes map[string][]byte
  65. }
  66. func newTestClient(t string) *testClient {
  67. return &testClient{
  68. t: t,
  69. wait0: make(chan bool),
  70. wait2: make(chan bool),
  71. batchDone: make(chan bool),
  72. receivedHashes: make(map[string][]byte),
  73. }
  74. }
  75. func (self *testClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
  76. self.receivedHashes[string(hash)] = hash
  77. if bytes.Equal(hash, hash0[:]) {
  78. return func(context.Context) error {
  79. <-self.wait0
  80. return nil
  81. }
  82. } else if bytes.Equal(hash, hash2[:]) {
  83. return func(context.Context) error {
  84. <-self.wait2
  85. return nil
  86. }
  87. }
  88. return nil
  89. }
  90. func (self *testClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) {
  91. close(self.batchDone)
  92. return nil
  93. }
  94. func (self *testClient) Close() {}
  95. type testServer struct {
  96. t string
  97. sessionIndex uint64
  98. }
  99. func newTestServer(t string, sessionIndex uint64) *testServer {
  100. return &testServer{
  101. t: t,
  102. sessionIndex: sessionIndex,
  103. }
  104. }
  105. func (s *testServer) SessionIndex() (uint64, error) {
  106. return s.sessionIndex, nil
  107. }
  108. func (self *testServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
  109. return make([]byte, HashSize), from + 1, to + 1, nil, nil
  110. }
  111. func (self *testServer) GetData(context.Context, []byte) ([]byte, error) {
  112. return nil, nil
  113. }
  114. func (self *testServer) Close() {
  115. }
  116. func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
  117. tester, streamer, _, teardown, err := newStreamerTester(t, nil)
  118. defer teardown()
  119. if err != nil {
  120. t.Fatal(err)
  121. }
  122. streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
  123. return newTestClient(t), nil
  124. })
  125. node := tester.Nodes[0]
  126. stream := NewStream("foo", "", true)
  127. err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top)
  128. if err != nil {
  129. t.Fatalf("Expected no error, got %v", err)
  130. }
  131. err = tester.TestExchanges(
  132. p2ptest.Exchange{
  133. Label: "Subscribe message",
  134. Expects: []p2ptest.Expect{
  135. {
  136. Code: 4,
  137. Msg: &SubscribeMsg{
  138. Stream: stream,
  139. History: NewRange(5, 8),
  140. Priority: Top,
  141. },
  142. Peer: node.ID(),
  143. },
  144. },
  145. },
  146. // trigger OfferedHashesMsg to actually create the client
  147. p2ptest.Exchange{
  148. Label: "OfferedHashes message",
  149. Triggers: []p2ptest.Trigger{
  150. {
  151. Code: 1,
  152. Msg: &OfferedHashesMsg{
  153. HandoverProof: &HandoverProof{
  154. Handover: &Handover{},
  155. },
  156. Hashes: hashes,
  157. From: 5,
  158. To: 8,
  159. Stream: stream,
  160. },
  161. Peer: node.ID(),
  162. },
  163. },
  164. Expects: []p2ptest.Expect{
  165. {
  166. Code: 2,
  167. Msg: &WantedHashesMsg{
  168. Stream: stream,
  169. Want: []byte{5},
  170. From: 9,
  171. To: 0,
  172. },
  173. Peer: node.ID(),
  174. },
  175. },
  176. },
  177. )
  178. if err != nil {
  179. t.Fatal(err)
  180. }
  181. err = streamer.Unsubscribe(node.ID(), stream)
  182. if err != nil {
  183. t.Fatalf("Expected no error, got %v", err)
  184. }
  185. err = tester.TestExchanges(p2ptest.Exchange{
  186. Label: "Unsubscribe message",
  187. Expects: []p2ptest.Expect{
  188. {
  189. Code: 0,
  190. Msg: &UnsubscribeMsg{
  191. Stream: stream,
  192. },
  193. Peer: node.ID(),
  194. },
  195. },
  196. })
  197. if err != nil {
  198. t.Fatal(err)
  199. }
  200. }
  201. func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
  202. tester, streamer, _, teardown, err := newStreamerTester(t, nil)
  203. defer teardown()
  204. if err != nil {
  205. t.Fatal(err)
  206. }
  207. stream := NewStream("foo", "", false)
  208. streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
  209. return newTestServer(t, 10), nil
  210. })
  211. node := tester.Nodes[0]
  212. err = tester.TestExchanges(p2ptest.Exchange{
  213. Label: "Subscribe message",
  214. Triggers: []p2ptest.Trigger{
  215. {
  216. Code: 4,
  217. Msg: &SubscribeMsg{
  218. Stream: stream,
  219. History: NewRange(5, 8),
  220. Priority: Top,
  221. },
  222. Peer: node.ID(),
  223. },
  224. },
  225. Expects: []p2ptest.Expect{
  226. {
  227. Code: 1,
  228. Msg: &OfferedHashesMsg{
  229. Stream: stream,
  230. HandoverProof: &HandoverProof{
  231. Handover: &Handover{},
  232. },
  233. Hashes: make([]byte, HashSize),
  234. From: 6,
  235. To: 9,
  236. },
  237. Peer: node.ID(),
  238. },
  239. },
  240. })
  241. if err != nil {
  242. t.Fatal(err)
  243. }
  244. err = tester.TestExchanges(p2ptest.Exchange{
  245. Label: "unsubscribe message",
  246. Triggers: []p2ptest.Trigger{
  247. {
  248. Code: 0,
  249. Msg: &UnsubscribeMsg{
  250. Stream: stream,
  251. },
  252. Peer: node.ID(),
  253. },
  254. },
  255. })
  256. if err != nil {
  257. t.Fatal(err)
  258. }
  259. }
  260. func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
  261. tester, streamer, _, teardown, err := newStreamerTester(t, nil)
  262. defer teardown()
  263. if err != nil {
  264. t.Fatal(err)
  265. }
  266. stream := NewStream("foo", "", true)
  267. streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
  268. return newTestServer(t, 0), nil
  269. })
  270. node := tester.Nodes[0]
  271. err = tester.TestExchanges(p2ptest.Exchange{
  272. Label: "Subscribe message",
  273. Triggers: []p2ptest.Trigger{
  274. {
  275. Code: 4,
  276. Msg: &SubscribeMsg{
  277. Stream: stream,
  278. Priority: Top,
  279. },
  280. Peer: node.ID(),
  281. },
  282. },
  283. Expects: []p2ptest.Expect{
  284. {
  285. Code: 1,
  286. Msg: &OfferedHashesMsg{
  287. Stream: stream,
  288. HandoverProof: &HandoverProof{
  289. Handover: &Handover{},
  290. },
  291. Hashes: make([]byte, HashSize),
  292. From: 1,
  293. To: 0,
  294. },
  295. Peer: node.ID(),
  296. },
  297. },
  298. })
  299. if err != nil {
  300. t.Fatal(err)
  301. }
  302. err = tester.TestExchanges(p2ptest.Exchange{
  303. Label: "unsubscribe message",
  304. Triggers: []p2ptest.Trigger{
  305. {
  306. Code: 0,
  307. Msg: &UnsubscribeMsg{
  308. Stream: stream,
  309. },
  310. Peer: node.ID(),
  311. },
  312. },
  313. })
  314. if err != nil {
  315. t.Fatal(err)
  316. }
  317. }
  318. func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
  319. tester, streamer, _, teardown, err := newStreamerTester(t, nil)
  320. defer teardown()
  321. if err != nil {
  322. t.Fatal(err)
  323. }
  324. streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
  325. return newTestServer(t, 0), nil
  326. })
  327. stream := NewStream("bar", "", true)
  328. node := tester.Nodes[0]
  329. err = tester.TestExchanges(p2ptest.Exchange{
  330. Label: "Subscribe message",
  331. Triggers: []p2ptest.Trigger{
  332. {
  333. Code: 4,
  334. Msg: &SubscribeMsg{
  335. Stream: stream,
  336. History: NewRange(5, 8),
  337. Priority: Top,
  338. },
  339. Peer: node.ID(),
  340. },
  341. },
  342. Expects: []p2ptest.Expect{
  343. {
  344. Code: 7,
  345. Msg: &SubscribeErrorMsg{
  346. Error: "stream bar not registered",
  347. },
  348. Peer: node.ID(),
  349. },
  350. },
  351. })
  352. if err != nil {
  353. t.Fatal(err)
  354. }
  355. }
  356. func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
  357. tester, streamer, _, teardown, err := newStreamerTester(t, nil)
  358. defer teardown()
  359. if err != nil {
  360. t.Fatal(err)
  361. }
  362. stream := NewStream("foo", "", true)
  363. streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
  364. return newTestServer(t, 10), nil
  365. })
  366. node := tester.Nodes[0]
  367. err = tester.TestExchanges(p2ptest.Exchange{
  368. Label: "Subscribe message",
  369. Triggers: []p2ptest.Trigger{
  370. {
  371. Code: 4,
  372. Msg: &SubscribeMsg{
  373. Stream: stream,
  374. History: NewRange(5, 8),
  375. Priority: Top,
  376. },
  377. Peer: node.ID(),
  378. },
  379. },
  380. Expects: []p2ptest.Expect{
  381. {
  382. Code: 1,
  383. Msg: &OfferedHashesMsg{
  384. Stream: NewStream("foo", "", false),
  385. HandoverProof: &HandoverProof{
  386. Handover: &Handover{},
  387. },
  388. Hashes: make([]byte, HashSize),
  389. From: 6,
  390. To: 9,
  391. },
  392. Peer: node.ID(),
  393. },
  394. {
  395. Code: 1,
  396. Msg: &OfferedHashesMsg{
  397. Stream: stream,
  398. HandoverProof: &HandoverProof{
  399. Handover: &Handover{},
  400. },
  401. From: 11,
  402. To: 0,
  403. Hashes: make([]byte, HashSize),
  404. },
  405. Peer: node.ID(),
  406. },
  407. },
  408. })
  409. if err != nil {
  410. t.Fatal(err)
  411. }
  412. }
  413. func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
  414. tester, streamer, _, teardown, err := newStreamerTester(t, nil)
  415. defer teardown()
  416. if err != nil {
  417. t.Fatal(err)
  418. }
  419. stream := NewStream("foo", "", true)
  420. var tc *testClient
  421. streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
  422. tc = newTestClient(t)
  423. return tc, nil
  424. })
  425. node := tester.Nodes[0]
  426. err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top)
  427. if err != nil {
  428. t.Fatalf("Expected no error, got %v", err)
  429. }
  430. err = tester.TestExchanges(p2ptest.Exchange{
  431. Label: "Subscribe message",
  432. Expects: []p2ptest.Expect{
  433. {
  434. Code: 4,
  435. Msg: &SubscribeMsg{
  436. Stream: stream,
  437. History: NewRange(5, 8),
  438. Priority: Top,
  439. },
  440. Peer: node.ID(),
  441. },
  442. },
  443. },
  444. p2ptest.Exchange{
  445. Label: "Corrupt offered hash message",
  446. Triggers: []p2ptest.Trigger{
  447. {
  448. Code: 1,
  449. Msg: &OfferedHashesMsg{
  450. HandoverProof: &HandoverProof{
  451. Handover: &Handover{},
  452. },
  453. Hashes: corruptHashes,
  454. From: 5,
  455. To: 8,
  456. Stream: stream,
  457. },
  458. Peer: node.ID(),
  459. },
  460. },
  461. })
  462. if err != nil {
  463. t.Fatal(err)
  464. }
  465. expectedError := errors.New("Message handler error: (msg code 1): error invalid hashes length (len: 40)")
  466. if err := tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: expectedError}); err != nil {
  467. t.Fatal(err)
  468. }
  469. }
  470. func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
  471. tester, streamer, _, teardown, err := newStreamerTester(t, nil)
  472. defer teardown()
  473. if err != nil {
  474. t.Fatal(err)
  475. }
  476. stream := NewStream("foo", "", true)
  477. var tc *testClient
  478. streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
  479. tc = newTestClient(t)
  480. return tc, nil
  481. })
  482. node := tester.Nodes[0]
  483. err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top)
  484. if err != nil {
  485. t.Fatalf("Expected no error, got %v", err)
  486. }
  487. err = tester.TestExchanges(p2ptest.Exchange{
  488. Label: "Subscribe message",
  489. Expects: []p2ptest.Expect{
  490. {
  491. Code: 4,
  492. Msg: &SubscribeMsg{
  493. Stream: stream,
  494. History: NewRange(5, 8),
  495. Priority: Top,
  496. },
  497. Peer: node.ID(),
  498. },
  499. },
  500. },
  501. p2ptest.Exchange{
  502. Label: "WantedHashes message",
  503. Triggers: []p2ptest.Trigger{
  504. {
  505. Code: 1,
  506. Msg: &OfferedHashesMsg{
  507. HandoverProof: &HandoverProof{
  508. Handover: &Handover{},
  509. },
  510. Hashes: hashes,
  511. From: 5,
  512. To: 8,
  513. Stream: stream,
  514. },
  515. Peer: node.ID(),
  516. },
  517. },
  518. Expects: []p2ptest.Expect{
  519. {
  520. Code: 2,
  521. Msg: &WantedHashesMsg{
  522. Stream: stream,
  523. Want: []byte{5},
  524. From: 9,
  525. To: 0,
  526. },
  527. Peer: node.ID(),
  528. },
  529. },
  530. })
  531. if err != nil {
  532. t.Fatal(err)
  533. }
  534. if len(tc.receivedHashes) != 3 {
  535. t.Fatalf("Expected number of received hashes %v, got %v", 3, len(tc.receivedHashes))
  536. }
  537. close(tc.wait0)
  538. timeout := time.NewTimer(100 * time.Millisecond)
  539. defer timeout.Stop()
  540. select {
  541. case <-tc.batchDone:
  542. t.Fatal("batch done early")
  543. case <-timeout.C:
  544. }
  545. close(tc.wait2)
  546. timeout2 := time.NewTimer(10000 * time.Millisecond)
  547. defer timeout2.Stop()
  548. select {
  549. case <-tc.batchDone:
  550. case <-timeout2.C:
  551. t.Fatal("timeout waiting batchdone call")
  552. }
  553. }
  554. func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
  555. tester, streamer, _, teardown, err := newStreamerTester(t, nil)
  556. defer teardown()
  557. if err != nil {
  558. t.Fatal(err)
  559. }
  560. streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
  561. return newTestServer(t, 10), nil
  562. })
  563. node := tester.Nodes[0]
  564. stream := NewStream("foo", "", true)
  565. err = streamer.RequestSubscription(node.ID(), stream, NewRange(5, 8), Top)
  566. if err != nil {
  567. t.Fatalf("Expected no error, got %v", err)
  568. }
  569. err = tester.TestExchanges(
  570. p2ptest.Exchange{
  571. Label: "RequestSubscription message",
  572. Expects: []p2ptest.Expect{
  573. {
  574. Code: 8,
  575. Msg: &RequestSubscriptionMsg{
  576. Stream: stream,
  577. History: NewRange(5, 8),
  578. Priority: Top,
  579. },
  580. Peer: node.ID(),
  581. },
  582. },
  583. },
  584. p2ptest.Exchange{
  585. Label: "Subscribe message",
  586. Triggers: []p2ptest.Trigger{
  587. {
  588. Code: 4,
  589. Msg: &SubscribeMsg{
  590. Stream: stream,
  591. History: NewRange(5, 8),
  592. Priority: Top,
  593. },
  594. Peer: node.ID(),
  595. },
  596. },
  597. Expects: []p2ptest.Expect{
  598. {
  599. Code: 1,
  600. Msg: &OfferedHashesMsg{
  601. Stream: NewStream("foo", "", false),
  602. HandoverProof: &HandoverProof{
  603. Handover: &Handover{},
  604. },
  605. Hashes: make([]byte, HashSize),
  606. From: 6,
  607. To: 9,
  608. },
  609. Peer: node.ID(),
  610. },
  611. {
  612. Code: 1,
  613. Msg: &OfferedHashesMsg{
  614. Stream: stream,
  615. HandoverProof: &HandoverProof{
  616. Handover: &Handover{},
  617. },
  618. From: 11,
  619. To: 0,
  620. Hashes: make([]byte, HashSize),
  621. },
  622. Peer: node.ID(),
  623. },
  624. },
  625. },
  626. )
  627. if err != nil {
  628. t.Fatal(err)
  629. }
  630. err = streamer.Quit(node.ID(), stream)
  631. if err != nil {
  632. t.Fatalf("Expected no error, got %v", err)
  633. }
  634. err = tester.TestExchanges(p2ptest.Exchange{
  635. Label: "Quit message",
  636. Expects: []p2ptest.Expect{
  637. {
  638. Code: 9,
  639. Msg: &QuitMsg{
  640. Stream: stream,
  641. },
  642. Peer: node.ID(),
  643. },
  644. },
  645. })
  646. if err != nil {
  647. t.Fatal(err)
  648. }
  649. historyStream := getHistoryStream(stream)
  650. err = streamer.Quit(node.ID(), historyStream)
  651. if err != nil {
  652. t.Fatalf("Expected no error, got %v", err)
  653. }
  654. err = tester.TestExchanges(p2ptest.Exchange{
  655. Label: "Quit message",
  656. Expects: []p2ptest.Expect{
  657. {
  658. Code: 9,
  659. Msg: &QuitMsg{
  660. Stream: historyStream,
  661. },
  662. Peer: node.ID(),
  663. },
  664. },
  665. })
  666. if err != nil {
  667. t.Fatal(err)
  668. }
  669. }
  670. // TestMaxPeerServersWithUnsubscribe creates a registry with a limited
  671. // number of stream servers, and performs a test with subscriptions and
  672. // unsubscriptions, checking if unsubscriptions will remove streams,
  673. // leaving place for new streams.
  674. func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
  675. var maxPeerServers = 6
  676. tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
  677. Retrieval: RetrievalDisabled,
  678. Syncing: SyncingDisabled,
  679. MaxPeerServers: maxPeerServers,
  680. })
  681. defer teardown()
  682. if err != nil {
  683. t.Fatal(err)
  684. }
  685. streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
  686. return newTestServer(t, 0), nil
  687. })
  688. node := tester.Nodes[0]
  689. for i := 0; i < maxPeerServers+10; i++ {
  690. stream := NewStream("foo", strconv.Itoa(i), true)
  691. err = tester.TestExchanges(p2ptest.Exchange{
  692. Label: "Subscribe message",
  693. Triggers: []p2ptest.Trigger{
  694. {
  695. Code: 4,
  696. Msg: &SubscribeMsg{
  697. Stream: stream,
  698. Priority: Top,
  699. },
  700. Peer: node.ID(),
  701. },
  702. },
  703. Expects: []p2ptest.Expect{
  704. {
  705. Code: 1,
  706. Msg: &OfferedHashesMsg{
  707. Stream: stream,
  708. HandoverProof: &HandoverProof{
  709. Handover: &Handover{},
  710. },
  711. Hashes: make([]byte, HashSize),
  712. From: 1,
  713. To: 0,
  714. },
  715. Peer: node.ID(),
  716. },
  717. },
  718. })
  719. if err != nil {
  720. t.Fatal(err)
  721. }
  722. err = tester.TestExchanges(p2ptest.Exchange{
  723. Label: "unsubscribe message",
  724. Triggers: []p2ptest.Trigger{
  725. {
  726. Code: 0,
  727. Msg: &UnsubscribeMsg{
  728. Stream: stream,
  729. },
  730. Peer: node.ID(),
  731. },
  732. },
  733. })
  734. if err != nil {
  735. t.Fatal(err)
  736. }
  737. }
  738. }
  739. // TestMaxPeerServersWithoutUnsubscribe creates a registry with a limited
  740. // number of stream servers, and performs subscriptions to detect subscriptions
  741. // error message exchange.
  742. func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
  743. var maxPeerServers = 6
  744. tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
  745. MaxPeerServers: maxPeerServers,
  746. })
  747. defer teardown()
  748. if err != nil {
  749. t.Fatal(err)
  750. }
  751. streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
  752. return newTestServer(t, 0), nil
  753. })
  754. node := tester.Nodes[0]
  755. for i := 0; i < maxPeerServers+10; i++ {
  756. stream := NewStream("foo", strconv.Itoa(i), true)
  757. if i >= maxPeerServers {
  758. err = tester.TestExchanges(p2ptest.Exchange{
  759. Label: "Subscribe message",
  760. Triggers: []p2ptest.Trigger{
  761. {
  762. Code: 4,
  763. Msg: &SubscribeMsg{
  764. Stream: stream,
  765. Priority: Top,
  766. },
  767. Peer: node.ID(),
  768. },
  769. },
  770. Expects: []p2ptest.Expect{
  771. {
  772. Code: 7,
  773. Msg: &SubscribeErrorMsg{
  774. Error: ErrMaxPeerServers.Error(),
  775. },
  776. Peer: node.ID(),
  777. },
  778. },
  779. })
  780. if err != nil {
  781. t.Fatal(err)
  782. }
  783. continue
  784. }
  785. err = tester.TestExchanges(p2ptest.Exchange{
  786. Label: "Subscribe message",
  787. Triggers: []p2ptest.Trigger{
  788. {
  789. Code: 4,
  790. Msg: &SubscribeMsg{
  791. Stream: stream,
  792. Priority: Top,
  793. },
  794. Peer: node.ID(),
  795. },
  796. },
  797. Expects: []p2ptest.Expect{
  798. {
  799. Code: 1,
  800. Msg: &OfferedHashesMsg{
  801. Stream: stream,
  802. HandoverProof: &HandoverProof{
  803. Handover: &Handover{},
  804. },
  805. Hashes: make([]byte, HashSize),
  806. From: 1,
  807. To: 0,
  808. },
  809. Peer: node.ID(),
  810. },
  811. },
  812. })
  813. if err != nil {
  814. t.Fatal(err)
  815. }
  816. }
  817. }