protocol_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package protocols
  17. import (
  18. "bytes"
  19. "context"
  20. "errors"
  21. "fmt"
  22. "sync"
  23. "testing"
  24. "time"
  25. "github.com/ethereum/go-ethereum/rlp"
  26. "github.com/ethereum/go-ethereum/crypto"
  27. "github.com/ethereum/go-ethereum/p2p"
  28. "github.com/ethereum/go-ethereum/p2p/enode"
  29. "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
  30. p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
  31. )
  // hs0 is the module handshake message type; C is the single unsigned value
  // it carries.
  type hs0 struct{ C uint }
  36. // message to kill/drop the peer with nodeID
  37. type kill struct {
  38. C enode.ID
  39. }
  // drop is the (empty) message asking the receiver to drop the connection.
  type drop struct{}
  // protoHandshake represents module-independent aspects of the protocol and is
  // the first message peers send and receive as part of the initial exchange.
  type protoHandshake struct {
  	Version   uint   // local and remote peer should have identical version
  	NetworkID string // local and remote peer should have identical network id
  }

  // checkProtoHandshake returns a verifier that checks a remote protoHandshake
  // against the given local version and network ID.
  //
  // The returned function yields nil when both fields match. It returns an
  // error naming the mismatching values otherwise (the network ID is checked
  // first), and also when the value it is given is not a non-nil
  // *protoHandshake, instead of panicking on the type assertion.
  func checkProtoHandshake(testVersion uint, testNetworkID string) func(interface{}) error {
  	return func(rhs interface{}) error {
  		remote, ok := rhs.(*protoHandshake)
  		if !ok || remote == nil {
  			return fmt.Errorf("invalid protocol handshake message (%T)", rhs)
  		}
  		if remote.NetworkID != testNetworkID {
  			return fmt.Errorf("%s (!= %s)", remote.NetworkID, testNetworkID)
  		}
  		if remote.Version != testVersion {
  			return fmt.Errorf("%d (!= %d)", remote.Version, testVersion)
  		}
  		return nil
  	}
  }
  62. // newProtocol sets up a protocol
  63. // the run function here demonstrates a typical protocol using peerPool, handshake
  64. // and messages registered to handlers
  65. func newProtocol(pp *p2ptest.TestPeerPool) func(*p2p.Peer, p2p.MsgReadWriter) error {
  66. spec := &Spec{
  67. Name: "test",
  68. Version: 42,
  69. MaxMsgSize: 10 * 1024,
  70. Messages: []interface{}{
  71. protoHandshake{},
  72. hs0{},
  73. kill{},
  74. drop{},
  75. },
  76. }
  77. return func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  78. peer := NewPeer(p, rw, spec)
  79. // initiate one-off protohandshake and check validity
  80. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  81. defer cancel()
  82. phs := &protoHandshake{42, "420"}
  83. hsCheck := checkProtoHandshake(phs.Version, phs.NetworkID)
  84. _, err := peer.Handshake(ctx, phs, hsCheck)
  85. if err != nil {
  86. return err
  87. }
  88. lhs := &hs0{42}
  89. // module handshake demonstrating a simple repeatable exchange of same-type message
  90. hs, err := peer.Handshake(ctx, lhs, nil)
  91. if err != nil {
  92. return err
  93. }
  94. if rmhs := hs.(*hs0); rmhs.C > lhs.C {
  95. return fmt.Errorf("handshake mismatch remote %v > local %v", rmhs.C, lhs.C)
  96. }
  97. handle := func(ctx context.Context, msg interface{}) error {
  98. switch msg := msg.(type) {
  99. case *protoHandshake:
  100. return errors.New("duplicate handshake")
  101. case *hs0:
  102. rhs := msg
  103. if rhs.C > lhs.C {
  104. return fmt.Errorf("handshake mismatch remote %v > local %v", rhs.C, lhs.C)
  105. }
  106. lhs.C += rhs.C
  107. return peer.Send(ctx, lhs)
  108. case *kill:
  109. // demonstrates use of peerPool, killing another peer connection as a response to a message
  110. id := msg.C
  111. pp.Get(id).Drop()
  112. return nil
  113. case *drop:
  114. // for testing we can trigger self induced disconnect upon receiving drop message
  115. return errors.New("dropped")
  116. default:
  117. return fmt.Errorf("unknown message type: %T", msg)
  118. }
  119. }
  120. pp.Add(peer)
  121. defer pp.Remove(peer)
  122. return peer.Run(handle)
  123. }
  124. }
  125. func protocolTester(pp *p2ptest.TestPeerPool) *p2ptest.ProtocolTester {
  126. prvkey, err := crypto.GenerateKey()
  127. if err != nil {
  128. panic(err)
  129. }
  130. return p2ptest.NewProtocolTester(prvkey, 2, newProtocol(pp))
  131. }
  132. func protoHandshakeExchange(id enode.ID, proto *protoHandshake) []p2ptest.Exchange {
  133. return []p2ptest.Exchange{
  134. {
  135. Expects: []p2ptest.Expect{
  136. {
  137. Code: 0,
  138. Msg: &protoHandshake{42, "420"},
  139. Peer: id,
  140. },
  141. },
  142. },
  143. {
  144. Triggers: []p2ptest.Trigger{
  145. {
  146. Code: 0,
  147. Msg: proto,
  148. Peer: id,
  149. },
  150. },
  151. },
  152. }
  153. }
  154. func runProtoHandshake(t *testing.T, proto *protoHandshake, errs ...error) {
  155. t.Helper()
  156. pp := p2ptest.NewTestPeerPool()
  157. s := protocolTester(pp)
  158. defer s.Stop()
  159. // TODO: make this more than one handshake
  160. node := s.Nodes[0]
  161. if err := s.TestExchanges(protoHandshakeExchange(node.ID(), proto)...); err != nil {
  162. t.Fatal(err)
  163. }
  164. var disconnects []*p2ptest.Disconnect
  165. for i, err := range errs {
  166. disconnects = append(disconnects, &p2ptest.Disconnect{Peer: s.Nodes[i].ID(), Error: err})
  167. }
  168. if err := s.TestDisconnected(disconnects...); err != nil {
  169. t.Fatal(err)
  170. }
  171. }
  172. type dummyHook struct {
  173. peer *Peer
  174. size uint32
  175. msg interface{}
  176. send bool
  177. err error
  178. waitC chan struct{}
  179. mu sync.Mutex
  180. }
  // dummyMsg is a minimal message type with a single string field.
  type dummyMsg struct{ Content string }
  184. func (d *dummyHook) Send(peer *Peer, size uint32, msg interface{}) error {
  185. d.mu.Lock()
  186. defer d.mu.Unlock()
  187. d.peer = peer
  188. d.size = size
  189. d.msg = msg
  190. d.send = true
  191. return d.err
  192. }
  193. func (d *dummyHook) Receive(peer *Peer, size uint32, msg interface{}) error {
  194. d.mu.Lock()
  195. defer d.mu.Unlock()
  196. d.peer = peer
  197. d.size = size
  198. d.msg = msg
  199. d.send = false
  200. d.waitC <- struct{}{}
  201. return d.err
  202. }
  203. func TestProtocolHook(t *testing.T) {
  204. testHook := &dummyHook{
  205. waitC: make(chan struct{}, 1),
  206. }
  207. spec := &Spec{
  208. Name: "test",
  209. Version: 42,
  210. MaxMsgSize: 10 * 1024,
  211. Messages: []interface{}{
  212. dummyMsg{},
  213. },
  214. Hook: testHook,
  215. }
  216. runFunc := func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  217. peer := NewPeer(p, rw, spec)
  218. ctx := context.TODO()
  219. err := peer.Send(ctx, &dummyMsg{
  220. Content: "handshake"})
  221. if err != nil {
  222. t.Fatal(err)
  223. }
  224. handle := func(ctx context.Context, msg interface{}) error {
  225. return nil
  226. }
  227. return peer.Run(handle)
  228. }
  229. prvkey, err := crypto.GenerateKey()
  230. if err != nil {
  231. panic(err)
  232. }
  233. tester := p2ptest.NewProtocolTester(prvkey, 2, runFunc)
  234. defer tester.Stop()
  235. err = tester.TestExchanges(p2ptest.Exchange{
  236. Expects: []p2ptest.Expect{
  237. {
  238. Code: 0,
  239. Msg: &dummyMsg{Content: "handshake"},
  240. Peer: tester.Nodes[0].ID(),
  241. },
  242. },
  243. })
  244. if err != nil {
  245. t.Fatal(err)
  246. }
  247. testHook.mu.Lock()
  248. if testHook.msg == nil || testHook.msg.(*dummyMsg).Content != "handshake" {
  249. t.Fatal("Expected msg to be set, but it is not")
  250. }
  251. if !testHook.send {
  252. t.Fatal("Expected a send message, but it is not")
  253. }
  254. if testHook.peer == nil {
  255. t.Fatal("Expected peer to be set, is nil")
  256. }
  257. if peerId := testHook.peer.ID(); peerId != tester.Nodes[0].ID() && peerId != tester.Nodes[1].ID() {
  258. t.Fatalf("Expected peer ID to be set correctly, but it is not (got %v, exp %v or %v", peerId, tester.Nodes[0].ID(), tester.Nodes[1].ID())
  259. }
  260. if testHook.size != 11 { //11 is the length of the encoded message
  261. t.Fatalf("Expected size to be %d, but it is %d ", 1, testHook.size)
  262. }
  263. testHook.mu.Unlock()
  264. err = tester.TestExchanges(p2ptest.Exchange{
  265. Triggers: []p2ptest.Trigger{
  266. {
  267. Code: 0,
  268. Msg: &dummyMsg{Content: "response"},
  269. Peer: tester.Nodes[1].ID(),
  270. },
  271. },
  272. })
  273. <-testHook.waitC
  274. if err != nil {
  275. t.Fatal(err)
  276. }
  277. testHook.mu.Lock()
  278. if testHook.msg == nil || testHook.msg.(*dummyMsg).Content != "response" {
  279. t.Fatal("Expected msg to be set, but it is not")
  280. }
  281. if testHook.send {
  282. t.Fatal("Expected a send message, but it is not")
  283. }
  284. if testHook.peer == nil || testHook.peer.ID() != tester.Nodes[1].ID() {
  285. t.Fatal("Expected peer ID to be set correctly, but it is not")
  286. }
  287. if testHook.size != 10 { //11 is the length of the encoded message
  288. t.Fatalf("Expected size to be %d, but it is %d ", 1, testHook.size)
  289. }
  290. testHook.mu.Unlock()
  291. testHook.err = fmt.Errorf("dummy error")
  292. err = tester.TestExchanges(p2ptest.Exchange{
  293. Triggers: []p2ptest.Trigger{
  294. {
  295. Code: 0,
  296. Msg: &dummyMsg{Content: "response"},
  297. Peer: tester.Nodes[1].ID(),
  298. },
  299. },
  300. })
  301. <-testHook.waitC
  302. time.Sleep(100 * time.Millisecond)
  303. err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: tester.Nodes[1].ID(), Error: testHook.err})
  304. if err != nil {
  305. t.Fatalf("Expected a specific disconnect error, but got different one: %v", err)
  306. }
  307. }
  308. //We need to test that if the hook is not defined, then message infrastructure
  309. //(send,receive) still works
  310. func TestNoHook(t *testing.T) {
  311. //create a test spec
  312. spec := createTestSpec()
  313. //a random node
  314. id := adapters.RandomNodeConfig().ID
  315. //a peer
  316. p := p2p.NewPeer(id, "testPeer", nil)
  317. rw := &dummyRW{}
  318. peer := NewPeer(p, rw, spec)
  319. ctx := context.TODO()
  320. msg := &perBytesMsgSenderPays{Content: "testBalance"}
  321. //send a message
  322. if err := peer.Send(ctx, msg); err != nil {
  323. t.Fatal(err)
  324. }
  325. //simulate receiving a message
  326. rw.msg = msg
  327. handler := func(ctx context.Context, msg interface{}) error {
  328. return nil
  329. }
  330. if err := peer.handleIncoming(handler); err != nil {
  331. t.Fatal(err)
  332. }
  333. }
  334. func TestProtoHandshakeVersionMismatch(t *testing.T) {
  335. runProtoHandshake(t, &protoHandshake{41, "420"}, errorf(ErrHandshake, errorf(ErrHandler, "(msg code 0): 41 (!= 42)").Error()))
  336. }
  337. func TestProtoHandshakeNetworkIDMismatch(t *testing.T) {
  338. runProtoHandshake(t, &protoHandshake{42, "421"}, errorf(ErrHandshake, errorf(ErrHandler, "(msg code 0): 421 (!= 420)").Error()))
  339. }
  340. func TestProtoHandshakeSuccess(t *testing.T) {
  341. runProtoHandshake(t, &protoHandshake{42, "420"})
  342. }
  343. func moduleHandshakeExchange(id enode.ID, resp uint) []p2ptest.Exchange {
  344. return []p2ptest.Exchange{
  345. {
  346. Expects: []p2ptest.Expect{
  347. {
  348. Code: 1,
  349. Msg: &hs0{42},
  350. Peer: id,
  351. },
  352. },
  353. },
  354. {
  355. Triggers: []p2ptest.Trigger{
  356. {
  357. Code: 1,
  358. Msg: &hs0{resp},
  359. Peer: id,
  360. },
  361. },
  362. },
  363. }
  364. }
  365. func runModuleHandshake(t *testing.T, resp uint, errs ...error) {
  366. t.Helper()
  367. pp := p2ptest.NewTestPeerPool()
  368. s := protocolTester(pp)
  369. defer s.Stop()
  370. node := s.Nodes[0]
  371. if err := s.TestExchanges(protoHandshakeExchange(node.ID(), &protoHandshake{42, "420"})...); err != nil {
  372. t.Fatal(err)
  373. }
  374. if err := s.TestExchanges(moduleHandshakeExchange(node.ID(), resp)...); err != nil {
  375. t.Fatal(err)
  376. }
  377. var disconnects []*p2ptest.Disconnect
  378. for i, err := range errs {
  379. disconnects = append(disconnects, &p2ptest.Disconnect{Peer: s.Nodes[i].ID(), Error: err})
  380. }
  381. if err := s.TestDisconnected(disconnects...); err != nil {
  382. t.Fatal(err)
  383. }
  384. }
  385. func TestModuleHandshakeError(t *testing.T) {
  386. runModuleHandshake(t, 43, fmt.Errorf("handshake mismatch remote 43 > local 42"))
  387. }
  388. func TestModuleHandshakeSuccess(t *testing.T) {
  389. runModuleHandshake(t, 42)
  390. }
  391. // testing complex interactions over multiple peers, relaying, dropping
  392. func testMultiPeerSetup(a, b enode.ID) []p2ptest.Exchange {
  393. return []p2ptest.Exchange{
  394. {
  395. Label: "primary handshake",
  396. Expects: []p2ptest.Expect{
  397. {
  398. Code: 0,
  399. Msg: &protoHandshake{42, "420"},
  400. Peer: a,
  401. },
  402. {
  403. Code: 0,
  404. Msg: &protoHandshake{42, "420"},
  405. Peer: b,
  406. },
  407. },
  408. },
  409. {
  410. Label: "module handshake",
  411. Triggers: []p2ptest.Trigger{
  412. {
  413. Code: 0,
  414. Msg: &protoHandshake{42, "420"},
  415. Peer: a,
  416. },
  417. {
  418. Code: 0,
  419. Msg: &protoHandshake{42, "420"},
  420. Peer: b,
  421. },
  422. },
  423. Expects: []p2ptest.Expect{
  424. {
  425. Code: 1,
  426. Msg: &hs0{42},
  427. Peer: a,
  428. },
  429. {
  430. Code: 1,
  431. Msg: &hs0{42},
  432. Peer: b,
  433. },
  434. },
  435. },
  436. {Label: "alternative module handshake", Triggers: []p2ptest.Trigger{{Code: 1, Msg: &hs0{41}, Peer: a},
  437. {Code: 1, Msg: &hs0{41}, Peer: b}}},
  438. {Label: "repeated module handshake", Triggers: []p2ptest.Trigger{{Code: 1, Msg: &hs0{1}, Peer: a}}},
  439. {Label: "receiving repeated module handshake", Expects: []p2ptest.Expect{{Code: 1, Msg: &hs0{43}, Peer: a}}}}
  440. }
  441. func runMultiplePeers(t *testing.T, peer int, errs ...error) {
  442. t.Helper()
  443. pp := p2ptest.NewTestPeerPool()
  444. s := protocolTester(pp)
  445. defer s.Stop()
  446. if err := s.TestExchanges(testMultiPeerSetup(s.Nodes[0].ID(), s.Nodes[1].ID())...); err != nil {
  447. t.Fatal(err)
  448. }
  449. // after some exchanges of messages, we can test state changes
  450. // here this is simply demonstrated by the peerPool
  451. // after the handshake negotiations peers must be added to the pool
  452. // time.Sleep(1)
  453. tick := time.NewTicker(10 * time.Millisecond)
  454. timeout := time.NewTimer(1 * time.Second)
  455. WAIT:
  456. for {
  457. select {
  458. case <-tick.C:
  459. if pp.Has(s.Nodes[0].ID()) {
  460. break WAIT
  461. }
  462. case <-timeout.C:
  463. t.Fatal("timeout")
  464. }
  465. }
  466. if !pp.Has(s.Nodes[1].ID()) {
  467. t.Fatalf("missing peer test-1: %v (%v)", pp, s.Nodes)
  468. }
  469. // peer 0 sends kill request for peer with index <peer>
  470. err := s.TestExchanges(p2ptest.Exchange{
  471. Triggers: []p2ptest.Trigger{
  472. {
  473. Code: 2,
  474. Msg: &kill{s.Nodes[peer].ID()},
  475. Peer: s.Nodes[0].ID(),
  476. },
  477. },
  478. })
  479. if err != nil {
  480. t.Fatal(err)
  481. }
  482. // the peer not killed sends a drop request
  483. err = s.TestExchanges(p2ptest.Exchange{
  484. Triggers: []p2ptest.Trigger{
  485. {
  486. Code: 3,
  487. Msg: &drop{},
  488. Peer: s.Nodes[(peer+1)%2].ID(),
  489. },
  490. },
  491. })
  492. if err != nil {
  493. t.Fatal(err)
  494. }
  495. // check the actual discconnect errors on the individual peers
  496. var disconnects []*p2ptest.Disconnect
  497. for i, err := range errs {
  498. disconnects = append(disconnects, &p2ptest.Disconnect{Peer: s.Nodes[i].ID(), Error: err})
  499. }
  500. if err := s.TestDisconnected(disconnects...); err != nil {
  501. t.Fatal(err)
  502. }
  503. // test if disconnected peers have been removed from peerPool
  504. if pp.Has(s.Nodes[peer].ID()) {
  505. t.Fatalf("peer test-%v not dropped: %v (%v)", peer, pp, s.Nodes)
  506. }
  507. }
  508. func TestMultiplePeersDropSelf(t *testing.T) {
  509. runMultiplePeers(t, 0,
  510. fmt.Errorf("subprotocol error"),
  511. fmt.Errorf("Message handler error: (msg code 3): dropped"),
  512. )
  513. }
  514. func TestMultiplePeersDropOther(t *testing.T) {
  515. runMultiplePeers(t, 1,
  516. fmt.Errorf("Message handler error: (msg code 3): dropped"),
  517. fmt.Errorf("subprotocol error"),
  518. )
  519. }
  // dummyRW is a dummy implementation of a MsgReadWriter.
  // This allows for quick and easy unit tests without having to build up the
  // complete protocol.
  type dummyRW struct {
  	msg  interface{} // message handed back, in encoded form, by ReadMsg
  	size uint32      // size of the last encoded message, set by getDummyMsg
  	code uint64      // message code reported by ReadMsg
  }
  528. func (d *dummyRW) WriteMsg(msg p2p.Msg) error {
  529. return nil
  530. }
  531. func (d *dummyRW) ReadMsg() (p2p.Msg, error) {
  532. enc := bytes.NewReader(d.getDummyMsg())
  533. return p2p.Msg{
  534. Code: d.code,
  535. Size: d.size,
  536. Payload: enc,
  537. ReceivedAt: time.Now(),
  538. }, nil
  539. }
  540. func (d *dummyRW) getDummyMsg() []byte {
  541. r, _ := rlp.EncodeToBytes(d.msg)
  542. var b bytes.Buffer
  543. wmsg := WrappedMsg{
  544. Context: b.Bytes(),
  545. Size: uint32(len(r)),
  546. Payload: r,
  547. }
  548. rr, _ := rlp.EncodeToBytes(wmsg)
  549. d.size = uint32(len(rr))
  550. return rr
  551. }