streamer_test.go 14 KB


  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package stream
  17. import (
  18. "bytes"
  19. "context"
  20. "testing"
  21. "time"
  22. "github.com/ethereum/go-ethereum/crypto/sha3"
  23. p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
  24. )
  25. func TestStreamerSubscribe(t *testing.T) {
  26. tester, streamer, _, teardown, err := newStreamerTester(t)
  27. defer teardown()
  28. if err != nil {
  29. t.Fatal(err)
  30. }
  31. stream := NewStream("foo", "", true)
  32. err = streamer.Subscribe(tester.Nodes[0].ID(), stream, NewRange(0, 0), Top)
  33. if err == nil || err.Error() != "stream foo not registered" {
  34. t.Fatalf("Expected error %v, got %v", "stream foo not registered", err)
  35. }
  36. }
  37. func TestStreamerRequestSubscription(t *testing.T) {
  38. tester, streamer, _, teardown, err := newStreamerTester(t)
  39. defer teardown()
  40. if err != nil {
  41. t.Fatal(err)
  42. }
  43. stream := NewStream("foo", "", false)
  44. err = streamer.RequestSubscription(tester.Nodes[0].ID(), stream, &Range{}, Top)
  45. if err == nil || err.Error() != "stream foo not registered" {
  46. t.Fatalf("Expected error %v, got %v", "stream foo not registered", err)
  47. }
  48. }
  49. var (
  50. hash0 = sha3.Sum256([]byte{0})
  51. hash1 = sha3.Sum256([]byte{1})
  52. hash2 = sha3.Sum256([]byte{2})
  53. hashesTmp = append(hash0[:], hash1[:]...)
  54. hashes = append(hashesTmp, hash2[:]...)
  55. )
  56. type testClient struct {
  57. t string
  58. wait0 chan bool
  59. wait2 chan bool
  60. batchDone chan bool
  61. receivedHashes map[string][]byte
  62. }
  63. func newTestClient(t string) *testClient {
  64. return &testClient{
  65. t: t,
  66. wait0: make(chan bool),
  67. wait2: make(chan bool),
  68. batchDone: make(chan bool),
  69. receivedHashes: make(map[string][]byte),
  70. }
  71. }
  72. func (self *testClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
  73. self.receivedHashes[string(hash)] = hash
  74. if bytes.Equal(hash, hash0[:]) {
  75. return func(context.Context) error {
  76. <-self.wait0
  77. return nil
  78. }
  79. } else if bytes.Equal(hash, hash2[:]) {
  80. return func(context.Context) error {
  81. <-self.wait2
  82. return nil
  83. }
  84. }
  85. return nil
  86. }
  87. func (self *testClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) {
  88. close(self.batchDone)
  89. return nil
  90. }
  91. func (self *testClient) Close() {}
  92. type testServer struct {
  93. t string
  94. }
  95. func newTestServer(t string) *testServer {
  96. return &testServer{
  97. t: t,
  98. }
  99. }
  100. func (self *testServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
  101. return make([]byte, HashSize), from + 1, to + 1, nil, nil
  102. }
  103. func (self *testServer) GetData(context.Context, []byte) ([]byte, error) {
  104. return nil, nil
  105. }
  106. func (self *testServer) Close() {
  107. }
  108. func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
  109. tester, streamer, _, teardown, err := newStreamerTester(t)
  110. defer teardown()
  111. if err != nil {
  112. t.Fatal(err)
  113. }
  114. streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
  115. return newTestClient(t), nil
  116. })
  117. node := tester.Nodes[0]
  118. stream := NewStream("foo", "", true)
  119. err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top)
  120. if err != nil {
  121. t.Fatalf("Expected no error, got %v", err)
  122. }
  123. err = tester.TestExchanges(
  124. p2ptest.Exchange{
  125. Label: "Subscribe message",
  126. Expects: []p2ptest.Expect{
  127. {
  128. Code: 4,
  129. Msg: &SubscribeMsg{
  130. Stream: stream,
  131. History: NewRange(5, 8),
  132. Priority: Top,
  133. },
  134. Peer: node.ID(),
  135. },
  136. },
  137. },
  138. // trigger OfferedHashesMsg to actually create the client
  139. p2ptest.Exchange{
  140. Label: "OfferedHashes message",
  141. Triggers: []p2ptest.Trigger{
  142. {
  143. Code: 1,
  144. Msg: &OfferedHashesMsg{
  145. HandoverProof: &HandoverProof{
  146. Handover: &Handover{},
  147. },
  148. Hashes: hashes,
  149. From: 5,
  150. To: 8,
  151. Stream: stream,
  152. },
  153. Peer: node.ID(),
  154. },
  155. },
  156. Expects: []p2ptest.Expect{
  157. {
  158. Code: 2,
  159. Msg: &WantedHashesMsg{
  160. Stream: stream,
  161. Want: []byte{5},
  162. From: 9,
  163. To: 0,
  164. },
  165. Peer: node.ID(),
  166. },
  167. },
  168. },
  169. )
  170. if err != nil {
  171. t.Fatal(err)
  172. }
  173. err = streamer.Unsubscribe(node.ID(), stream)
  174. if err != nil {
  175. t.Fatalf("Expected no error, got %v", err)
  176. }
  177. err = tester.TestExchanges(p2ptest.Exchange{
  178. Label: "Unsubscribe message",
  179. Expects: []p2ptest.Expect{
  180. {
  181. Code: 0,
  182. Msg: &UnsubscribeMsg{
  183. Stream: stream,
  184. },
  185. Peer: node.ID(),
  186. },
  187. },
  188. })
  189. if err != nil {
  190. t.Fatal(err)
  191. }
  192. }
  193. func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
  194. tester, streamer, _, teardown, err := newStreamerTester(t)
  195. defer teardown()
  196. if err != nil {
  197. t.Fatal(err)
  198. }
  199. stream := NewStream("foo", "", false)
  200. streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
  201. return newTestServer(t), nil
  202. })
  203. node := tester.Nodes[0]
  204. err = tester.TestExchanges(p2ptest.Exchange{
  205. Label: "Subscribe message",
  206. Triggers: []p2ptest.Trigger{
  207. {
  208. Code: 4,
  209. Msg: &SubscribeMsg{
  210. Stream: stream,
  211. History: NewRange(5, 8),
  212. Priority: Top,
  213. },
  214. Peer: node.ID(),
  215. },
  216. },
  217. Expects: []p2ptest.Expect{
  218. {
  219. Code: 1,
  220. Msg: &OfferedHashesMsg{
  221. Stream: stream,
  222. HandoverProof: &HandoverProof{
  223. Handover: &Handover{},
  224. },
  225. Hashes: make([]byte, HashSize),
  226. From: 6,
  227. To: 9,
  228. },
  229. Peer: node.ID(),
  230. },
  231. },
  232. })
  233. if err != nil {
  234. t.Fatal(err)
  235. }
  236. err = tester.TestExchanges(p2ptest.Exchange{
  237. Label: "unsubscribe message",
  238. Triggers: []p2ptest.Trigger{
  239. {
  240. Code: 0,
  241. Msg: &UnsubscribeMsg{
  242. Stream: stream,
  243. },
  244. Peer: node.ID(),
  245. },
  246. },
  247. })
  248. if err != nil {
  249. t.Fatal(err)
  250. }
  251. }
  252. func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
  253. tester, streamer, _, teardown, err := newStreamerTester(t)
  254. defer teardown()
  255. if err != nil {
  256. t.Fatal(err)
  257. }
  258. stream := NewStream("foo", "", true)
  259. streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
  260. return newTestServer(t), nil
  261. })
  262. node := tester.Nodes[0]
  263. err = tester.TestExchanges(p2ptest.Exchange{
  264. Label: "Subscribe message",
  265. Triggers: []p2ptest.Trigger{
  266. {
  267. Code: 4,
  268. Msg: &SubscribeMsg{
  269. Stream: stream,
  270. Priority: Top,
  271. },
  272. Peer: node.ID(),
  273. },
  274. },
  275. Expects: []p2ptest.Expect{
  276. {
  277. Code: 1,
  278. Msg: &OfferedHashesMsg{
  279. Stream: stream,
  280. HandoverProof: &HandoverProof{
  281. Handover: &Handover{},
  282. },
  283. Hashes: make([]byte, HashSize),
  284. From: 1,
  285. To: 1,
  286. },
  287. Peer: node.ID(),
  288. },
  289. },
  290. })
  291. if err != nil {
  292. t.Fatal(err)
  293. }
  294. err = tester.TestExchanges(p2ptest.Exchange{
  295. Label: "unsubscribe message",
  296. Triggers: []p2ptest.Trigger{
  297. {
  298. Code: 0,
  299. Msg: &UnsubscribeMsg{
  300. Stream: stream,
  301. },
  302. Peer: node.ID(),
  303. },
  304. },
  305. })
  306. if err != nil {
  307. t.Fatal(err)
  308. }
  309. }
  310. func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
  311. tester, streamer, _, teardown, err := newStreamerTester(t)
  312. defer teardown()
  313. if err != nil {
  314. t.Fatal(err)
  315. }
  316. streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
  317. return newTestServer(t), nil
  318. })
  319. stream := NewStream("bar", "", true)
  320. node := tester.Nodes[0]
  321. err = tester.TestExchanges(p2ptest.Exchange{
  322. Label: "Subscribe message",
  323. Triggers: []p2ptest.Trigger{
  324. {
  325. Code: 4,
  326. Msg: &SubscribeMsg{
  327. Stream: stream,
  328. History: NewRange(5, 8),
  329. Priority: Top,
  330. },
  331. Peer: node.ID(),
  332. },
  333. },
  334. Expects: []p2ptest.Expect{
  335. {
  336. Code: 7,
  337. Msg: &SubscribeErrorMsg{
  338. Error: "stream bar not registered",
  339. },
  340. Peer: node.ID(),
  341. },
  342. },
  343. })
  344. if err != nil {
  345. t.Fatal(err)
  346. }
  347. }
  348. func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
  349. tester, streamer, _, teardown, err := newStreamerTester(t)
  350. defer teardown()
  351. if err != nil {
  352. t.Fatal(err)
  353. }
  354. stream := NewStream("foo", "", true)
  355. streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
  356. return &testServer{
  357. t: t,
  358. }, nil
  359. })
  360. node := tester.Nodes[0]
  361. err = tester.TestExchanges(p2ptest.Exchange{
  362. Label: "Subscribe message",
  363. Triggers: []p2ptest.Trigger{
  364. {
  365. Code: 4,
  366. Msg: &SubscribeMsg{
  367. Stream: stream,
  368. History: NewRange(5, 8),
  369. Priority: Top,
  370. },
  371. Peer: node.ID(),
  372. },
  373. },
  374. Expects: []p2ptest.Expect{
  375. {
  376. Code: 1,
  377. Msg: &OfferedHashesMsg{
  378. Stream: NewStream("foo", "", false),
  379. HandoverProof: &HandoverProof{
  380. Handover: &Handover{},
  381. },
  382. Hashes: make([]byte, HashSize),
  383. From: 6,
  384. To: 9,
  385. },
  386. Peer: node.ID(),
  387. },
  388. {
  389. Code: 1,
  390. Msg: &OfferedHashesMsg{
  391. Stream: stream,
  392. HandoverProof: &HandoverProof{
  393. Handover: &Handover{},
  394. },
  395. From: 1,
  396. To: 1,
  397. Hashes: make([]byte, HashSize),
  398. },
  399. Peer: node.ID(),
  400. },
  401. },
  402. })
  403. if err != nil {
  404. t.Fatal(err)
  405. }
  406. }
  407. func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
  408. tester, streamer, _, teardown, err := newStreamerTester(t)
  409. defer teardown()
  410. if err != nil {
  411. t.Fatal(err)
  412. }
  413. stream := NewStream("foo", "", true)
  414. var tc *testClient
  415. streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
  416. tc = newTestClient(t)
  417. return tc, nil
  418. })
  419. node := tester.Nodes[0]
  420. err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top)
  421. if err != nil {
  422. t.Fatalf("Expected no error, got %v", err)
  423. }
  424. err = tester.TestExchanges(p2ptest.Exchange{
  425. Label: "Subscribe message",
  426. Expects: []p2ptest.Expect{
  427. {
  428. Code: 4,
  429. Msg: &SubscribeMsg{
  430. Stream: stream,
  431. History: NewRange(5, 8),
  432. Priority: Top,
  433. },
  434. Peer: node.ID(),
  435. },
  436. },
  437. },
  438. p2ptest.Exchange{
  439. Label: "WantedHashes message",
  440. Triggers: []p2ptest.Trigger{
  441. {
  442. Code: 1,
  443. Msg: &OfferedHashesMsg{
  444. HandoverProof: &HandoverProof{
  445. Handover: &Handover{},
  446. },
  447. Hashes: hashes,
  448. From: 5,
  449. To: 8,
  450. Stream: stream,
  451. },
  452. Peer: node.ID(),
  453. },
  454. },
  455. Expects: []p2ptest.Expect{
  456. {
  457. Code: 2,
  458. Msg: &WantedHashesMsg{
  459. Stream: stream,
  460. Want: []byte{5},
  461. From: 9,
  462. To: 0,
  463. },
  464. Peer: node.ID(),
  465. },
  466. },
  467. })
  468. if err != nil {
  469. t.Fatal(err)
  470. }
  471. if len(tc.receivedHashes) != 3 {
  472. t.Fatalf("Expected number of received hashes %v, got %v", 3, len(tc.receivedHashes))
  473. }
  474. close(tc.wait0)
  475. timeout := time.NewTimer(100 * time.Millisecond)
  476. defer timeout.Stop()
  477. select {
  478. case <-tc.batchDone:
  479. t.Fatal("batch done early")
  480. case <-timeout.C:
  481. }
  482. close(tc.wait2)
  483. timeout2 := time.NewTimer(10000 * time.Millisecond)
  484. defer timeout2.Stop()
  485. select {
  486. case <-tc.batchDone:
  487. case <-timeout2.C:
  488. t.Fatal("timeout waiting batchdone call")
  489. }
  490. }
  491. func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
  492. tester, streamer, _, teardown, err := newStreamerTester(t)
  493. defer teardown()
  494. if err != nil {
  495. t.Fatal(err)
  496. }
  497. streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
  498. return newTestServer(t), nil
  499. })
  500. node := tester.Nodes[0]
  501. stream := NewStream("foo", "", true)
  502. err = streamer.RequestSubscription(node.ID(), stream, NewRange(5, 8), Top)
  503. if err != nil {
  504. t.Fatalf("Expected no error, got %v", err)
  505. }
  506. err = tester.TestExchanges(
  507. p2ptest.Exchange{
  508. Label: "RequestSubscription message",
  509. Expects: []p2ptest.Expect{
  510. {
  511. Code: 8,
  512. Msg: &RequestSubscriptionMsg{
  513. Stream: stream,
  514. History: NewRange(5, 8),
  515. Priority: Top,
  516. },
  517. Peer: node.ID(),
  518. },
  519. },
  520. },
  521. p2ptest.Exchange{
  522. Label: "Subscribe message",
  523. Triggers: []p2ptest.Trigger{
  524. {
  525. Code: 4,
  526. Msg: &SubscribeMsg{
  527. Stream: stream,
  528. History: NewRange(5, 8),
  529. Priority: Top,
  530. },
  531. Peer: node.ID(),
  532. },
  533. },
  534. Expects: []p2ptest.Expect{
  535. {
  536. Code: 1,
  537. Msg: &OfferedHashesMsg{
  538. Stream: NewStream("foo", "", false),
  539. HandoverProof: &HandoverProof{
  540. Handover: &Handover{},
  541. },
  542. Hashes: make([]byte, HashSize),
  543. From: 6,
  544. To: 9,
  545. },
  546. Peer: node.ID(),
  547. },
  548. {
  549. Code: 1,
  550. Msg: &OfferedHashesMsg{
  551. Stream: stream,
  552. HandoverProof: &HandoverProof{
  553. Handover: &Handover{},
  554. },
  555. From: 1,
  556. To: 1,
  557. Hashes: make([]byte, HashSize),
  558. },
  559. Peer: node.ID(),
  560. },
  561. },
  562. },
  563. )
  564. if err != nil {
  565. t.Fatal(err)
  566. }
  567. err = streamer.Quit(node.ID(), stream)
  568. if err != nil {
  569. t.Fatalf("Expected no error, got %v", err)
  570. }
  571. err = tester.TestExchanges(p2ptest.Exchange{
  572. Label: "Quit message",
  573. Expects: []p2ptest.Expect{
  574. {
  575. Code: 9,
  576. Msg: &QuitMsg{
  577. Stream: stream,
  578. },
  579. Peer: node.ID(),
  580. },
  581. },
  582. })
  583. if err != nil {
  584. t.Fatal(err)
  585. }
  586. historyStream := getHistoryStream(stream)
  587. err = streamer.Quit(node.ID(), historyStream)
  588. if err != nil {
  589. t.Fatalf("Expected no error, got %v", err)
  590. }
  591. err = tester.TestExchanges(p2ptest.Exchange{
  592. Label: "Quit message",
  593. Expects: []p2ptest.Expect{
  594. {
  595. Code: 9,
  596. Msg: &QuitMsg{
  597. Stream: historyStream,
  598. },
  599. Peer: node.ID(),
  600. },
  601. },
  602. })
  603. if err != nil {
  604. t.Fatal(err)
  605. }
  606. }