protocol_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package protocols
  17. import (
  18. "bytes"
  19. "context"
  20. "errors"
  21. "fmt"
  22. "testing"
  23. "time"
  24. "github.com/ethereum/go-ethereum/rlp"
  25. "github.com/ethereum/go-ethereum/p2p"
  26. "github.com/ethereum/go-ethereum/p2p/enode"
  27. "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
  28. p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
  29. )
  30. // handshake message type
  31. type hs0 struct {
  32. C uint
  33. }
  34. // message to kill/drop the peer with nodeID
  35. type kill struct {
  36. C enode.ID
  37. }
  38. // message to drop connection
  39. type drop struct {
  40. }
  41. /// protoHandshake represents module-independent aspects of the protocol and is
  42. // the first message peers send and receive as part the initial exchange
  43. type protoHandshake struct {
  44. Version uint // local and remote peer should have identical version
  45. NetworkID string // local and remote peer should have identical network id
  46. }
  47. // checkProtoHandshake verifies local and remote protoHandshakes match
  48. func checkProtoHandshake(testVersion uint, testNetworkID string) func(interface{}) error {
  49. return func(rhs interface{}) error {
  50. remote := rhs.(*protoHandshake)
  51. if remote.NetworkID != testNetworkID {
  52. return fmt.Errorf("%s (!= %s)", remote.NetworkID, testNetworkID)
  53. }
  54. if remote.Version != testVersion {
  55. return fmt.Errorf("%d (!= %d)", remote.Version, testVersion)
  56. }
  57. return nil
  58. }
  59. }
  60. // newProtocol sets up a protocol
  61. // the run function here demonstrates a typical protocol using peerPool, handshake
  62. // and messages registered to handlers
  63. func newProtocol(pp *p2ptest.TestPeerPool) func(*p2p.Peer, p2p.MsgReadWriter) error {
  64. spec := &Spec{
  65. Name: "test",
  66. Version: 42,
  67. MaxMsgSize: 10 * 1024,
  68. Messages: []interface{}{
  69. protoHandshake{},
  70. hs0{},
  71. kill{},
  72. drop{},
  73. },
  74. }
  75. return func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  76. peer := NewPeer(p, rw, spec)
  77. // initiate one-off protohandshake and check validity
  78. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  79. defer cancel()
  80. phs := &protoHandshake{42, "420"}
  81. hsCheck := checkProtoHandshake(phs.Version, phs.NetworkID)
  82. _, err := peer.Handshake(ctx, phs, hsCheck)
  83. if err != nil {
  84. return err
  85. }
  86. lhs := &hs0{42}
  87. // module handshake demonstrating a simple repeatable exchange of same-type message
  88. hs, err := peer.Handshake(ctx, lhs, nil)
  89. if err != nil {
  90. return err
  91. }
  92. if rmhs := hs.(*hs0); rmhs.C > lhs.C {
  93. return fmt.Errorf("handshake mismatch remote %v > local %v", rmhs.C, lhs.C)
  94. }
  95. handle := func(ctx context.Context, msg interface{}) error {
  96. switch msg := msg.(type) {
  97. case *protoHandshake:
  98. return errors.New("duplicate handshake")
  99. case *hs0:
  100. rhs := msg
  101. if rhs.C > lhs.C {
  102. return fmt.Errorf("handshake mismatch remote %v > local %v", rhs.C, lhs.C)
  103. }
  104. lhs.C += rhs.C
  105. return peer.Send(ctx, lhs)
  106. case *kill:
  107. // demonstrates use of peerPool, killing another peer connection as a response to a message
  108. id := msg.C
  109. pp.Get(id).Drop(errors.New("killed"))
  110. return nil
  111. case *drop:
  112. // for testing we can trigger self induced disconnect upon receiving drop message
  113. return errors.New("dropped")
  114. default:
  115. return fmt.Errorf("unknown message type: %T", msg)
  116. }
  117. }
  118. pp.Add(peer)
  119. defer pp.Remove(peer)
  120. return peer.Run(handle)
  121. }
  122. }
  123. func protocolTester(t *testing.T, pp *p2ptest.TestPeerPool) *p2ptest.ProtocolTester {
  124. conf := adapters.RandomNodeConfig()
  125. return p2ptest.NewProtocolTester(t, conf.ID, 2, newProtocol(pp))
  126. }
  127. func protoHandshakeExchange(id enode.ID, proto *protoHandshake) []p2ptest.Exchange {
  128. return []p2ptest.Exchange{
  129. {
  130. Expects: []p2ptest.Expect{
  131. {
  132. Code: 0,
  133. Msg: &protoHandshake{42, "420"},
  134. Peer: id,
  135. },
  136. },
  137. },
  138. {
  139. Triggers: []p2ptest.Trigger{
  140. {
  141. Code: 0,
  142. Msg: proto,
  143. Peer: id,
  144. },
  145. },
  146. },
  147. }
  148. }
  149. func runProtoHandshake(t *testing.T, proto *protoHandshake, errs ...error) {
  150. pp := p2ptest.NewTestPeerPool()
  151. s := protocolTester(t, pp)
  152. // TODO: make this more than one handshake
  153. node := s.Nodes[0]
  154. if err := s.TestExchanges(protoHandshakeExchange(node.ID(), proto)...); err != nil {
  155. t.Fatal(err)
  156. }
  157. var disconnects []*p2ptest.Disconnect
  158. for i, err := range errs {
  159. disconnects = append(disconnects, &p2ptest.Disconnect{Peer: s.Nodes[i].ID(), Error: err})
  160. }
  161. if err := s.TestDisconnected(disconnects...); err != nil {
  162. t.Fatal(err)
  163. }
  164. }
  165. type dummyHook struct {
  166. peer *Peer
  167. size uint32
  168. msg interface{}
  169. send bool
  170. err error
  171. waitC chan struct{}
  172. }
  173. type dummyMsg struct {
  174. Content string
  175. }
  176. func (d *dummyHook) Send(peer *Peer, size uint32, msg interface{}) error {
  177. d.peer = peer
  178. d.size = size
  179. d.msg = msg
  180. d.send = true
  181. return d.err
  182. }
  183. func (d *dummyHook) Receive(peer *Peer, size uint32, msg interface{}) error {
  184. d.peer = peer
  185. d.size = size
  186. d.msg = msg
  187. d.send = false
  188. d.waitC <- struct{}{}
  189. return d.err
  190. }
  191. func TestProtocolHook(t *testing.T) {
  192. testHook := &dummyHook{
  193. waitC: make(chan struct{}, 1),
  194. }
  195. spec := &Spec{
  196. Name: "test",
  197. Version: 42,
  198. MaxMsgSize: 10 * 1024,
  199. Messages: []interface{}{
  200. dummyMsg{},
  201. },
  202. Hook: testHook,
  203. }
  204. runFunc := func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  205. peer := NewPeer(p, rw, spec)
  206. ctx := context.TODO()
  207. err := peer.Send(ctx, &dummyMsg{
  208. Content: "handshake"})
  209. if err != nil {
  210. t.Fatal(err)
  211. }
  212. handle := func(ctx context.Context, msg interface{}) error {
  213. return nil
  214. }
  215. return peer.Run(handle)
  216. }
  217. conf := adapters.RandomNodeConfig()
  218. tester := p2ptest.NewProtocolTester(t, conf.ID, 2, runFunc)
  219. err := tester.TestExchanges(p2ptest.Exchange{
  220. Expects: []p2ptest.Expect{
  221. {
  222. Code: 0,
  223. Msg: &dummyMsg{Content: "handshake"},
  224. Peer: tester.Nodes[0].ID(),
  225. },
  226. },
  227. })
  228. if err != nil {
  229. t.Fatal(err)
  230. }
  231. if testHook.msg == nil || testHook.msg.(*dummyMsg).Content != "handshake" {
  232. t.Fatal("Expected msg to be set, but it is not")
  233. }
  234. if !testHook.send {
  235. t.Fatal("Expected a send message, but it is not")
  236. }
  237. if testHook.peer == nil || testHook.peer.ID() != tester.Nodes[0].ID() {
  238. t.Fatal("Expected peer ID to be set correctly, but it is not")
  239. }
  240. if testHook.size != 11 { //11 is the length of the encoded message
  241. t.Fatalf("Expected size to be %d, but it is %d ", 1, testHook.size)
  242. }
  243. err = tester.TestExchanges(p2ptest.Exchange{
  244. Triggers: []p2ptest.Trigger{
  245. {
  246. Code: 0,
  247. Msg: &dummyMsg{Content: "response"},
  248. Peer: tester.Nodes[1].ID(),
  249. },
  250. },
  251. })
  252. <-testHook.waitC
  253. if err != nil {
  254. t.Fatal(err)
  255. }
  256. if testHook.msg == nil || testHook.msg.(*dummyMsg).Content != "response" {
  257. t.Fatal("Expected msg to be set, but it is not")
  258. }
  259. if testHook.send {
  260. t.Fatal("Expected a send message, but it is not")
  261. }
  262. if testHook.peer == nil || testHook.peer.ID() != tester.Nodes[1].ID() {
  263. t.Fatal("Expected peer ID to be set correctly, but it is not")
  264. }
  265. if testHook.size != 10 { //11 is the length of the encoded message
  266. t.Fatalf("Expected size to be %d, but it is %d ", 1, testHook.size)
  267. }
  268. testHook.err = fmt.Errorf("dummy error")
  269. err = tester.TestExchanges(p2ptest.Exchange{
  270. Triggers: []p2ptest.Trigger{
  271. {
  272. Code: 0,
  273. Msg: &dummyMsg{Content: "response"},
  274. Peer: tester.Nodes[1].ID(),
  275. },
  276. },
  277. })
  278. <-testHook.waitC
  279. time.Sleep(100 * time.Millisecond)
  280. err = tester.TestDisconnected(&p2ptest.Disconnect{tester.Nodes[1].ID(), testHook.err})
  281. if err != nil {
  282. t.Fatalf("Expected a specific disconnect error, but got different one: %v", err)
  283. }
  284. }
  285. //We need to test that if the hook is not defined, then message infrastructure
  286. //(send,receive) still works
  287. func TestNoHook(t *testing.T) {
  288. //create a test spec
  289. spec := createTestSpec()
  290. //a random node
  291. id := adapters.RandomNodeConfig().ID
  292. //a peer
  293. p := p2p.NewPeer(id, "testPeer", nil)
  294. rw := &dummyRW{}
  295. peer := NewPeer(p, rw, spec)
  296. ctx := context.TODO()
  297. msg := &perBytesMsgSenderPays{Content: "testBalance"}
  298. //send a message
  299. err := peer.Send(ctx, msg)
  300. if err != nil {
  301. t.Fatal(err)
  302. }
  303. //simulate receiving a message
  304. rw.msg = msg
  305. peer.handleIncoming(func(ctx context.Context, msg interface{}) error {
  306. return nil
  307. })
  308. //all should just work and not result in any error
  309. }
  310. func TestProtoHandshakeVersionMismatch(t *testing.T) {
  311. runProtoHandshake(t, &protoHandshake{41, "420"}, errorf(ErrHandshake, errorf(ErrHandler, "(msg code 0): 41 (!= 42)").Error()))
  312. }
  313. func TestProtoHandshakeNetworkIDMismatch(t *testing.T) {
  314. runProtoHandshake(t, &protoHandshake{42, "421"}, errorf(ErrHandshake, errorf(ErrHandler, "(msg code 0): 421 (!= 420)").Error()))
  315. }
  316. func TestProtoHandshakeSuccess(t *testing.T) {
  317. runProtoHandshake(t, &protoHandshake{42, "420"})
  318. }
  319. func moduleHandshakeExchange(id enode.ID, resp uint) []p2ptest.Exchange {
  320. return []p2ptest.Exchange{
  321. {
  322. Expects: []p2ptest.Expect{
  323. {
  324. Code: 1,
  325. Msg: &hs0{42},
  326. Peer: id,
  327. },
  328. },
  329. },
  330. {
  331. Triggers: []p2ptest.Trigger{
  332. {
  333. Code: 1,
  334. Msg: &hs0{resp},
  335. Peer: id,
  336. },
  337. },
  338. },
  339. }
  340. }
  341. func runModuleHandshake(t *testing.T, resp uint, errs ...error) {
  342. pp := p2ptest.NewTestPeerPool()
  343. s := protocolTester(t, pp)
  344. node := s.Nodes[0]
  345. if err := s.TestExchanges(protoHandshakeExchange(node.ID(), &protoHandshake{42, "420"})...); err != nil {
  346. t.Fatal(err)
  347. }
  348. if err := s.TestExchanges(moduleHandshakeExchange(node.ID(), resp)...); err != nil {
  349. t.Fatal(err)
  350. }
  351. var disconnects []*p2ptest.Disconnect
  352. for i, err := range errs {
  353. disconnects = append(disconnects, &p2ptest.Disconnect{Peer: s.Nodes[i].ID(), Error: err})
  354. }
  355. if err := s.TestDisconnected(disconnects...); err != nil {
  356. t.Fatal(err)
  357. }
  358. }
  359. func TestModuleHandshakeError(t *testing.T) {
  360. runModuleHandshake(t, 43, fmt.Errorf("handshake mismatch remote 43 > local 42"))
  361. }
  362. func TestModuleHandshakeSuccess(t *testing.T) {
  363. runModuleHandshake(t, 42)
  364. }
  365. // testing complex interactions over multiple peers, relaying, dropping
  366. func testMultiPeerSetup(a, b enode.ID) []p2ptest.Exchange {
  367. return []p2ptest.Exchange{
  368. {
  369. Label: "primary handshake",
  370. Expects: []p2ptest.Expect{
  371. {
  372. Code: 0,
  373. Msg: &protoHandshake{42, "420"},
  374. Peer: a,
  375. },
  376. {
  377. Code: 0,
  378. Msg: &protoHandshake{42, "420"},
  379. Peer: b,
  380. },
  381. },
  382. },
  383. {
  384. Label: "module handshake",
  385. Triggers: []p2ptest.Trigger{
  386. {
  387. Code: 0,
  388. Msg: &protoHandshake{42, "420"},
  389. Peer: a,
  390. },
  391. {
  392. Code: 0,
  393. Msg: &protoHandshake{42, "420"},
  394. Peer: b,
  395. },
  396. },
  397. Expects: []p2ptest.Expect{
  398. {
  399. Code: 1,
  400. Msg: &hs0{42},
  401. Peer: a,
  402. },
  403. {
  404. Code: 1,
  405. Msg: &hs0{42},
  406. Peer: b,
  407. },
  408. },
  409. },
  410. {Label: "alternative module handshake", Triggers: []p2ptest.Trigger{{Code: 1, Msg: &hs0{41}, Peer: a},
  411. {Code: 1, Msg: &hs0{41}, Peer: b}}},
  412. {Label: "repeated module handshake", Triggers: []p2ptest.Trigger{{Code: 1, Msg: &hs0{1}, Peer: a}}},
  413. {Label: "receiving repeated module handshake", Expects: []p2ptest.Expect{{Code: 1, Msg: &hs0{43}, Peer: a}}}}
  414. }
  415. func runMultiplePeers(t *testing.T, peer int, errs ...error) {
  416. pp := p2ptest.NewTestPeerPool()
  417. s := protocolTester(t, pp)
  418. if err := s.TestExchanges(testMultiPeerSetup(s.Nodes[0].ID(), s.Nodes[1].ID())...); err != nil {
  419. t.Fatal(err)
  420. }
  421. // after some exchanges of messages, we can test state changes
  422. // here this is simply demonstrated by the peerPool
  423. // after the handshake negotiations peers must be added to the pool
  424. // time.Sleep(1)
  425. tick := time.NewTicker(10 * time.Millisecond)
  426. timeout := time.NewTimer(1 * time.Second)
  427. WAIT:
  428. for {
  429. select {
  430. case <-tick.C:
  431. if pp.Has(s.Nodes[0].ID()) {
  432. break WAIT
  433. }
  434. case <-timeout.C:
  435. t.Fatal("timeout")
  436. }
  437. }
  438. if !pp.Has(s.Nodes[1].ID()) {
  439. t.Fatalf("missing peer test-1: %v (%v)", pp, s.Nodes)
  440. }
  441. // peer 0 sends kill request for peer with index <peer>
  442. err := s.TestExchanges(p2ptest.Exchange{
  443. Triggers: []p2ptest.Trigger{
  444. {
  445. Code: 2,
  446. Msg: &kill{s.Nodes[peer].ID()},
  447. Peer: s.Nodes[0].ID(),
  448. },
  449. },
  450. })
  451. if err != nil {
  452. t.Fatal(err)
  453. }
  454. // the peer not killed sends a drop request
  455. err = s.TestExchanges(p2ptest.Exchange{
  456. Triggers: []p2ptest.Trigger{
  457. {
  458. Code: 3,
  459. Msg: &drop{},
  460. Peer: s.Nodes[(peer+1)%2].ID(),
  461. },
  462. },
  463. })
  464. if err != nil {
  465. t.Fatal(err)
  466. }
  467. // check the actual discconnect errors on the individual peers
  468. var disconnects []*p2ptest.Disconnect
  469. for i, err := range errs {
  470. disconnects = append(disconnects, &p2ptest.Disconnect{Peer: s.Nodes[i].ID(), Error: err})
  471. }
  472. if err := s.TestDisconnected(disconnects...); err != nil {
  473. t.Fatal(err)
  474. }
  475. // test if disconnected peers have been removed from peerPool
  476. if pp.Has(s.Nodes[peer].ID()) {
  477. t.Fatalf("peer test-%v not dropped: %v (%v)", peer, pp, s.Nodes)
  478. }
  479. }
  480. func XTestMultiplePeersDropSelf(t *testing.T) {
  481. runMultiplePeers(t, 0,
  482. fmt.Errorf("subprotocol error"),
  483. fmt.Errorf("Message handler error: (msg code 3): dropped"),
  484. )
  485. }
  486. func XTestMultiplePeersDropOther(t *testing.T) {
  487. runMultiplePeers(t, 1,
  488. fmt.Errorf("Message handler error: (msg code 3): dropped"),
  489. fmt.Errorf("subprotocol error"),
  490. )
  491. }
  492. //dummy implementation of a MsgReadWriter
  493. //this allows for quick and easy unit tests without
  494. //having to build up the complete protocol
  495. type dummyRW struct {
  496. msg interface{}
  497. size uint32
  498. code uint64
  499. }
  500. func (d *dummyRW) WriteMsg(msg p2p.Msg) error {
  501. return nil
  502. }
  503. func (d *dummyRW) ReadMsg() (p2p.Msg, error) {
  504. enc := bytes.NewReader(d.getDummyMsg())
  505. return p2p.Msg{
  506. Code: d.code,
  507. Size: d.size,
  508. Payload: enc,
  509. ReceivedAt: time.Now(),
  510. }, nil
  511. }
  512. func (d *dummyRW) getDummyMsg() []byte {
  513. r, _ := rlp.EncodeToBytes(d.msg)
  514. var b bytes.Buffer
  515. wmsg := WrappedMsg{
  516. Context: b.Bytes(),
  517. Size: uint32(len(r)),
  518. Payload: r,
  519. }
  520. rr, _ := rlp.EncodeToBytes(wmsg)
  521. d.size = uint32(len(rr))
  522. return rr
  523. }