server_test.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644
  1. // Copyright 2014 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package p2p
  17. import (
  18. "crypto/ecdsa"
  19. "crypto/sha256"
  20. "errors"
  21. "io"
  22. "math/rand"
  23. "net"
  24. "reflect"
  25. "testing"
  26. "time"
  27. "github.com/ethereum/go-ethereum/crypto"
  28. "github.com/ethereum/go-ethereum/internal/testlog"
  29. "github.com/ethereum/go-ethereum/log"
  30. "github.com/ethereum/go-ethereum/p2p/enode"
  31. "github.com/ethereum/go-ethereum/p2p/enr"
  32. "github.com/ethereum/go-ethereum/p2p/rlpx"
  33. )
  34. type testTransport struct {
  35. *rlpxTransport
  36. rpub *ecdsa.PublicKey
  37. closeErr error
  38. }
  39. func newTestTransport(rpub *ecdsa.PublicKey, fd net.Conn, dialDest *ecdsa.PublicKey) transport {
  40. wrapped := newRLPX(fd, dialDest).(*rlpxTransport)
  41. wrapped.conn.InitWithSecrets(rlpx.Secrets{
  42. AES: make([]byte, 16),
  43. MAC: make([]byte, 16),
  44. EgressMAC: sha256.New(),
  45. IngressMAC: sha256.New(),
  46. })
  47. return &testTransport{rpub: rpub, rlpxTransport: wrapped}
  48. }
  49. func (c *testTransport) doEncHandshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) {
  50. return c.rpub, nil
  51. }
  52. func (c *testTransport) doProtoHandshake(our *protoHandshake) (*protoHandshake, error) {
  53. pubkey := crypto.FromECDSAPub(c.rpub)[1:]
  54. return &protoHandshake{ID: pubkey, Name: "test"}, nil
  55. }
  56. func (c *testTransport) close(err error) {
  57. c.conn.Close()
  58. c.closeErr = err
  59. }
  60. func startTestServer(t *testing.T, remoteKey *ecdsa.PublicKey, pf func(*Peer)) *Server {
  61. config := Config{
  62. Name: "test",
  63. MaxPeers: 10,
  64. ListenAddr: "127.0.0.1:0",
  65. NoDiscovery: true,
  66. PrivateKey: newkey(),
  67. Logger: testlog.Logger(t, log.LvlTrace),
  68. }
  69. server := &Server{
  70. Config: config,
  71. newPeerHook: pf,
  72. newTransport: func(fd net.Conn, dialDest *ecdsa.PublicKey) transport {
  73. return newTestTransport(remoteKey, fd, dialDest)
  74. },
  75. }
  76. if err := server.Start(); err != nil {
  77. t.Fatalf("Could not start server: %v", err)
  78. }
  79. return server
  80. }
  81. func TestServerListen(t *testing.T) {
  82. // start the test server
  83. connected := make(chan *Peer)
  84. remid := &newkey().PublicKey
  85. srv := startTestServer(t, remid, func(p *Peer) {
  86. if p.ID() != enode.PubkeyToIDV4(remid) {
  87. t.Error("peer func called with wrong node id")
  88. }
  89. connected <- p
  90. })
  91. defer close(connected)
  92. defer srv.Stop()
  93. // dial the test server
  94. conn, err := net.DialTimeout("tcp", srv.ListenAddr, 5*time.Second)
  95. if err != nil {
  96. t.Fatalf("could not dial: %v", err)
  97. }
  98. defer conn.Close()
  99. select {
  100. case peer := <-connected:
  101. if peer.LocalAddr().String() != conn.RemoteAddr().String() {
  102. t.Errorf("peer started with wrong conn: got %v, want %v",
  103. peer.LocalAddr(), conn.RemoteAddr())
  104. }
  105. peers := srv.Peers()
  106. if !reflect.DeepEqual(peers, []*Peer{peer}) {
  107. t.Errorf("Peers mismatch: got %v, want %v", peers, []*Peer{peer})
  108. }
  109. case <-time.After(1 * time.Second):
  110. t.Error("server did not accept within one second")
  111. }
  112. }
  113. func TestServerDial(t *testing.T) {
  114. // run a one-shot TCP server to handle the connection.
  115. listener, err := net.Listen("tcp", "127.0.0.1:0")
  116. if err != nil {
  117. t.Fatalf("could not setup listener: %v", err)
  118. }
  119. defer listener.Close()
  120. accepted := make(chan net.Conn, 1)
  121. go func() {
  122. conn, err := listener.Accept()
  123. if err != nil {
  124. return
  125. }
  126. accepted <- conn
  127. }()
  128. // start the server
  129. connected := make(chan *Peer)
  130. remid := &newkey().PublicKey
  131. srv := startTestServer(t, remid, func(p *Peer) { connected <- p })
  132. defer close(connected)
  133. defer srv.Stop()
  134. // tell the server to connect
  135. tcpAddr := listener.Addr().(*net.TCPAddr)
  136. node := enode.NewV4(remid, tcpAddr.IP, tcpAddr.Port, 0)
  137. srv.AddPeer(node)
  138. select {
  139. case conn := <-accepted:
  140. defer conn.Close()
  141. select {
  142. case peer := <-connected:
  143. if peer.ID() != enode.PubkeyToIDV4(remid) {
  144. t.Errorf("peer has wrong id")
  145. }
  146. if peer.Name() != "test" {
  147. t.Errorf("peer has wrong name")
  148. }
  149. if peer.RemoteAddr().String() != conn.LocalAddr().String() {
  150. t.Errorf("peer started with wrong conn: got %v, want %v",
  151. peer.RemoteAddr(), conn.LocalAddr())
  152. }
  153. peers := srv.Peers()
  154. if !reflect.DeepEqual(peers, []*Peer{peer}) {
  155. t.Errorf("Peers mismatch: got %v, want %v", peers, []*Peer{peer})
  156. }
  157. // Test AddTrustedPeer/RemoveTrustedPeer and changing Trusted flags
  158. // Particularly for race conditions on changing the flag state.
  159. if peer := srv.Peers()[0]; peer.Info().Network.Trusted {
  160. t.Errorf("peer is trusted prematurely: %v", peer)
  161. }
  162. done := make(chan bool)
  163. go func() {
  164. srv.AddTrustedPeer(node)
  165. if peer := srv.Peers()[0]; !peer.Info().Network.Trusted {
  166. t.Errorf("peer is not trusted after AddTrustedPeer: %v", peer)
  167. }
  168. srv.RemoveTrustedPeer(node)
  169. if peer := srv.Peers()[0]; peer.Info().Network.Trusted {
  170. t.Errorf("peer is trusted after RemoveTrustedPeer: %v", peer)
  171. }
  172. done <- true
  173. }()
  174. // Trigger potential race conditions
  175. peer = srv.Peers()[0]
  176. _ = peer.Inbound()
  177. _ = peer.Info()
  178. <-done
  179. case <-time.After(1 * time.Second):
  180. t.Error("server did not launch peer within one second")
  181. }
  182. case <-time.After(1 * time.Second):
  183. t.Error("server did not connect within one second")
  184. }
  185. }
  186. func TestServerStopTimeout(t *testing.T) {
  187. srv := &Server{Config: Config{
  188. PrivateKey: newkey(),
  189. MaxPeers: 1,
  190. NoDiscovery: true,
  191. Logger: testlog.Logger(t, log.LvlTrace).New("server", "1"),
  192. }}
  193. srv.Start()
  194. srv.loopWG.Add(1)
  195. stopChan := make(chan struct{})
  196. go func() {
  197. srv.Stop()
  198. close(stopChan)
  199. }()
  200. select {
  201. case <-stopChan:
  202. case <-time.After(10 * time.Second):
  203. t.Error("server should be shutdown in 10 seconds")
  204. }
  205. }
  206. // This test checks that RemovePeer disconnects the peer if it is connected.
  207. func TestServerRemovePeerDisconnect(t *testing.T) {
  208. srv1 := &Server{Config: Config{
  209. PrivateKey: newkey(),
  210. MaxPeers: 1,
  211. NoDiscovery: true,
  212. Logger: testlog.Logger(t, log.LvlTrace).New("server", "1"),
  213. }}
  214. srv2 := &Server{Config: Config{
  215. PrivateKey: newkey(),
  216. MaxPeers: 1,
  217. NoDiscovery: true,
  218. NoDial: true,
  219. ListenAddr: "127.0.0.1:0",
  220. Logger: testlog.Logger(t, log.LvlTrace).New("server", "2"),
  221. }}
  222. srv1.Start()
  223. defer srv1.Stop()
  224. srv2.Start()
  225. defer srv2.Stop()
  226. if !syncAddPeer(srv1, srv2.Self()) {
  227. t.Fatal("peer not connected")
  228. }
  229. srv1.RemovePeer(srv2.Self())
  230. if srv1.PeerCount() > 0 {
  231. t.Fatal("removed peer still connected")
  232. }
  233. }
  234. // This test checks that connections are disconnected just after the encryption handshake
  235. // when the server is at capacity. Trusted connections should still be accepted.
  236. func TestServerAtCap(t *testing.T) {
  237. trustedNode := newkey()
  238. trustedID := enode.PubkeyToIDV4(&trustedNode.PublicKey)
  239. srv := &Server{
  240. Config: Config{
  241. PrivateKey: newkey(),
  242. MaxPeers: 10,
  243. NoDial: true,
  244. NoDiscovery: true,
  245. TrustedNodes: []*enode.Node{newNode(trustedID, "")},
  246. Logger: testlog.Logger(t, log.LvlTrace),
  247. },
  248. }
  249. if err := srv.Start(); err != nil {
  250. t.Fatalf("could not start: %v", err)
  251. }
  252. defer srv.Stop()
  253. newconn := func(id enode.ID) *conn {
  254. fd, _ := net.Pipe()
  255. tx := newTestTransport(&trustedNode.PublicKey, fd, nil)
  256. node := enode.SignNull(new(enr.Record), id)
  257. return &conn{fd: fd, transport: tx, flags: inboundConn, node: node, cont: make(chan error)}
  258. }
  259. // Inject a few connections to fill up the peer set.
  260. for i := 0; i < 10; i++ {
  261. c := newconn(randomID())
  262. if err := srv.checkpoint(c, srv.checkpointAddPeer); err != nil {
  263. t.Fatalf("could not add conn %d: %v", i, err)
  264. }
  265. }
  266. // Try inserting a non-trusted connection.
  267. anotherID := randomID()
  268. c := newconn(anotherID)
  269. if err := srv.checkpoint(c, srv.checkpointPostHandshake); err != DiscTooManyPeers {
  270. t.Error("wrong error for insert:", err)
  271. }
  272. // Try inserting a trusted connection.
  273. c = newconn(trustedID)
  274. if err := srv.checkpoint(c, srv.checkpointPostHandshake); err != nil {
  275. t.Error("unexpected error for trusted conn @posthandshake:", err)
  276. }
  277. if !c.is(trustedConn) {
  278. t.Error("Server did not set trusted flag")
  279. }
  280. // Remove from trusted set and try again
  281. srv.RemoveTrustedPeer(newNode(trustedID, ""))
  282. c = newconn(trustedID)
  283. if err := srv.checkpoint(c, srv.checkpointPostHandshake); err != DiscTooManyPeers {
  284. t.Error("wrong error for insert:", err)
  285. }
  286. // Add anotherID to trusted set and try again
  287. srv.AddTrustedPeer(newNode(anotherID, ""))
  288. c = newconn(anotherID)
  289. if err := srv.checkpoint(c, srv.checkpointPostHandshake); err != nil {
  290. t.Error("unexpected error for trusted conn @posthandshake:", err)
  291. }
  292. if !c.is(trustedConn) {
  293. t.Error("Server did not set trusted flag")
  294. }
  295. }
  296. func TestServerPeerLimits(t *testing.T) {
  297. srvkey := newkey()
  298. clientkey := newkey()
  299. clientnode := enode.NewV4(&clientkey.PublicKey, nil, 0, 0)
  300. var tp = &setupTransport{
  301. pubkey: &clientkey.PublicKey,
  302. phs: protoHandshake{
  303. ID: crypto.FromECDSAPub(&clientkey.PublicKey)[1:],
  304. // Force "DiscUselessPeer" due to unmatching caps
  305. // Caps: []Cap{discard.cap()},
  306. },
  307. }
  308. srv := &Server{
  309. Config: Config{
  310. PrivateKey: srvkey,
  311. MaxPeers: 0,
  312. NoDial: true,
  313. NoDiscovery: true,
  314. Protocols: []Protocol{discard},
  315. Logger: testlog.Logger(t, log.LvlTrace),
  316. },
  317. newTransport: func(fd net.Conn, dialDest *ecdsa.PublicKey) transport { return tp },
  318. }
  319. if err := srv.Start(); err != nil {
  320. t.Fatalf("couldn't start server: %v", err)
  321. }
  322. defer srv.Stop()
  323. // Check that server is full (MaxPeers=0)
  324. flags := dynDialedConn
  325. dialDest := clientnode
  326. conn, _ := net.Pipe()
  327. srv.SetupConn(conn, flags, dialDest)
  328. if tp.closeErr != DiscTooManyPeers {
  329. t.Errorf("unexpected close error: %q", tp.closeErr)
  330. }
  331. conn.Close()
  332. srv.AddTrustedPeer(clientnode)
  333. // Check that server allows a trusted peer despite being full.
  334. conn, _ = net.Pipe()
  335. srv.SetupConn(conn, flags, dialDest)
  336. if tp.closeErr == DiscTooManyPeers {
  337. t.Errorf("failed to bypass MaxPeers with trusted node: %q", tp.closeErr)
  338. }
  339. if tp.closeErr != DiscUselessPeer {
  340. t.Errorf("unexpected close error: %q", tp.closeErr)
  341. }
  342. conn.Close()
  343. srv.RemoveTrustedPeer(clientnode)
  344. // Check that server is full again.
  345. conn, _ = net.Pipe()
  346. srv.SetupConn(conn, flags, dialDest)
  347. if tp.closeErr != DiscTooManyPeers {
  348. t.Errorf("unexpected close error: %q", tp.closeErr)
  349. }
  350. conn.Close()
  351. }
  352. func TestServerSetupConn(t *testing.T) {
  353. var (
  354. clientkey, srvkey = newkey(), newkey()
  355. clientpub = &clientkey.PublicKey
  356. srvpub = &srvkey.PublicKey
  357. )
  358. tests := []struct {
  359. dontstart bool
  360. tt *setupTransport
  361. flags connFlag
  362. dialDest *enode.Node
  363. wantCloseErr error
  364. wantCalls string
  365. }{
  366. {
  367. dontstart: true,
  368. tt: &setupTransport{pubkey: clientpub},
  369. wantCalls: "close,",
  370. wantCloseErr: errServerStopped,
  371. },
  372. {
  373. tt: &setupTransport{pubkey: clientpub, encHandshakeErr: errors.New("read error")},
  374. flags: inboundConn,
  375. wantCalls: "doEncHandshake,close,",
  376. wantCloseErr: errors.New("read error"),
  377. },
  378. {
  379. tt: &setupTransport{pubkey: clientpub, phs: protoHandshake{ID: randomID().Bytes()}},
  380. dialDest: enode.NewV4(clientpub, nil, 0, 0),
  381. flags: dynDialedConn,
  382. wantCalls: "doEncHandshake,doProtoHandshake,close,",
  383. wantCloseErr: DiscUnexpectedIdentity,
  384. },
  385. {
  386. tt: &setupTransport{pubkey: clientpub, protoHandshakeErr: errors.New("foo")},
  387. dialDest: enode.NewV4(clientpub, nil, 0, 0),
  388. flags: dynDialedConn,
  389. wantCalls: "doEncHandshake,doProtoHandshake,close,",
  390. wantCloseErr: errors.New("foo"),
  391. },
  392. {
  393. tt: &setupTransport{pubkey: srvpub, phs: protoHandshake{ID: crypto.FromECDSAPub(srvpub)[1:]}},
  394. flags: inboundConn,
  395. wantCalls: "doEncHandshake,close,",
  396. wantCloseErr: DiscSelf,
  397. },
  398. {
  399. tt: &setupTransport{pubkey: clientpub, phs: protoHandshake{ID: crypto.FromECDSAPub(clientpub)[1:]}},
  400. flags: inboundConn,
  401. wantCalls: "doEncHandshake,doProtoHandshake,close,",
  402. wantCloseErr: DiscUselessPeer,
  403. },
  404. }
  405. for i, test := range tests {
  406. t.Run(test.wantCalls, func(t *testing.T) {
  407. cfg := Config{
  408. PrivateKey: srvkey,
  409. MaxPeers: 10,
  410. NoDial: true,
  411. NoDiscovery: true,
  412. Protocols: []Protocol{discard},
  413. Logger: testlog.Logger(t, log.LvlTrace),
  414. }
  415. srv := &Server{
  416. Config: cfg,
  417. newTransport: func(fd net.Conn, dialDest *ecdsa.PublicKey) transport { return test.tt },
  418. log: cfg.Logger,
  419. }
  420. if !test.dontstart {
  421. if err := srv.Start(); err != nil {
  422. t.Fatalf("couldn't start server: %v", err)
  423. }
  424. defer srv.Stop()
  425. }
  426. p1, _ := net.Pipe()
  427. srv.SetupConn(p1, test.flags, test.dialDest)
  428. if !reflect.DeepEqual(test.tt.closeErr, test.wantCloseErr) {
  429. t.Errorf("test %d: close error mismatch: got %q, want %q", i, test.tt.closeErr, test.wantCloseErr)
  430. }
  431. if test.tt.calls != test.wantCalls {
  432. t.Errorf("test %d: calls mismatch: got %q, want %q", i, test.tt.calls, test.wantCalls)
  433. }
  434. })
  435. }
  436. }
  437. type setupTransport struct {
  438. pubkey *ecdsa.PublicKey
  439. encHandshakeErr error
  440. phs protoHandshake
  441. protoHandshakeErr error
  442. calls string
  443. closeErr error
  444. }
  445. func (c *setupTransport) doEncHandshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) {
  446. c.calls += "doEncHandshake,"
  447. return c.pubkey, c.encHandshakeErr
  448. }
  449. func (c *setupTransport) doProtoHandshake(our *protoHandshake) (*protoHandshake, error) {
  450. c.calls += "doProtoHandshake,"
  451. if c.protoHandshakeErr != nil {
  452. return nil, c.protoHandshakeErr
  453. }
  454. return &c.phs, nil
  455. }
  456. func (c *setupTransport) close(err error) {
  457. c.calls += "close,"
  458. c.closeErr = err
  459. }
  460. // setupConn shouldn't write to/read from the connection.
  461. func (c *setupTransport) WriteMsg(Msg) error {
  462. panic("WriteMsg called on setupTransport")
  463. }
  464. func (c *setupTransport) ReadMsg() (Msg, error) {
  465. panic("ReadMsg called on setupTransport")
  466. }
  467. func newkey() *ecdsa.PrivateKey {
  468. key, err := crypto.GenerateKey()
  469. if err != nil {
  470. panic("couldn't generate key: " + err.Error())
  471. }
  472. return key
  473. }
  474. func randomID() (id enode.ID) {
  475. for i := range id {
  476. id[i] = byte(rand.Intn(255))
  477. }
  478. return id
  479. }
  480. // This test checks that inbound connections are throttled by IP.
  481. func TestServerInboundThrottle(t *testing.T) {
  482. const timeout = 5 * time.Second
  483. newTransportCalled := make(chan struct{})
  484. srv := &Server{
  485. Config: Config{
  486. PrivateKey: newkey(),
  487. ListenAddr: "127.0.0.1:0",
  488. MaxPeers: 10,
  489. NoDial: true,
  490. NoDiscovery: true,
  491. Protocols: []Protocol{discard},
  492. Logger: testlog.Logger(t, log.LvlTrace),
  493. },
  494. newTransport: func(fd net.Conn, dialDest *ecdsa.PublicKey) transport {
  495. newTransportCalled <- struct{}{}
  496. return newRLPX(fd, dialDest)
  497. },
  498. listenFunc: func(network, laddr string) (net.Listener, error) {
  499. fakeAddr := &net.TCPAddr{IP: net.IP{95, 33, 21, 2}, Port: 4444}
  500. return listenFakeAddr(network, laddr, fakeAddr)
  501. },
  502. }
  503. if err := srv.Start(); err != nil {
  504. t.Fatal("can't start: ", err)
  505. }
  506. defer srv.Stop()
  507. // Dial the test server.
  508. conn, err := net.DialTimeout("tcp", srv.ListenAddr, timeout)
  509. if err != nil {
  510. t.Fatalf("could not dial: %v", err)
  511. }
  512. select {
  513. case <-newTransportCalled:
  514. // OK
  515. case <-time.After(timeout):
  516. t.Error("newTransport not called")
  517. }
  518. conn.Close()
  519. // Dial again. This time the server should close the connection immediately.
  520. connClosed := make(chan struct{}, 1)
  521. conn, err = net.DialTimeout("tcp", srv.ListenAddr, timeout)
  522. if err != nil {
  523. t.Fatalf("could not dial: %v", err)
  524. }
  525. defer conn.Close()
  526. go func() {
  527. conn.SetDeadline(time.Now().Add(timeout))
  528. buf := make([]byte, 10)
  529. if n, err := conn.Read(buf); err != io.EOF || n != 0 {
  530. t.Errorf("expected io.EOF and n == 0, got error %q and n == %d", err, n)
  531. }
  532. connClosed <- struct{}{}
  533. }()
  534. select {
  535. case <-connClosed:
  536. // OK
  537. case <-newTransportCalled:
  538. t.Error("newTransport called for second attempt")
  539. case <-time.After(timeout):
  540. t.Error("connection not closed within timeout")
  541. }
  542. }
  543. func listenFakeAddr(network, laddr string, remoteAddr net.Addr) (net.Listener, error) {
  544. l, err := net.Listen(network, laddr)
  545. if err == nil {
  546. l = &fakeAddrListener{l, remoteAddr}
  547. }
  548. return l, err
  549. }
  550. // fakeAddrListener is a listener that creates connections with a mocked remote address.
  551. type fakeAddrListener struct {
  552. net.Listener
  553. remoteAddr net.Addr
  554. }
  555. type fakeAddrConn struct {
  556. net.Conn
  557. remoteAddr net.Addr
  558. }
  559. func (l *fakeAddrListener) Accept() (net.Conn, error) {
  560. c, err := l.Listener.Accept()
  561. if err != nil {
  562. return nil, err
  563. }
  564. return &fakeAddrConn{c, l.remoteAddr}, nil
  565. }
  566. func (c *fakeAddrConn) RemoteAddr() net.Addr {
  567. return c.remoteAddr
  568. }
  569. func syncAddPeer(srv *Server, node *enode.Node) bool {
  570. var (
  571. ch = make(chan *PeerEvent)
  572. sub = srv.SubscribeEvents(ch)
  573. timeout = time.After(2 * time.Second)
  574. )
  575. defer sub.Unsubscribe()
  576. srv.AddPeer(node)
  577. for {
  578. select {
  579. case ev := <-ch:
  580. if ev.Type == PeerEventTypeAdd && ev.Peer == node.ID() {
  581. return true
  582. }
  583. case <-timeout:
  584. return false
  585. }
  586. }
  587. }