protocol_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package protocols
  17. import (
  18. "bytes"
  19. "context"
  20. "errors"
  21. "fmt"
  22. "testing"
  23. "time"
  24. "github.com/ethereum/go-ethereum/rlp"
  25. "github.com/ethereum/go-ethereum/p2p"
  26. "github.com/ethereum/go-ethereum/p2p/enode"
  27. "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
  28. p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
  29. )
  30. // handshake message type
  31. type hs0 struct {
  32. C uint
  33. }
  34. // message to kill/drop the peer with nodeID
  35. type kill struct {
  36. C enode.ID
  37. }
  38. // message to drop connection
  39. type drop struct {
  40. }
  41. /// protoHandshake represents module-independent aspects of the protocol and is
  42. // the first message peers send and receive as part the initial exchange
  43. type protoHandshake struct {
  44. Version uint // local and remote peer should have identical version
  45. NetworkID string // local and remote peer should have identical network id
  46. }
  47. // checkProtoHandshake verifies local and remote protoHandshakes match
  48. func checkProtoHandshake(testVersion uint, testNetworkID string) func(interface{}) error {
  49. return func(rhs interface{}) error {
  50. remote := rhs.(*protoHandshake)
  51. if remote.NetworkID != testNetworkID {
  52. return fmt.Errorf("%s (!= %s)", remote.NetworkID, testNetworkID)
  53. }
  54. if remote.Version != testVersion {
  55. return fmt.Errorf("%d (!= %d)", remote.Version, testVersion)
  56. }
  57. return nil
  58. }
  59. }
  60. // newProtocol sets up a protocol
  61. // the run function here demonstrates a typical protocol using peerPool, handshake
  62. // and messages registered to handlers
  63. func newProtocol(pp *p2ptest.TestPeerPool) func(*p2p.Peer, p2p.MsgReadWriter) error {
  64. spec := &Spec{
  65. Name: "test",
  66. Version: 42,
  67. MaxMsgSize: 10 * 1024,
  68. Messages: []interface{}{
  69. protoHandshake{},
  70. hs0{},
  71. kill{},
  72. drop{},
  73. },
  74. }
  75. return func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  76. peer := NewPeer(p, rw, spec)
  77. // initiate one-off protohandshake and check validity
  78. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  79. defer cancel()
  80. phs := &protoHandshake{42, "420"}
  81. hsCheck := checkProtoHandshake(phs.Version, phs.NetworkID)
  82. _, err := peer.Handshake(ctx, phs, hsCheck)
  83. if err != nil {
  84. return err
  85. }
  86. lhs := &hs0{42}
  87. // module handshake demonstrating a simple repeatable exchange of same-type message
  88. hs, err := peer.Handshake(ctx, lhs, nil)
  89. if err != nil {
  90. return err
  91. }
  92. if rmhs := hs.(*hs0); rmhs.C > lhs.C {
  93. return fmt.Errorf("handshake mismatch remote %v > local %v", rmhs.C, lhs.C)
  94. }
  95. handle := func(ctx context.Context, msg interface{}) error {
  96. switch msg := msg.(type) {
  97. case *protoHandshake:
  98. return errors.New("duplicate handshake")
  99. case *hs0:
  100. rhs := msg
  101. if rhs.C > lhs.C {
  102. return fmt.Errorf("handshake mismatch remote %v > local %v", rhs.C, lhs.C)
  103. }
  104. lhs.C += rhs.C
  105. return peer.Send(ctx, lhs)
  106. case *kill:
  107. // demonstrates use of peerPool, killing another peer connection as a response to a message
  108. id := msg.C
  109. pp.Get(id).Drop(errors.New("killed"))
  110. return nil
  111. case *drop:
  112. // for testing we can trigger self induced disconnect upon receiving drop message
  113. return errors.New("dropped")
  114. default:
  115. return fmt.Errorf("unknown message type: %T", msg)
  116. }
  117. }
  118. pp.Add(peer)
  119. defer pp.Remove(peer)
  120. return peer.Run(handle)
  121. }
  122. }
  123. func protocolTester(t *testing.T, pp *p2ptest.TestPeerPool) *p2ptest.ProtocolTester {
  124. conf := adapters.RandomNodeConfig()
  125. return p2ptest.NewProtocolTester(t, conf.ID, 2, newProtocol(pp))
  126. }
  127. func protoHandshakeExchange(id enode.ID, proto *protoHandshake) []p2ptest.Exchange {
  128. return []p2ptest.Exchange{
  129. {
  130. Expects: []p2ptest.Expect{
  131. {
  132. Code: 0,
  133. Msg: &protoHandshake{42, "420"},
  134. Peer: id,
  135. },
  136. },
  137. },
  138. {
  139. Triggers: []p2ptest.Trigger{
  140. {
  141. Code: 0,
  142. Msg: proto,
  143. Peer: id,
  144. },
  145. },
  146. },
  147. }
  148. }
  149. func runProtoHandshake(t *testing.T, proto *protoHandshake, errs ...error) {
  150. pp := p2ptest.NewTestPeerPool()
  151. s := protocolTester(t, pp)
  152. // TODO: make this more than one handshake
  153. node := s.Nodes[0]
  154. if err := s.TestExchanges(protoHandshakeExchange(node.ID(), proto)...); err != nil {
  155. t.Fatal(err)
  156. }
  157. var disconnects []*p2ptest.Disconnect
  158. for i, err := range errs {
  159. disconnects = append(disconnects, &p2ptest.Disconnect{Peer: s.Nodes[i].ID(), Error: err})
  160. }
  161. if err := s.TestDisconnected(disconnects...); err != nil {
  162. t.Fatal(err)
  163. }
  164. }
  165. type dummyHook struct {
  166. peer *Peer
  167. size uint32
  168. msg interface{}
  169. send bool
  170. err error
  171. waitC chan struct{}
  172. }
  173. type dummyMsg struct {
  174. Content string
  175. }
  176. func (d *dummyHook) Send(peer *Peer, size uint32, msg interface{}) error {
  177. d.peer = peer
  178. d.size = size
  179. d.msg = msg
  180. d.send = true
  181. return d.err
  182. }
  183. func (d *dummyHook) Receive(peer *Peer, size uint32, msg interface{}) error {
  184. d.peer = peer
  185. d.size = size
  186. d.msg = msg
  187. d.send = false
  188. d.waitC <- struct{}{}
  189. return d.err
  190. }
  191. func TestProtocolHook(t *testing.T) {
  192. testHook := &dummyHook{
  193. waitC: make(chan struct{}, 1),
  194. }
  195. spec := &Spec{
  196. Name: "test",
  197. Version: 42,
  198. MaxMsgSize: 10 * 1024,
  199. Messages: []interface{}{
  200. dummyMsg{},
  201. },
  202. Hook: testHook,
  203. }
  204. runFunc := func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  205. peer := NewPeer(p, rw, spec)
  206. ctx := context.TODO()
  207. err := peer.Send(ctx, &dummyMsg{
  208. Content: "handshake"})
  209. if err != nil {
  210. t.Fatal(err)
  211. }
  212. handle := func(ctx context.Context, msg interface{}) error {
  213. return nil
  214. }
  215. return peer.Run(handle)
  216. }
  217. conf := adapters.RandomNodeConfig()
  218. tester := p2ptest.NewProtocolTester(t, conf.ID, 2, runFunc)
  219. err := tester.TestExchanges(p2ptest.Exchange{
  220. Expects: []p2ptest.Expect{
  221. {
  222. Code: 0,
  223. Msg: &dummyMsg{Content: "handshake"},
  224. Peer: tester.Nodes[0].ID(),
  225. },
  226. },
  227. })
  228. if err != nil {
  229. t.Fatal(err)
  230. }
  231. if testHook.msg == nil || testHook.msg.(*dummyMsg).Content != "handshake" {
  232. t.Fatal("Expected msg to be set, but it is not")
  233. }
  234. if !testHook.send {
  235. t.Fatal("Expected a send message, but it is not")
  236. }
  237. if testHook.peer == nil {
  238. t.Fatal("Expected peer to be set, is nil")
  239. }
  240. if peerId := testHook.peer.ID(); peerId != tester.Nodes[0].ID() && peerId != tester.Nodes[1].ID() {
  241. t.Fatalf("Expected peer ID to be set correctly, but it is not (got %v, exp %v or %v", peerId, tester.Nodes[0].ID(), tester.Nodes[1].ID())
  242. }
  243. if testHook.size != 11 { //11 is the length of the encoded message
  244. t.Fatalf("Expected size to be %d, but it is %d ", 1, testHook.size)
  245. }
  246. err = tester.TestExchanges(p2ptest.Exchange{
  247. Triggers: []p2ptest.Trigger{
  248. {
  249. Code: 0,
  250. Msg: &dummyMsg{Content: "response"},
  251. Peer: tester.Nodes[1].ID(),
  252. },
  253. },
  254. })
  255. <-testHook.waitC
  256. if err != nil {
  257. t.Fatal(err)
  258. }
  259. if testHook.msg == nil || testHook.msg.(*dummyMsg).Content != "response" {
  260. t.Fatal("Expected msg to be set, but it is not")
  261. }
  262. if testHook.send {
  263. t.Fatal("Expected a send message, but it is not")
  264. }
  265. if testHook.peer == nil || testHook.peer.ID() != tester.Nodes[1].ID() {
  266. t.Fatal("Expected peer ID to be set correctly, but it is not")
  267. }
  268. if testHook.size != 10 { //11 is the length of the encoded message
  269. t.Fatalf("Expected size to be %d, but it is %d ", 1, testHook.size)
  270. }
  271. testHook.err = fmt.Errorf("dummy error")
  272. err = tester.TestExchanges(p2ptest.Exchange{
  273. Triggers: []p2ptest.Trigger{
  274. {
  275. Code: 0,
  276. Msg: &dummyMsg{Content: "response"},
  277. Peer: tester.Nodes[1].ID(),
  278. },
  279. },
  280. })
  281. <-testHook.waitC
  282. time.Sleep(100 * time.Millisecond)
  283. err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: tester.Nodes[1].ID(), Error: testHook.err})
  284. if err != nil {
  285. t.Fatalf("Expected a specific disconnect error, but got different one: %v", err)
  286. }
  287. }
  288. //We need to test that if the hook is not defined, then message infrastructure
  289. //(send,receive) still works
  290. func TestNoHook(t *testing.T) {
  291. //create a test spec
  292. spec := createTestSpec()
  293. //a random node
  294. id := adapters.RandomNodeConfig().ID
  295. //a peer
  296. p := p2p.NewPeer(id, "testPeer", nil)
  297. rw := &dummyRW{}
  298. peer := NewPeer(p, rw, spec)
  299. ctx := context.TODO()
  300. msg := &perBytesMsgSenderPays{Content: "testBalance"}
  301. //send a message
  302. err := peer.Send(ctx, msg)
  303. if err != nil {
  304. t.Fatal(err)
  305. }
  306. //simulate receiving a message
  307. rw.msg = msg
  308. peer.handleIncoming(func(ctx context.Context, msg interface{}) error {
  309. return nil
  310. })
  311. //all should just work and not result in any error
  312. }
  313. func TestProtoHandshakeVersionMismatch(t *testing.T) {
  314. runProtoHandshake(t, &protoHandshake{41, "420"}, errorf(ErrHandshake, errorf(ErrHandler, "(msg code 0): 41 (!= 42)").Error()))
  315. }
  316. func TestProtoHandshakeNetworkIDMismatch(t *testing.T) {
  317. runProtoHandshake(t, &protoHandshake{42, "421"}, errorf(ErrHandshake, errorf(ErrHandler, "(msg code 0): 421 (!= 420)").Error()))
  318. }
  319. func TestProtoHandshakeSuccess(t *testing.T) {
  320. runProtoHandshake(t, &protoHandshake{42, "420"})
  321. }
  322. func moduleHandshakeExchange(id enode.ID, resp uint) []p2ptest.Exchange {
  323. return []p2ptest.Exchange{
  324. {
  325. Expects: []p2ptest.Expect{
  326. {
  327. Code: 1,
  328. Msg: &hs0{42},
  329. Peer: id,
  330. },
  331. },
  332. },
  333. {
  334. Triggers: []p2ptest.Trigger{
  335. {
  336. Code: 1,
  337. Msg: &hs0{resp},
  338. Peer: id,
  339. },
  340. },
  341. },
  342. }
  343. }
  344. func runModuleHandshake(t *testing.T, resp uint, errs ...error) {
  345. pp := p2ptest.NewTestPeerPool()
  346. s := protocolTester(t, pp)
  347. node := s.Nodes[0]
  348. if err := s.TestExchanges(protoHandshakeExchange(node.ID(), &protoHandshake{42, "420"})...); err != nil {
  349. t.Fatal(err)
  350. }
  351. if err := s.TestExchanges(moduleHandshakeExchange(node.ID(), resp)...); err != nil {
  352. t.Fatal(err)
  353. }
  354. var disconnects []*p2ptest.Disconnect
  355. for i, err := range errs {
  356. disconnects = append(disconnects, &p2ptest.Disconnect{Peer: s.Nodes[i].ID(), Error: err})
  357. }
  358. if err := s.TestDisconnected(disconnects...); err != nil {
  359. t.Fatal(err)
  360. }
  361. }
  362. func TestModuleHandshakeError(t *testing.T) {
  363. runModuleHandshake(t, 43, fmt.Errorf("handshake mismatch remote 43 > local 42"))
  364. }
  365. func TestModuleHandshakeSuccess(t *testing.T) {
  366. runModuleHandshake(t, 42)
  367. }
  368. // testing complex interactions over multiple peers, relaying, dropping
  369. func testMultiPeerSetup(a, b enode.ID) []p2ptest.Exchange {
  370. return []p2ptest.Exchange{
  371. {
  372. Label: "primary handshake",
  373. Expects: []p2ptest.Expect{
  374. {
  375. Code: 0,
  376. Msg: &protoHandshake{42, "420"},
  377. Peer: a,
  378. },
  379. {
  380. Code: 0,
  381. Msg: &protoHandshake{42, "420"},
  382. Peer: b,
  383. },
  384. },
  385. },
  386. {
  387. Label: "module handshake",
  388. Triggers: []p2ptest.Trigger{
  389. {
  390. Code: 0,
  391. Msg: &protoHandshake{42, "420"},
  392. Peer: a,
  393. },
  394. {
  395. Code: 0,
  396. Msg: &protoHandshake{42, "420"},
  397. Peer: b,
  398. },
  399. },
  400. Expects: []p2ptest.Expect{
  401. {
  402. Code: 1,
  403. Msg: &hs0{42},
  404. Peer: a,
  405. },
  406. {
  407. Code: 1,
  408. Msg: &hs0{42},
  409. Peer: b,
  410. },
  411. },
  412. },
  413. {Label: "alternative module handshake", Triggers: []p2ptest.Trigger{{Code: 1, Msg: &hs0{41}, Peer: a},
  414. {Code: 1, Msg: &hs0{41}, Peer: b}}},
  415. {Label: "repeated module handshake", Triggers: []p2ptest.Trigger{{Code: 1, Msg: &hs0{1}, Peer: a}}},
  416. {Label: "receiving repeated module handshake", Expects: []p2ptest.Expect{{Code: 1, Msg: &hs0{43}, Peer: a}}}}
  417. }
  418. func runMultiplePeers(t *testing.T, peer int, errs ...error) {
  419. pp := p2ptest.NewTestPeerPool()
  420. s := protocolTester(t, pp)
  421. if err := s.TestExchanges(testMultiPeerSetup(s.Nodes[0].ID(), s.Nodes[1].ID())...); err != nil {
  422. t.Fatal(err)
  423. }
  424. // after some exchanges of messages, we can test state changes
  425. // here this is simply demonstrated by the peerPool
  426. // after the handshake negotiations peers must be added to the pool
  427. // time.Sleep(1)
  428. tick := time.NewTicker(10 * time.Millisecond)
  429. timeout := time.NewTimer(1 * time.Second)
  430. WAIT:
  431. for {
  432. select {
  433. case <-tick.C:
  434. if pp.Has(s.Nodes[0].ID()) {
  435. break WAIT
  436. }
  437. case <-timeout.C:
  438. t.Fatal("timeout")
  439. }
  440. }
  441. if !pp.Has(s.Nodes[1].ID()) {
  442. t.Fatalf("missing peer test-1: %v (%v)", pp, s.Nodes)
  443. }
  444. // peer 0 sends kill request for peer with index <peer>
  445. err := s.TestExchanges(p2ptest.Exchange{
  446. Triggers: []p2ptest.Trigger{
  447. {
  448. Code: 2,
  449. Msg: &kill{s.Nodes[peer].ID()},
  450. Peer: s.Nodes[0].ID(),
  451. },
  452. },
  453. })
  454. if err != nil {
  455. t.Fatal(err)
  456. }
  457. // the peer not killed sends a drop request
  458. err = s.TestExchanges(p2ptest.Exchange{
  459. Triggers: []p2ptest.Trigger{
  460. {
  461. Code: 3,
  462. Msg: &drop{},
  463. Peer: s.Nodes[(peer+1)%2].ID(),
  464. },
  465. },
  466. })
  467. if err != nil {
  468. t.Fatal(err)
  469. }
  470. // check the actual discconnect errors on the individual peers
  471. var disconnects []*p2ptest.Disconnect
  472. for i, err := range errs {
  473. disconnects = append(disconnects, &p2ptest.Disconnect{Peer: s.Nodes[i].ID(), Error: err})
  474. }
  475. if err := s.TestDisconnected(disconnects...); err != nil {
  476. t.Fatal(err)
  477. }
  478. // test if disconnected peers have been removed from peerPool
  479. if pp.Has(s.Nodes[peer].ID()) {
  480. t.Fatalf("peer test-%v not dropped: %v (%v)", peer, pp, s.Nodes)
  481. }
  482. }
  483. func XTestMultiplePeersDropSelf(t *testing.T) {
  484. runMultiplePeers(t, 0,
  485. fmt.Errorf("subprotocol error"),
  486. fmt.Errorf("Message handler error: (msg code 3): dropped"),
  487. )
  488. }
  489. func XTestMultiplePeersDropOther(t *testing.T) {
  490. runMultiplePeers(t, 1,
  491. fmt.Errorf("Message handler error: (msg code 3): dropped"),
  492. fmt.Errorf("subprotocol error"),
  493. )
  494. }
  495. //dummy implementation of a MsgReadWriter
  496. //this allows for quick and easy unit tests without
  497. //having to build up the complete protocol
  498. type dummyRW struct {
  499. msg interface{}
  500. size uint32
  501. code uint64
  502. }
  503. func (d *dummyRW) WriteMsg(msg p2p.Msg) error {
  504. return nil
  505. }
  506. func (d *dummyRW) ReadMsg() (p2p.Msg, error) {
  507. enc := bytes.NewReader(d.getDummyMsg())
  508. return p2p.Msg{
  509. Code: d.code,
  510. Size: d.size,
  511. Payload: enc,
  512. ReceivedAt: time.Now(),
  513. }, nil
  514. }
  515. func (d *dummyRW) getDummyMsg() []byte {
  516. r, _ := rlp.EncodeToBytes(d.msg)
  517. var b bytes.Buffer
  518. wmsg := WrappedMsg{
  519. Context: b.Bytes(),
  520. Size: uint32(len(r)),
  521. Payload: r,
  522. }
  523. rr, _ := rlp.EncodeToBytes(wmsg)
  524. d.size = uint32(len(rr))
  525. return rr
  526. }