server_test.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618
  1. // Copyright 2014 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package p2p
  17. import (
  18. "crypto/ecdsa"
  19. "errors"
  20. "math/rand"
  21. "net"
  22. "reflect"
  23. "testing"
  24. "time"
  25. "github.com/ethereum/go-ethereum/crypto"
  26. "github.com/ethereum/go-ethereum/log"
  27. "github.com/ethereum/go-ethereum/p2p/enode"
  28. "github.com/ethereum/go-ethereum/p2p/enr"
  29. "golang.org/x/crypto/sha3"
  30. )
  31. // func init() {
  32. // log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(false))))
  33. // }
  34. type testTransport struct {
  35. rpub *ecdsa.PublicKey
  36. *rlpx
  37. closeErr error
  38. }
  39. func newTestTransport(rpub *ecdsa.PublicKey, fd net.Conn) transport {
  40. wrapped := newRLPX(fd).(*rlpx)
  41. wrapped.rw = newRLPXFrameRW(fd, secrets{
  42. MAC: zero16,
  43. AES: zero16,
  44. IngressMAC: sha3.NewLegacyKeccak256(),
  45. EgressMAC: sha3.NewLegacyKeccak256(),
  46. })
  47. return &testTransport{rpub: rpub, rlpx: wrapped}
  48. }
  49. func (c *testTransport) doEncHandshake(prv *ecdsa.PrivateKey, dialDest *ecdsa.PublicKey) (*ecdsa.PublicKey, error) {
  50. return c.rpub, nil
  51. }
  52. func (c *testTransport) doProtoHandshake(our *protoHandshake) (*protoHandshake, error) {
  53. pubkey := crypto.FromECDSAPub(c.rpub)[1:]
  54. return &protoHandshake{ID: pubkey, Name: "test"}, nil
  55. }
  56. func (c *testTransport) close(err error) {
  57. c.rlpx.fd.Close()
  58. c.closeErr = err
  59. }
  60. func startTestServer(t *testing.T, remoteKey *ecdsa.PublicKey, pf func(*Peer)) *Server {
  61. config := Config{
  62. Name: "test",
  63. MaxPeers: 10,
  64. ListenAddr: "127.0.0.1:0",
  65. PrivateKey: newkey(),
  66. }
  67. server := &Server{
  68. Config: config,
  69. newPeerHook: pf,
  70. newTransport: func(fd net.Conn) transport { return newTestTransport(remoteKey, fd) },
  71. }
  72. if err := server.Start(); err != nil {
  73. t.Fatalf("Could not start server: %v", err)
  74. }
  75. return server
  76. }
  77. func TestServerListen(t *testing.T) {
  78. // start the test server
  79. connected := make(chan *Peer)
  80. remid := &newkey().PublicKey
  81. srv := startTestServer(t, remid, func(p *Peer) {
  82. if p.ID() != enode.PubkeyToIDV4(remid) {
  83. t.Error("peer func called with wrong node id")
  84. }
  85. connected <- p
  86. })
  87. defer close(connected)
  88. defer srv.Stop()
  89. // dial the test server
  90. conn, err := net.DialTimeout("tcp", srv.ListenAddr, 5*time.Second)
  91. if err != nil {
  92. t.Fatalf("could not dial: %v", err)
  93. }
  94. defer conn.Close()
  95. select {
  96. case peer := <-connected:
  97. if peer.LocalAddr().String() != conn.RemoteAddr().String() {
  98. t.Errorf("peer started with wrong conn: got %v, want %v",
  99. peer.LocalAddr(), conn.RemoteAddr())
  100. }
  101. peers := srv.Peers()
  102. if !reflect.DeepEqual(peers, []*Peer{peer}) {
  103. t.Errorf("Peers mismatch: got %v, want %v", peers, []*Peer{peer})
  104. }
  105. case <-time.After(1 * time.Second):
  106. t.Error("server did not accept within one second")
  107. }
  108. }
  109. func TestServerDial(t *testing.T) {
  110. // run a one-shot TCP server to handle the connection.
  111. listener, err := net.Listen("tcp", "127.0.0.1:0")
  112. if err != nil {
  113. t.Fatalf("could not setup listener: %v", err)
  114. }
  115. defer listener.Close()
  116. accepted := make(chan net.Conn)
  117. go func() {
  118. conn, err := listener.Accept()
  119. if err != nil {
  120. t.Error("accept error:", err)
  121. return
  122. }
  123. accepted <- conn
  124. }()
  125. // start the server
  126. connected := make(chan *Peer)
  127. remid := &newkey().PublicKey
  128. srv := startTestServer(t, remid, func(p *Peer) { connected <- p })
  129. defer close(connected)
  130. defer srv.Stop()
  131. // tell the server to connect
  132. tcpAddr := listener.Addr().(*net.TCPAddr)
  133. node := enode.NewV4(remid, tcpAddr.IP, tcpAddr.Port, 0)
  134. srv.AddPeer(node)
  135. select {
  136. case conn := <-accepted:
  137. defer conn.Close()
  138. select {
  139. case peer := <-connected:
  140. if peer.ID() != enode.PubkeyToIDV4(remid) {
  141. t.Errorf("peer has wrong id")
  142. }
  143. if peer.Name() != "test" {
  144. t.Errorf("peer has wrong name")
  145. }
  146. if peer.RemoteAddr().String() != conn.LocalAddr().String() {
  147. t.Errorf("peer started with wrong conn: got %v, want %v",
  148. peer.RemoteAddr(), conn.LocalAddr())
  149. }
  150. peers := srv.Peers()
  151. if !reflect.DeepEqual(peers, []*Peer{peer}) {
  152. t.Errorf("Peers mismatch: got %v, want %v", peers, []*Peer{peer})
  153. }
  154. // Test AddTrustedPeer/RemoveTrustedPeer and changing Trusted flags
  155. // Particularly for race conditions on changing the flag state.
  156. if peer := srv.Peers()[0]; peer.Info().Network.Trusted {
  157. t.Errorf("peer is trusted prematurely: %v", peer)
  158. }
  159. done := make(chan bool)
  160. go func() {
  161. srv.AddTrustedPeer(node)
  162. if peer := srv.Peers()[0]; !peer.Info().Network.Trusted {
  163. t.Errorf("peer is not trusted after AddTrustedPeer: %v", peer)
  164. }
  165. srv.RemoveTrustedPeer(node)
  166. if peer := srv.Peers()[0]; peer.Info().Network.Trusted {
  167. t.Errorf("peer is trusted after RemoveTrustedPeer: %v", peer)
  168. }
  169. done <- true
  170. }()
  171. // Trigger potential race conditions
  172. peer = srv.Peers()[0]
  173. _ = peer.Inbound()
  174. _ = peer.Info()
  175. <-done
  176. case <-time.After(1 * time.Second):
  177. t.Error("server did not launch peer within one second")
  178. }
  179. case <-time.After(1 * time.Second):
  180. t.Error("server did not connect within one second")
  181. }
  182. }
  183. // This test checks that tasks generated by dialstate are
  184. // actually executed and taskdone is called for them.
  185. func TestServerTaskScheduling(t *testing.T) {
  186. var (
  187. done = make(chan *testTask)
  188. quit, returned = make(chan struct{}), make(chan struct{})
  189. tc = 0
  190. tg = taskgen{
  191. newFunc: func(running int, peers map[enode.ID]*Peer) []task {
  192. tc++
  193. return []task{&testTask{index: tc - 1}}
  194. },
  195. doneFunc: func(t task) {
  196. select {
  197. case done <- t.(*testTask):
  198. case <-quit:
  199. }
  200. },
  201. }
  202. )
  203. // The Server in this test isn't actually running
  204. // because we're only interested in what run does.
  205. db, _ := enode.OpenDB("")
  206. srv := &Server{
  207. Config: Config{MaxPeers: 10},
  208. localnode: enode.NewLocalNode(db, newkey()),
  209. nodedb: db,
  210. quit: make(chan struct{}),
  211. ntab: fakeTable{},
  212. running: true,
  213. log: log.New(),
  214. }
  215. srv.loopWG.Add(1)
  216. go func() {
  217. srv.run(tg)
  218. close(returned)
  219. }()
  220. var gotdone []*testTask
  221. for i := 0; i < 100; i++ {
  222. gotdone = append(gotdone, <-done)
  223. }
  224. for i, task := range gotdone {
  225. if task.index != i {
  226. t.Errorf("task %d has wrong index, got %d", i, task.index)
  227. break
  228. }
  229. if !task.called {
  230. t.Errorf("task %d was not called", i)
  231. break
  232. }
  233. }
  234. close(quit)
  235. srv.Stop()
  236. select {
  237. case <-returned:
  238. case <-time.After(500 * time.Millisecond):
  239. t.Error("Server.run did not return within 500ms")
  240. }
  241. }
  242. // This test checks that Server doesn't drop tasks,
  243. // even if newTasks returns more than the maximum number of tasks.
  244. func TestServerManyTasks(t *testing.T) {
  245. alltasks := make([]task, 300)
  246. for i := range alltasks {
  247. alltasks[i] = &testTask{index: i}
  248. }
  249. var (
  250. db, _ = enode.OpenDB("")
  251. srv = &Server{
  252. quit: make(chan struct{}),
  253. localnode: enode.NewLocalNode(db, newkey()),
  254. nodedb: db,
  255. ntab: fakeTable{},
  256. running: true,
  257. log: log.New(),
  258. }
  259. done = make(chan *testTask)
  260. start, end = 0, 0
  261. )
  262. defer srv.Stop()
  263. srv.loopWG.Add(1)
  264. go srv.run(taskgen{
  265. newFunc: func(running int, peers map[enode.ID]*Peer) []task {
  266. start, end = end, end+maxActiveDialTasks+10
  267. if end > len(alltasks) {
  268. end = len(alltasks)
  269. }
  270. return alltasks[start:end]
  271. },
  272. doneFunc: func(tt task) {
  273. done <- tt.(*testTask)
  274. },
  275. })
  276. doneset := make(map[int]bool)
  277. timeout := time.After(2 * time.Second)
  278. for len(doneset) < len(alltasks) {
  279. select {
  280. case tt := <-done:
  281. if doneset[tt.index] {
  282. t.Errorf("task %d got done more than once", tt.index)
  283. } else {
  284. doneset[tt.index] = true
  285. }
  286. case <-timeout:
  287. t.Errorf("%d of %d tasks got done within 2s", len(doneset), len(alltasks))
  288. for i := 0; i < len(alltasks); i++ {
  289. if !doneset[i] {
  290. t.Logf("task %d not done", i)
  291. }
  292. }
  293. return
  294. }
  295. }
  296. }
  297. type taskgen struct {
  298. newFunc func(running int, peers map[enode.ID]*Peer) []task
  299. doneFunc func(task)
  300. }
  301. func (tg taskgen) newTasks(running int, peers map[enode.ID]*Peer, now time.Time) []task {
  302. return tg.newFunc(running, peers)
  303. }
  304. func (tg taskgen) taskDone(t task, now time.Time) {
  305. tg.doneFunc(t)
  306. }
  307. func (tg taskgen) addStatic(*enode.Node) {
  308. }
  309. func (tg taskgen) removeStatic(*enode.Node) {
  310. }
  311. type testTask struct {
  312. index int
  313. called bool
  314. }
  315. func (t *testTask) Do(srv *Server) {
  316. t.called = true
  317. }
  318. // This test checks that connections are disconnected
  319. // just after the encryption handshake when the server is
  320. // at capacity. Trusted connections should still be accepted.
  321. func TestServerAtCap(t *testing.T) {
  322. trustedNode := newkey()
  323. trustedID := enode.PubkeyToIDV4(&trustedNode.PublicKey)
  324. srv := &Server{
  325. Config: Config{
  326. PrivateKey: newkey(),
  327. MaxPeers: 10,
  328. NoDial: true,
  329. TrustedNodes: []*enode.Node{newNode(trustedID, nil)},
  330. },
  331. }
  332. if err := srv.Start(); err != nil {
  333. t.Fatalf("could not start: %v", err)
  334. }
  335. defer srv.Stop()
  336. newconn := func(id enode.ID) *conn {
  337. fd, _ := net.Pipe()
  338. tx := newTestTransport(&trustedNode.PublicKey, fd)
  339. node := enode.SignNull(new(enr.Record), id)
  340. return &conn{fd: fd, transport: tx, flags: inboundConn, node: node, cont: make(chan error)}
  341. }
  342. // Inject a few connections to fill up the peer set.
  343. for i := 0; i < 10; i++ {
  344. c := newconn(randomID())
  345. if err := srv.checkpoint(c, srv.addpeer); err != nil {
  346. t.Fatalf("could not add conn %d: %v", i, err)
  347. }
  348. }
  349. // Try inserting a non-trusted connection.
  350. anotherID := randomID()
  351. c := newconn(anotherID)
  352. if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers {
  353. t.Error("wrong error for insert:", err)
  354. }
  355. // Try inserting a trusted connection.
  356. c = newconn(trustedID)
  357. if err := srv.checkpoint(c, srv.posthandshake); err != nil {
  358. t.Error("unexpected error for trusted conn @posthandshake:", err)
  359. }
  360. if !c.is(trustedConn) {
  361. t.Error("Server did not set trusted flag")
  362. }
  363. // Remove from trusted set and try again
  364. srv.RemoveTrustedPeer(newNode(trustedID, nil))
  365. c = newconn(trustedID)
  366. if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers {
  367. t.Error("wrong error for insert:", err)
  368. }
  369. // Add anotherID to trusted set and try again
  370. srv.AddTrustedPeer(newNode(anotherID, nil))
  371. c = newconn(anotherID)
  372. if err := srv.checkpoint(c, srv.posthandshake); err != nil {
  373. t.Error("unexpected error for trusted conn @posthandshake:", err)
  374. }
  375. if !c.is(trustedConn) {
  376. t.Error("Server did not set trusted flag")
  377. }
  378. }
  379. func TestServerPeerLimits(t *testing.T) {
  380. srvkey := newkey()
  381. clientkey := newkey()
  382. clientnode := enode.NewV4(&clientkey.PublicKey, nil, 0, 0)
  383. var tp = &setupTransport{
  384. pubkey: &clientkey.PublicKey,
  385. phs: protoHandshake{
  386. ID: crypto.FromECDSAPub(&clientkey.PublicKey)[1:],
  387. // Force "DiscUselessPeer" due to unmatching caps
  388. // Caps: []Cap{discard.cap()},
  389. },
  390. }
  391. srv := &Server{
  392. Config: Config{
  393. PrivateKey: srvkey,
  394. MaxPeers: 0,
  395. NoDial: true,
  396. Protocols: []Protocol{discard},
  397. },
  398. newTransport: func(fd net.Conn) transport { return tp },
  399. log: log.New(),
  400. }
  401. if err := srv.Start(); err != nil {
  402. t.Fatalf("couldn't start server: %v", err)
  403. }
  404. defer srv.Stop()
  405. // Check that server is full (MaxPeers=0)
  406. flags := dynDialedConn
  407. dialDest := clientnode
  408. conn, _ := net.Pipe()
  409. srv.SetupConn(conn, flags, dialDest)
  410. if tp.closeErr != DiscTooManyPeers {
  411. t.Errorf("unexpected close error: %q", tp.closeErr)
  412. }
  413. conn.Close()
  414. srv.AddTrustedPeer(clientnode)
  415. // Check that server allows a trusted peer despite being full.
  416. conn, _ = net.Pipe()
  417. srv.SetupConn(conn, flags, dialDest)
  418. if tp.closeErr == DiscTooManyPeers {
  419. t.Errorf("failed to bypass MaxPeers with trusted node: %q", tp.closeErr)
  420. }
  421. if tp.closeErr != DiscUselessPeer {
  422. t.Errorf("unexpected close error: %q", tp.closeErr)
  423. }
  424. conn.Close()
  425. srv.RemoveTrustedPeer(clientnode)
  426. // Check that server is full again.
  427. conn, _ = net.Pipe()
  428. srv.SetupConn(conn, flags, dialDest)
  429. if tp.closeErr != DiscTooManyPeers {
  430. t.Errorf("unexpected close error: %q", tp.closeErr)
  431. }
  432. conn.Close()
  433. }
  434. func TestServerSetupConn(t *testing.T) {
  435. var (
  436. clientkey, srvkey = newkey(), newkey()
  437. clientpub = &clientkey.PublicKey
  438. srvpub = &srvkey.PublicKey
  439. )
  440. tests := []struct {
  441. dontstart bool
  442. tt *setupTransport
  443. flags connFlag
  444. dialDest *enode.Node
  445. wantCloseErr error
  446. wantCalls string
  447. }{
  448. {
  449. dontstart: true,
  450. tt: &setupTransport{pubkey: clientpub},
  451. wantCalls: "close,",
  452. wantCloseErr: errServerStopped,
  453. },
  454. {
  455. tt: &setupTransport{pubkey: clientpub, encHandshakeErr: errors.New("read error")},
  456. flags: inboundConn,
  457. wantCalls: "doEncHandshake,close,",
  458. wantCloseErr: errors.New("read error"),
  459. },
  460. {
  461. tt: &setupTransport{pubkey: clientpub},
  462. dialDest: enode.NewV4(&newkey().PublicKey, nil, 0, 0),
  463. flags: dynDialedConn,
  464. wantCalls: "doEncHandshake,close,",
  465. wantCloseErr: DiscUnexpectedIdentity,
  466. },
  467. {
  468. tt: &setupTransport{pubkey: clientpub, phs: protoHandshake{ID: randomID().Bytes()}},
  469. dialDest: enode.NewV4(clientpub, nil, 0, 0),
  470. flags: dynDialedConn,
  471. wantCalls: "doEncHandshake,doProtoHandshake,close,",
  472. wantCloseErr: DiscUnexpectedIdentity,
  473. },
  474. {
  475. tt: &setupTransport{pubkey: clientpub, protoHandshakeErr: errors.New("foo")},
  476. dialDest: enode.NewV4(clientpub, nil, 0, 0),
  477. flags: dynDialedConn,
  478. wantCalls: "doEncHandshake,doProtoHandshake,close,",
  479. wantCloseErr: errors.New("foo"),
  480. },
  481. {
  482. tt: &setupTransport{pubkey: srvpub, phs: protoHandshake{ID: crypto.FromECDSAPub(srvpub)[1:]}},
  483. flags: inboundConn,
  484. wantCalls: "doEncHandshake,close,",
  485. wantCloseErr: DiscSelf,
  486. },
  487. {
  488. tt: &setupTransport{pubkey: clientpub, phs: protoHandshake{ID: crypto.FromECDSAPub(clientpub)[1:]}},
  489. flags: inboundConn,
  490. wantCalls: "doEncHandshake,doProtoHandshake,close,",
  491. wantCloseErr: DiscUselessPeer,
  492. },
  493. }
  494. for i, test := range tests {
  495. srv := &Server{
  496. Config: Config{
  497. PrivateKey: srvkey,
  498. MaxPeers: 10,
  499. NoDial: true,
  500. Protocols: []Protocol{discard},
  501. },
  502. newTransport: func(fd net.Conn) transport { return test.tt },
  503. log: log.New(),
  504. }
  505. if !test.dontstart {
  506. if err := srv.Start(); err != nil {
  507. t.Fatalf("couldn't start server: %v", err)
  508. }
  509. }
  510. p1, _ := net.Pipe()
  511. srv.SetupConn(p1, test.flags, test.dialDest)
  512. if !reflect.DeepEqual(test.tt.closeErr, test.wantCloseErr) {
  513. t.Errorf("test %d: close error mismatch: got %q, want %q", i, test.tt.closeErr, test.wantCloseErr)
  514. }
  515. if test.tt.calls != test.wantCalls {
  516. t.Errorf("test %d: calls mismatch: got %q, want %q", i, test.tt.calls, test.wantCalls)
  517. }
  518. }
  519. }
  520. type setupTransport struct {
  521. pubkey *ecdsa.PublicKey
  522. encHandshakeErr error
  523. phs protoHandshake
  524. protoHandshakeErr error
  525. calls string
  526. closeErr error
  527. }
  528. func (c *setupTransport) doEncHandshake(prv *ecdsa.PrivateKey, dialDest *ecdsa.PublicKey) (*ecdsa.PublicKey, error) {
  529. c.calls += "doEncHandshake,"
  530. return c.pubkey, c.encHandshakeErr
  531. }
  532. func (c *setupTransport) doProtoHandshake(our *protoHandshake) (*protoHandshake, error) {
  533. c.calls += "doProtoHandshake,"
  534. if c.protoHandshakeErr != nil {
  535. return nil, c.protoHandshakeErr
  536. }
  537. return &c.phs, nil
  538. }
  539. func (c *setupTransport) close(err error) {
  540. c.calls += "close,"
  541. c.closeErr = err
  542. }
  543. // setupConn shouldn't write to/read from the connection.
  544. func (c *setupTransport) WriteMsg(Msg) error {
  545. panic("WriteMsg called on setupTransport")
  546. }
  547. func (c *setupTransport) ReadMsg() (Msg, error) {
  548. panic("ReadMsg called on setupTransport")
  549. }
  550. func newkey() *ecdsa.PrivateKey {
  551. key, err := crypto.GenerateKey()
  552. if err != nil {
  553. panic("couldn't generate key: " + err.Error())
  554. }
  555. return key
  556. }
  557. func randomID() (id enode.ID) {
  558. for i := range id {
  559. id[i] = byte(rand.Intn(255))
  560. }
  561. return id
  562. }