streamer_test.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package stream
  17. import (
  18. "bytes"
  19. "context"
  20. "errors"
  21. "strconv"
  22. "testing"
  23. "time"
  24. "github.com/ethereum/go-ethereum/crypto/sha3"
  25. p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
  26. )
  27. func TestStreamerSubscribe(t *testing.T) {
  28. tester, streamer, _, teardown, err := newStreamerTester(t, nil)
  29. defer teardown()
  30. if err != nil {
  31. t.Fatal(err)
  32. }
  33. stream := NewStream("foo", "", true)
  34. err = streamer.Subscribe(tester.Nodes[0].ID(), stream, NewRange(0, 0), Top)
  35. if err == nil || err.Error() != "stream foo not registered" {
  36. t.Fatalf("Expected error %v, got %v", "stream foo not registered", err)
  37. }
  38. }
  39. func TestStreamerRequestSubscription(t *testing.T) {
  40. tester, streamer, _, teardown, err := newStreamerTester(t, nil)
  41. defer teardown()
  42. if err != nil {
  43. t.Fatal(err)
  44. }
  45. stream := NewStream("foo", "", false)
  46. err = streamer.RequestSubscription(tester.Nodes[0].ID(), stream, &Range{}, Top)
  47. if err == nil || err.Error() != "stream foo not registered" {
  48. t.Fatalf("Expected error %v, got %v", "stream foo not registered", err)
  49. }
  50. }
  51. var (
  52. hash0 = sha3.Sum256([]byte{0})
  53. hash1 = sha3.Sum256([]byte{1})
  54. hash2 = sha3.Sum256([]byte{2})
  55. hashesTmp = append(hash0[:], hash1[:]...)
  56. hashes = append(hashesTmp, hash2[:]...)
  57. corruptHashes = append(hashes[:40])
  58. )
  59. type testClient struct {
  60. t string
  61. wait0 chan bool
  62. wait2 chan bool
  63. batchDone chan bool
  64. receivedHashes map[string][]byte
  65. }
  66. func newTestClient(t string) *testClient {
  67. return &testClient{
  68. t: t,
  69. wait0: make(chan bool),
  70. wait2: make(chan bool),
  71. batchDone: make(chan bool),
  72. receivedHashes: make(map[string][]byte),
  73. }
  74. }
  75. func (self *testClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
  76. self.receivedHashes[string(hash)] = hash
  77. if bytes.Equal(hash, hash0[:]) {
  78. return func(context.Context) error {
  79. <-self.wait0
  80. return nil
  81. }
  82. } else if bytes.Equal(hash, hash2[:]) {
  83. return func(context.Context) error {
  84. <-self.wait2
  85. return nil
  86. }
  87. }
  88. return nil
  89. }
  90. func (self *testClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) {
  91. close(self.batchDone)
  92. return nil
  93. }
  94. func (self *testClient) Close() {}
  95. type testServer struct {
  96. t string
  97. }
  98. func newTestServer(t string) *testServer {
  99. return &testServer{
  100. t: t,
  101. }
  102. }
  103. func (self *testServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
  104. return make([]byte, HashSize), from + 1, to + 1, nil, nil
  105. }
  106. func (self *testServer) GetData(context.Context, []byte) ([]byte, error) {
  107. return nil, nil
  108. }
  109. func (self *testServer) Close() {
  110. }
  111. func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
  112. tester, streamer, _, teardown, err := newStreamerTester(t, nil)
  113. defer teardown()
  114. if err != nil {
  115. t.Fatal(err)
  116. }
  117. streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
  118. return newTestClient(t), nil
  119. })
  120. node := tester.Nodes[0]
  121. stream := NewStream("foo", "", true)
  122. err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top)
  123. if err != nil {
  124. t.Fatalf("Expected no error, got %v", err)
  125. }
  126. err = tester.TestExchanges(
  127. p2ptest.Exchange{
  128. Label: "Subscribe message",
  129. Expects: []p2ptest.Expect{
  130. {
  131. Code: 4,
  132. Msg: &SubscribeMsg{
  133. Stream: stream,
  134. History: NewRange(5, 8),
  135. Priority: Top,
  136. },
  137. Peer: node.ID(),
  138. },
  139. },
  140. },
  141. // trigger OfferedHashesMsg to actually create the client
  142. p2ptest.Exchange{
  143. Label: "OfferedHashes message",
  144. Triggers: []p2ptest.Trigger{
  145. {
  146. Code: 1,
  147. Msg: &OfferedHashesMsg{
  148. HandoverProof: &HandoverProof{
  149. Handover: &Handover{},
  150. },
  151. Hashes: hashes,
  152. From: 5,
  153. To: 8,
  154. Stream: stream,
  155. },
  156. Peer: node.ID(),
  157. },
  158. },
  159. Expects: []p2ptest.Expect{
  160. {
  161. Code: 2,
  162. Msg: &WantedHashesMsg{
  163. Stream: stream,
  164. Want: []byte{5},
  165. From: 9,
  166. To: 0,
  167. },
  168. Peer: node.ID(),
  169. },
  170. },
  171. },
  172. )
  173. if err != nil {
  174. t.Fatal(err)
  175. }
  176. err = streamer.Unsubscribe(node.ID(), stream)
  177. if err != nil {
  178. t.Fatalf("Expected no error, got %v", err)
  179. }
  180. err = tester.TestExchanges(p2ptest.Exchange{
  181. Label: "Unsubscribe message",
  182. Expects: []p2ptest.Expect{
  183. {
  184. Code: 0,
  185. Msg: &UnsubscribeMsg{
  186. Stream: stream,
  187. },
  188. Peer: node.ID(),
  189. },
  190. },
  191. })
  192. if err != nil {
  193. t.Fatal(err)
  194. }
  195. }
  196. func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
  197. tester, streamer, _, teardown, err := newStreamerTester(t, nil)
  198. defer teardown()
  199. if err != nil {
  200. t.Fatal(err)
  201. }
  202. stream := NewStream("foo", "", false)
  203. streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
  204. return newTestServer(t), nil
  205. })
  206. node := tester.Nodes[0]
  207. err = tester.TestExchanges(p2ptest.Exchange{
  208. Label: "Subscribe message",
  209. Triggers: []p2ptest.Trigger{
  210. {
  211. Code: 4,
  212. Msg: &SubscribeMsg{
  213. Stream: stream,
  214. History: NewRange(5, 8),
  215. Priority: Top,
  216. },
  217. Peer: node.ID(),
  218. },
  219. },
  220. Expects: []p2ptest.Expect{
  221. {
  222. Code: 1,
  223. Msg: &OfferedHashesMsg{
  224. Stream: stream,
  225. HandoverProof: &HandoverProof{
  226. Handover: &Handover{},
  227. },
  228. Hashes: make([]byte, HashSize),
  229. From: 6,
  230. To: 9,
  231. },
  232. Peer: node.ID(),
  233. },
  234. },
  235. })
  236. if err != nil {
  237. t.Fatal(err)
  238. }
  239. err = tester.TestExchanges(p2ptest.Exchange{
  240. Label: "unsubscribe message",
  241. Triggers: []p2ptest.Trigger{
  242. {
  243. Code: 0,
  244. Msg: &UnsubscribeMsg{
  245. Stream: stream,
  246. },
  247. Peer: node.ID(),
  248. },
  249. },
  250. })
  251. if err != nil {
  252. t.Fatal(err)
  253. }
  254. }
  255. func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
  256. tester, streamer, _, teardown, err := newStreamerTester(t, nil)
  257. defer teardown()
  258. if err != nil {
  259. t.Fatal(err)
  260. }
  261. stream := NewStream("foo", "", true)
  262. streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
  263. return newTestServer(t), nil
  264. })
  265. node := tester.Nodes[0]
  266. err = tester.TestExchanges(p2ptest.Exchange{
  267. Label: "Subscribe message",
  268. Triggers: []p2ptest.Trigger{
  269. {
  270. Code: 4,
  271. Msg: &SubscribeMsg{
  272. Stream: stream,
  273. Priority: Top,
  274. },
  275. Peer: node.ID(),
  276. },
  277. },
  278. Expects: []p2ptest.Expect{
  279. {
  280. Code: 1,
  281. Msg: &OfferedHashesMsg{
  282. Stream: stream,
  283. HandoverProof: &HandoverProof{
  284. Handover: &Handover{},
  285. },
  286. Hashes: make([]byte, HashSize),
  287. From: 1,
  288. To: 1,
  289. },
  290. Peer: node.ID(),
  291. },
  292. },
  293. })
  294. if err != nil {
  295. t.Fatal(err)
  296. }
  297. err = tester.TestExchanges(p2ptest.Exchange{
  298. Label: "unsubscribe message",
  299. Triggers: []p2ptest.Trigger{
  300. {
  301. Code: 0,
  302. Msg: &UnsubscribeMsg{
  303. Stream: stream,
  304. },
  305. Peer: node.ID(),
  306. },
  307. },
  308. })
  309. if err != nil {
  310. t.Fatal(err)
  311. }
  312. }
  313. func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
  314. tester, streamer, _, teardown, err := newStreamerTester(t, nil)
  315. defer teardown()
  316. if err != nil {
  317. t.Fatal(err)
  318. }
  319. streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
  320. return newTestServer(t), nil
  321. })
  322. stream := NewStream("bar", "", true)
  323. node := tester.Nodes[0]
  324. err = tester.TestExchanges(p2ptest.Exchange{
  325. Label: "Subscribe message",
  326. Triggers: []p2ptest.Trigger{
  327. {
  328. Code: 4,
  329. Msg: &SubscribeMsg{
  330. Stream: stream,
  331. History: NewRange(5, 8),
  332. Priority: Top,
  333. },
  334. Peer: node.ID(),
  335. },
  336. },
  337. Expects: []p2ptest.Expect{
  338. {
  339. Code: 7,
  340. Msg: &SubscribeErrorMsg{
  341. Error: "stream bar not registered",
  342. },
  343. Peer: node.ID(),
  344. },
  345. },
  346. })
  347. if err != nil {
  348. t.Fatal(err)
  349. }
  350. }
  351. func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
  352. tester, streamer, _, teardown, err := newStreamerTester(t, nil)
  353. defer teardown()
  354. if err != nil {
  355. t.Fatal(err)
  356. }
  357. stream := NewStream("foo", "", true)
  358. streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
  359. return &testServer{
  360. t: t,
  361. }, nil
  362. })
  363. node := tester.Nodes[0]
  364. err = tester.TestExchanges(p2ptest.Exchange{
  365. Label: "Subscribe message",
  366. Triggers: []p2ptest.Trigger{
  367. {
  368. Code: 4,
  369. Msg: &SubscribeMsg{
  370. Stream: stream,
  371. History: NewRange(5, 8),
  372. Priority: Top,
  373. },
  374. Peer: node.ID(),
  375. },
  376. },
  377. Expects: []p2ptest.Expect{
  378. {
  379. Code: 1,
  380. Msg: &OfferedHashesMsg{
  381. Stream: NewStream("foo", "", false),
  382. HandoverProof: &HandoverProof{
  383. Handover: &Handover{},
  384. },
  385. Hashes: make([]byte, HashSize),
  386. From: 6,
  387. To: 9,
  388. },
  389. Peer: node.ID(),
  390. },
  391. {
  392. Code: 1,
  393. Msg: &OfferedHashesMsg{
  394. Stream: stream,
  395. HandoverProof: &HandoverProof{
  396. Handover: &Handover{},
  397. },
  398. From: 1,
  399. To: 1,
  400. Hashes: make([]byte, HashSize),
  401. },
  402. Peer: node.ID(),
  403. },
  404. },
  405. })
  406. if err != nil {
  407. t.Fatal(err)
  408. }
  409. }
  410. func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
  411. tester, streamer, _, teardown, err := newStreamerTester(t, nil)
  412. defer teardown()
  413. if err != nil {
  414. t.Fatal(err)
  415. }
  416. stream := NewStream("foo", "", true)
  417. var tc *testClient
  418. streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
  419. tc = newTestClient(t)
  420. return tc, nil
  421. })
  422. node := tester.Nodes[0]
  423. err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top)
  424. if err != nil {
  425. t.Fatalf("Expected no error, got %v", err)
  426. }
  427. err = tester.TestExchanges(p2ptest.Exchange{
  428. Label: "Subscribe message",
  429. Expects: []p2ptest.Expect{
  430. {
  431. Code: 4,
  432. Msg: &SubscribeMsg{
  433. Stream: stream,
  434. History: NewRange(5, 8),
  435. Priority: Top,
  436. },
  437. Peer: node.ID(),
  438. },
  439. },
  440. },
  441. p2ptest.Exchange{
  442. Label: "Corrupt offered hash message",
  443. Triggers: []p2ptest.Trigger{
  444. {
  445. Code: 1,
  446. Msg: &OfferedHashesMsg{
  447. HandoverProof: &HandoverProof{
  448. Handover: &Handover{},
  449. },
  450. Hashes: corruptHashes,
  451. From: 5,
  452. To: 8,
  453. Stream: stream,
  454. },
  455. Peer: node.ID(),
  456. },
  457. },
  458. })
  459. if err != nil {
  460. t.Fatal(err)
  461. }
  462. expectedError := errors.New("Message handler error: (msg code 1): error invalid hashes length (len: 40)")
  463. if err := tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: expectedError}); err != nil {
  464. t.Fatal(err)
  465. }
  466. }
  467. func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
  468. tester, streamer, _, teardown, err := newStreamerTester(t, nil)
  469. defer teardown()
  470. if err != nil {
  471. t.Fatal(err)
  472. }
  473. stream := NewStream("foo", "", true)
  474. var tc *testClient
  475. streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
  476. tc = newTestClient(t)
  477. return tc, nil
  478. })
  479. node := tester.Nodes[0]
  480. err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top)
  481. if err != nil {
  482. t.Fatalf("Expected no error, got %v", err)
  483. }
  484. err = tester.TestExchanges(p2ptest.Exchange{
  485. Label: "Subscribe message",
  486. Expects: []p2ptest.Expect{
  487. {
  488. Code: 4,
  489. Msg: &SubscribeMsg{
  490. Stream: stream,
  491. History: NewRange(5, 8),
  492. Priority: Top,
  493. },
  494. Peer: node.ID(),
  495. },
  496. },
  497. },
  498. p2ptest.Exchange{
  499. Label: "WantedHashes message",
  500. Triggers: []p2ptest.Trigger{
  501. {
  502. Code: 1,
  503. Msg: &OfferedHashesMsg{
  504. HandoverProof: &HandoverProof{
  505. Handover: &Handover{},
  506. },
  507. Hashes: hashes,
  508. From: 5,
  509. To: 8,
  510. Stream: stream,
  511. },
  512. Peer: node.ID(),
  513. },
  514. },
  515. Expects: []p2ptest.Expect{
  516. {
  517. Code: 2,
  518. Msg: &WantedHashesMsg{
  519. Stream: stream,
  520. Want: []byte{5},
  521. From: 9,
  522. To: 0,
  523. },
  524. Peer: node.ID(),
  525. },
  526. },
  527. })
  528. if err != nil {
  529. t.Fatal(err)
  530. }
  531. if len(tc.receivedHashes) != 3 {
  532. t.Fatalf("Expected number of received hashes %v, got %v", 3, len(tc.receivedHashes))
  533. }
  534. close(tc.wait0)
  535. timeout := time.NewTimer(100 * time.Millisecond)
  536. defer timeout.Stop()
  537. select {
  538. case <-tc.batchDone:
  539. t.Fatal("batch done early")
  540. case <-timeout.C:
  541. }
  542. close(tc.wait2)
  543. timeout2 := time.NewTimer(10000 * time.Millisecond)
  544. defer timeout2.Stop()
  545. select {
  546. case <-tc.batchDone:
  547. case <-timeout2.C:
  548. t.Fatal("timeout waiting batchdone call")
  549. }
  550. }
  551. func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
  552. tester, streamer, _, teardown, err := newStreamerTester(t, nil)
  553. defer teardown()
  554. if err != nil {
  555. t.Fatal(err)
  556. }
  557. streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
  558. return newTestServer(t), nil
  559. })
  560. node := tester.Nodes[0]
  561. stream := NewStream("foo", "", true)
  562. err = streamer.RequestSubscription(node.ID(), stream, NewRange(5, 8), Top)
  563. if err != nil {
  564. t.Fatalf("Expected no error, got %v", err)
  565. }
  566. err = tester.TestExchanges(
  567. p2ptest.Exchange{
  568. Label: "RequestSubscription message",
  569. Expects: []p2ptest.Expect{
  570. {
  571. Code: 8,
  572. Msg: &RequestSubscriptionMsg{
  573. Stream: stream,
  574. History: NewRange(5, 8),
  575. Priority: Top,
  576. },
  577. Peer: node.ID(),
  578. },
  579. },
  580. },
  581. p2ptest.Exchange{
  582. Label: "Subscribe message",
  583. Triggers: []p2ptest.Trigger{
  584. {
  585. Code: 4,
  586. Msg: &SubscribeMsg{
  587. Stream: stream,
  588. History: NewRange(5, 8),
  589. Priority: Top,
  590. },
  591. Peer: node.ID(),
  592. },
  593. },
  594. Expects: []p2ptest.Expect{
  595. {
  596. Code: 1,
  597. Msg: &OfferedHashesMsg{
  598. Stream: NewStream("foo", "", false),
  599. HandoverProof: &HandoverProof{
  600. Handover: &Handover{},
  601. },
  602. Hashes: make([]byte, HashSize),
  603. From: 6,
  604. To: 9,
  605. },
  606. Peer: node.ID(),
  607. },
  608. {
  609. Code: 1,
  610. Msg: &OfferedHashesMsg{
  611. Stream: stream,
  612. HandoverProof: &HandoverProof{
  613. Handover: &Handover{},
  614. },
  615. From: 1,
  616. To: 1,
  617. Hashes: make([]byte, HashSize),
  618. },
  619. Peer: node.ID(),
  620. },
  621. },
  622. },
  623. )
  624. if err != nil {
  625. t.Fatal(err)
  626. }
  627. err = streamer.Quit(node.ID(), stream)
  628. if err != nil {
  629. t.Fatalf("Expected no error, got %v", err)
  630. }
  631. err = tester.TestExchanges(p2ptest.Exchange{
  632. Label: "Quit message",
  633. Expects: []p2ptest.Expect{
  634. {
  635. Code: 9,
  636. Msg: &QuitMsg{
  637. Stream: stream,
  638. },
  639. Peer: node.ID(),
  640. },
  641. },
  642. })
  643. if err != nil {
  644. t.Fatal(err)
  645. }
  646. historyStream := getHistoryStream(stream)
  647. err = streamer.Quit(node.ID(), historyStream)
  648. if err != nil {
  649. t.Fatalf("Expected no error, got %v", err)
  650. }
  651. err = tester.TestExchanges(p2ptest.Exchange{
  652. Label: "Quit message",
  653. Expects: []p2ptest.Expect{
  654. {
  655. Code: 9,
  656. Msg: &QuitMsg{
  657. Stream: historyStream,
  658. },
  659. Peer: node.ID(),
  660. },
  661. },
  662. })
  663. if err != nil {
  664. t.Fatal(err)
  665. }
  666. }
  667. // TestMaxPeerServersWithUnsubscribe creates a registry with a limited
  668. // number of stream servers, and performs a test with subscriptions and
  669. // unsubscriptions, checking if unsubscriptions will remove streams,
  670. // leaving place for new streams.
  671. func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
  672. var maxPeerServers = 6
  673. tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
  674. MaxPeerServers: maxPeerServers,
  675. })
  676. defer teardown()
  677. if err != nil {
  678. t.Fatal(err)
  679. }
  680. streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
  681. return newTestServer(t), nil
  682. })
  683. node := tester.Nodes[0]
  684. for i := 0; i < maxPeerServers+10; i++ {
  685. stream := NewStream("foo", strconv.Itoa(i), true)
  686. err = tester.TestExchanges(p2ptest.Exchange{
  687. Label: "Subscribe message",
  688. Triggers: []p2ptest.Trigger{
  689. {
  690. Code: 4,
  691. Msg: &SubscribeMsg{
  692. Stream: stream,
  693. Priority: Top,
  694. },
  695. Peer: node.ID(),
  696. },
  697. },
  698. Expects: []p2ptest.Expect{
  699. {
  700. Code: 1,
  701. Msg: &OfferedHashesMsg{
  702. Stream: stream,
  703. HandoverProof: &HandoverProof{
  704. Handover: &Handover{},
  705. },
  706. Hashes: make([]byte, HashSize),
  707. From: 1,
  708. To: 1,
  709. },
  710. Peer: node.ID(),
  711. },
  712. },
  713. })
  714. if err != nil {
  715. t.Fatal(err)
  716. }
  717. err = tester.TestExchanges(p2ptest.Exchange{
  718. Label: "unsubscribe message",
  719. Triggers: []p2ptest.Trigger{
  720. {
  721. Code: 0,
  722. Msg: &UnsubscribeMsg{
  723. Stream: stream,
  724. },
  725. Peer: node.ID(),
  726. },
  727. },
  728. })
  729. if err != nil {
  730. t.Fatal(err)
  731. }
  732. }
  733. }
  734. // TestMaxPeerServersWithoutUnsubscribe creates a registry with a limited
  735. // number of stream servers, and performs subscriptions to detect subscriptions
  736. // error message exchange.
  737. func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
  738. var maxPeerServers = 6
  739. tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
  740. MaxPeerServers: maxPeerServers,
  741. })
  742. defer teardown()
  743. if err != nil {
  744. t.Fatal(err)
  745. }
  746. streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
  747. return newTestServer(t), nil
  748. })
  749. node := tester.Nodes[0]
  750. for i := 0; i < maxPeerServers+10; i++ {
  751. stream := NewStream("foo", strconv.Itoa(i), true)
  752. if i >= maxPeerServers {
  753. err = tester.TestExchanges(p2ptest.Exchange{
  754. Label: "Subscribe message",
  755. Triggers: []p2ptest.Trigger{
  756. {
  757. Code: 4,
  758. Msg: &SubscribeMsg{
  759. Stream: stream,
  760. Priority: Top,
  761. },
  762. Peer: node.ID(),
  763. },
  764. },
  765. Expects: []p2ptest.Expect{
  766. {
  767. Code: 7,
  768. Msg: &SubscribeErrorMsg{
  769. Error: ErrMaxPeerServers.Error(),
  770. },
  771. Peer: node.ID(),
  772. },
  773. },
  774. })
  775. if err != nil {
  776. t.Fatal(err)
  777. }
  778. continue
  779. }
  780. err = tester.TestExchanges(p2ptest.Exchange{
  781. Label: "Subscribe message",
  782. Triggers: []p2ptest.Trigger{
  783. {
  784. Code: 4,
  785. Msg: &SubscribeMsg{
  786. Stream: stream,
  787. Priority: Top,
  788. },
  789. Peer: node.ID(),
  790. },
  791. },
  792. Expects: []p2ptest.Expect{
  793. {
  794. Code: 1,
  795. Msg: &OfferedHashesMsg{
  796. Stream: stream,
  797. HandoverProof: &HandoverProof{
  798. Handover: &Handover{},
  799. },
  800. Hashes: make([]byte, HashSize),
  801. From: 1,
  802. To: 1,
  803. },
  804. Peer: node.ID(),
  805. },
  806. },
  807. })
  808. if err != nil {
  809. t.Fatal(err)
  810. }
  811. }
  812. }