server.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653
  1. package p2p
  2. import (
  3. "crypto/ecdsa"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "sync"
  8. "time"
  9. "github.com/ethereum/go-ethereum/logger"
  10. "github.com/ethereum/go-ethereum/logger/glog"
  11. "github.com/ethereum/go-ethereum/p2p/discover"
  12. "github.com/ethereum/go-ethereum/p2p/nat"
  13. )
  14. const (
  15. defaultDialTimeout = 15 * time.Second
  16. refreshPeersInterval = 30 * time.Second
  17. staticPeerCheckInterval = 15 * time.Second
  18. // Maximum number of concurrently handshaking inbound connections.
  19. maxAcceptConns = 50
  20. // Maximum number of concurrently dialing outbound connections.
  21. maxActiveDialTasks = 16
  22. // Maximum time allowed for reading a complete message.
  23. // This is effectively the amount of time a connection can be idle.
  24. frameReadTimeout = 30 * time.Second
  25. // Maximum amount of time allowed for writing a complete message.
  26. frameWriteTimeout = 20 * time.Second
  27. )
  28. var errServerStopped = errors.New("server stopped")
  29. var srvjslog = logger.NewJsonLogger()
  30. // Server manages all peer connections.
  31. //
  32. // The fields of Server are used as configuration parameters.
  33. // You should set them before starting the Server. Fields may not be
  34. // modified while the server is running.
  35. type Server struct {
  36. // This field must be set to a valid secp256k1 private key.
  37. PrivateKey *ecdsa.PrivateKey
  38. // MaxPeers is the maximum number of peers that can be
  39. // connected. It must be greater than zero.
  40. MaxPeers int
  41. // MaxPendingPeers is the maximum number of peers that can be pending in the
  42. // handshake phase, counted separately for inbound and outbound connections.
  43. // Zero defaults to preset values.
  44. MaxPendingPeers int
  45. // Discovery specifies whether the peer discovery mechanism should be started
  46. // or not. Disabling is usually useful for protocol debugging (manual topology).
  47. Discovery bool
  48. // Name sets the node name of this server.
  49. // Use common.MakeName to create a name that follows existing conventions.
  50. Name string
  51. // Bootstrap nodes are used to establish connectivity
  52. // with the rest of the network.
  53. BootstrapNodes []*discover.Node
  54. // Static nodes are used as pre-configured connections which are always
  55. // maintained and re-connected on disconnects.
  56. StaticNodes []*discover.Node
  57. // Trusted nodes are used as pre-configured connections which are always
  58. // allowed to connect, even above the peer limit.
  59. TrustedNodes []*discover.Node
  60. // NodeDatabase is the path to the database containing the previously seen
  61. // live nodes in the network.
  62. NodeDatabase string
  63. // Protocols should contain the protocols supported
  64. // by the server. Matching protocols are launched for
  65. // each peer.
  66. Protocols []Protocol
  67. // If ListenAddr is set to a non-nil address, the server
  68. // will listen for incoming connections.
  69. //
  70. // If the port is zero, the operating system will pick a port. The
  71. // ListenAddr field will be updated with the actual address when
  72. // the server is started.
  73. ListenAddr string
  74. // If set to a non-nil value, the given NAT port mapper
  75. // is used to make the listening port available to the
  76. // Internet.
  77. NAT nat.Interface
  78. // If Dialer is set to a non-nil value, the given Dialer
  79. // is used to dial outbound peer connections.
  80. Dialer *net.Dialer
  81. // If NoDial is true, the server will not dial any peers.
  82. NoDial bool
  83. // Hooks for testing. These are useful because we can inhibit
  84. // the whole protocol stack.
  85. newTransport func(net.Conn) transport
  86. newPeerHook func(*Peer)
  87. lock sync.Mutex // protects running
  88. running bool
  89. ntab discoverTable
  90. listener net.Listener
  91. ourHandshake *protoHandshake
  92. lastLookup time.Time
  93. // These are for Peers, PeerCount (and nothing else).
  94. peerOp chan peerOpFunc
  95. peerOpDone chan struct{}
  96. quit chan struct{}
  97. addstatic chan *discover.Node
  98. posthandshake chan *conn
  99. addpeer chan *conn
  100. delpeer chan *Peer
  101. loopWG sync.WaitGroup // loop, listenLoop
  102. }
  103. type peerOpFunc func(map[discover.NodeID]*Peer)
  104. type connFlag int
  105. const (
  106. dynDialedConn connFlag = 1 << iota
  107. staticDialedConn
  108. inboundConn
  109. trustedConn
  110. )
  111. // conn wraps a network connection with information gathered
  112. // during the two handshakes.
  113. type conn struct {
  114. fd net.Conn
  115. transport
  116. flags connFlag
  117. cont chan error // The run loop uses cont to signal errors to setupConn.
  118. id discover.NodeID // valid after the encryption handshake
  119. caps []Cap // valid after the protocol handshake
  120. name string // valid after the protocol handshake
  121. }
  122. type transport interface {
  123. // The two handshakes.
  124. doEncHandshake(prv *ecdsa.PrivateKey, dialDest *discover.Node) (discover.NodeID, error)
  125. doProtoHandshake(our *protoHandshake) (*protoHandshake, error)
  126. // The MsgReadWriter can only be used after the encryption
  127. // handshake has completed. The code uses conn.id to track this
  128. // by setting it to a non-nil value after the encryption handshake.
  129. MsgReadWriter
  130. // transports must provide Close because we use MsgPipe in some of
  131. // the tests. Closing the actual network connection doesn't do
  132. // anything in those tests because NsgPipe doesn't use it.
  133. close(err error)
  134. }
  135. func (c *conn) String() string {
  136. s := c.flags.String() + " conn"
  137. if (c.id != discover.NodeID{}) {
  138. s += fmt.Sprintf(" %x", c.id[:8])
  139. }
  140. s += " " + c.fd.RemoteAddr().String()
  141. return s
  142. }
  143. func (f connFlag) String() string {
  144. s := ""
  145. if f&trustedConn != 0 {
  146. s += " trusted"
  147. }
  148. if f&dynDialedConn != 0 {
  149. s += " dyn dial"
  150. }
  151. if f&staticDialedConn != 0 {
  152. s += " static dial"
  153. }
  154. if f&inboundConn != 0 {
  155. s += " inbound"
  156. }
  157. if s != "" {
  158. s = s[1:]
  159. }
  160. return s
  161. }
  162. func (c *conn) is(f connFlag) bool {
  163. return c.flags&f != 0
  164. }
  165. // Peers returns all connected peers.
  166. func (srv *Server) Peers() []*Peer {
  167. var ps []*Peer
  168. select {
  169. // Note: We'd love to put this function into a variable but
  170. // that seems to cause a weird compiler error in some
  171. // environments.
  172. case srv.peerOp <- func(peers map[discover.NodeID]*Peer) {
  173. for _, p := range peers {
  174. ps = append(ps, p)
  175. }
  176. }:
  177. <-srv.peerOpDone
  178. case <-srv.quit:
  179. }
  180. return ps
  181. }
  182. // PeerCount returns the number of connected peers.
  183. func (srv *Server) PeerCount() int {
  184. var count int
  185. select {
  186. case srv.peerOp <- func(ps map[discover.NodeID]*Peer) { count = len(ps) }:
  187. <-srv.peerOpDone
  188. case <-srv.quit:
  189. }
  190. return count
  191. }
  192. // AddPeer connects to the given node and maintains the connection until the
  193. // server is shut down. If the connection fails for any reason, the server will
  194. // attempt to reconnect the peer.
  195. func (srv *Server) AddPeer(node *discover.Node) {
  196. select {
  197. case srv.addstatic <- node:
  198. case <-srv.quit:
  199. }
  200. }
  201. // Self returns the local node's endpoint information.
  202. func (srv *Server) Self() *discover.Node {
  203. srv.lock.Lock()
  204. defer srv.lock.Unlock()
  205. // If the server's not running, return an empty node
  206. if !srv.running {
  207. return &discover.Node{IP: net.ParseIP("0.0.0.0")}
  208. }
  209. // If the node is running but discovery is off, manually assemble the node infos
  210. if srv.ntab == nil {
  211. // Inbound connections disabled, use zero address
  212. if srv.listener == nil {
  213. return &discover.Node{IP: net.ParseIP("0.0.0.0"), ID: discover.PubkeyID(&srv.PrivateKey.PublicKey)}
  214. }
  215. // Otherwise inject the listener address too
  216. addr := srv.listener.Addr().(*net.TCPAddr)
  217. return &discover.Node{
  218. ID: discover.PubkeyID(&srv.PrivateKey.PublicKey),
  219. IP: addr.IP,
  220. TCP: uint16(addr.Port),
  221. }
  222. }
  223. // Otherwise return the live node infos
  224. return srv.ntab.Self()
  225. }
  226. // Stop terminates the server and all active peer connections.
  227. // It blocks until all active connections have been closed.
  228. func (srv *Server) Stop() {
  229. srv.lock.Lock()
  230. defer srv.lock.Unlock()
  231. if !srv.running {
  232. return
  233. }
  234. srv.running = false
  235. if srv.listener != nil {
  236. // this unblocks listener Accept
  237. srv.listener.Close()
  238. }
  239. close(srv.quit)
  240. srv.loopWG.Wait()
  241. }
  242. // Start starts running the server.
  243. // Servers can not be re-used after stopping.
  244. func (srv *Server) Start() (err error) {
  245. srv.lock.Lock()
  246. defer srv.lock.Unlock()
  247. if srv.running {
  248. return errors.New("server already running")
  249. }
  250. srv.running = true
  251. glog.V(logger.Info).Infoln("Starting Server")
  252. // static fields
  253. if srv.PrivateKey == nil {
  254. return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
  255. }
  256. if srv.newTransport == nil {
  257. srv.newTransport = newRLPX
  258. }
  259. if srv.Dialer == nil {
  260. srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout}
  261. }
  262. srv.quit = make(chan struct{})
  263. srv.addpeer = make(chan *conn)
  264. srv.delpeer = make(chan *Peer)
  265. srv.posthandshake = make(chan *conn)
  266. srv.addstatic = make(chan *discover.Node)
  267. srv.peerOp = make(chan peerOpFunc)
  268. srv.peerOpDone = make(chan struct{})
  269. // node table
  270. if srv.Discovery {
  271. ntab, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr, srv.NAT, srv.NodeDatabase)
  272. if err != nil {
  273. return err
  274. }
  275. srv.ntab = ntab
  276. }
  277. dynPeers := srv.MaxPeers / 2
  278. if !srv.Discovery {
  279. dynPeers = 0
  280. }
  281. dialer := newDialState(srv.StaticNodes, srv.ntab, dynPeers)
  282. // handshake
  283. srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: discover.PubkeyID(&srv.PrivateKey.PublicKey)}
  284. for _, p := range srv.Protocols {
  285. srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
  286. }
  287. // listen/dial
  288. if srv.ListenAddr != "" {
  289. if err := srv.startListening(); err != nil {
  290. return err
  291. }
  292. }
  293. if srv.NoDial && srv.ListenAddr == "" {
  294. glog.V(logger.Warn).Infoln("I will be kind-of useless, neither dialing nor listening.")
  295. }
  296. srv.loopWG.Add(1)
  297. go srv.run(dialer)
  298. srv.running = true
  299. return nil
  300. }
  301. func (srv *Server) startListening() error {
  302. // Launch the TCP listener.
  303. listener, err := net.Listen("tcp", srv.ListenAddr)
  304. if err != nil {
  305. return err
  306. }
  307. laddr := listener.Addr().(*net.TCPAddr)
  308. srv.ListenAddr = laddr.String()
  309. srv.listener = listener
  310. srv.loopWG.Add(1)
  311. go srv.listenLoop()
  312. // Map the TCP listening port if NAT is configured.
  313. if !laddr.IP.IsLoopback() && srv.NAT != nil {
  314. srv.loopWG.Add(1)
  315. go func() {
  316. nat.Map(srv.NAT, srv.quit, "tcp", laddr.Port, laddr.Port, "ethereum p2p")
  317. srv.loopWG.Done()
  318. }()
  319. }
  320. return nil
  321. }
  322. type dialer interface {
  323. newTasks(running int, peers map[discover.NodeID]*Peer, now time.Time) []task
  324. taskDone(task, time.Time)
  325. addStatic(*discover.Node)
  326. }
  327. func (srv *Server) run(dialstate dialer) {
  328. defer srv.loopWG.Done()
  329. var (
  330. peers = make(map[discover.NodeID]*Peer)
  331. trusted = make(map[discover.NodeID]bool, len(srv.TrustedNodes))
  332. tasks []task
  333. pendingTasks []task
  334. taskdone = make(chan task, maxActiveDialTasks)
  335. )
  336. // Put trusted nodes into a map to speed up checks.
  337. // Trusted peers are loaded on startup and cannot be
  338. // modified while the server is running.
  339. for _, n := range srv.TrustedNodes {
  340. trusted[n.ID] = true
  341. }
  342. // Some task list helpers.
  343. delTask := func(t task) {
  344. for i := range tasks {
  345. if tasks[i] == t {
  346. tasks = append(tasks[:i], tasks[i+1:]...)
  347. break
  348. }
  349. }
  350. }
  351. scheduleTasks := func(new []task) {
  352. pt := append(pendingTasks, new...)
  353. start := maxActiveDialTasks - len(tasks)
  354. if len(pt) < start {
  355. start = len(pt)
  356. }
  357. if start > 0 {
  358. tasks = append(tasks, pt[:start]...)
  359. for _, t := range pt[:start] {
  360. t := t
  361. glog.V(logger.Detail).Infoln("new task:", t)
  362. go func() { t.Do(srv); taskdone <- t }()
  363. }
  364. copy(pt, pt[start:])
  365. pendingTasks = pt[:len(pt)-start]
  366. }
  367. }
  368. running:
  369. for {
  370. // Query the dialer for new tasks and launch them.
  371. now := time.Now()
  372. nt := dialstate.newTasks(len(pendingTasks)+len(tasks), peers, now)
  373. scheduleTasks(nt)
  374. select {
  375. case <-srv.quit:
  376. // The server was stopped. Run the cleanup logic.
  377. glog.V(logger.Detail).Infoln("<-quit: spinning down")
  378. break running
  379. case n := <-srv.addstatic:
  380. // This channel is used by AddPeer to add to the
  381. // ephemeral static peer list. Add it to the dialer,
  382. // it will keep the node connected.
  383. glog.V(logger.Detail).Infoln("<-addstatic:", n)
  384. dialstate.addStatic(n)
  385. case op := <-srv.peerOp:
  386. // This channel is used by Peers and PeerCount.
  387. op(peers)
  388. srv.peerOpDone <- struct{}{}
  389. case t := <-taskdone:
  390. // A task got done. Tell dialstate about it so it
  391. // can update its state and remove it from the active
  392. // tasks list.
  393. glog.V(logger.Detail).Infoln("<-taskdone:", t)
  394. dialstate.taskDone(t, now)
  395. delTask(t)
  396. case c := <-srv.posthandshake:
  397. // A connection has passed the encryption handshake so
  398. // the remote identity is known (but hasn't been verified yet).
  399. if trusted[c.id] {
  400. // Ensure that the trusted flag is set before checking against MaxPeers.
  401. c.flags |= trustedConn
  402. }
  403. glog.V(logger.Detail).Infoln("<-posthandshake:", c)
  404. // TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
  405. c.cont <- srv.encHandshakeChecks(peers, c)
  406. case c := <-srv.addpeer:
  407. // At this point the connection is past the protocol handshake.
  408. // Its capabilities are known and the remote identity is verified.
  409. glog.V(logger.Detail).Infoln("<-addpeer:", c)
  410. err := srv.protoHandshakeChecks(peers, c)
  411. if err != nil {
  412. glog.V(logger.Detail).Infof("Not adding %v as peer: %v", c, err)
  413. } else {
  414. // The handshakes are done and it passed all checks.
  415. p := newPeer(c, srv.Protocols)
  416. peers[c.id] = p
  417. go srv.runPeer(p)
  418. }
  419. // The dialer logic relies on the assumption that
  420. // dial tasks complete after the peer has been added or
  421. // discarded. Unblock the task last.
  422. c.cont <- err
  423. case p := <-srv.delpeer:
  424. // A peer disconnected.
  425. glog.V(logger.Detail).Infoln("<-delpeer:", p)
  426. delete(peers, p.ID())
  427. }
  428. }
  429. // Terminate discovery. If there is a running lookup it will terminate soon.
  430. if srv.ntab != nil {
  431. srv.ntab.Close()
  432. }
  433. // Disconnect all peers.
  434. for _, p := range peers {
  435. p.Disconnect(DiscQuitting)
  436. }
  437. // Wait for peers to shut down. Pending connections and tasks are
  438. // not handled here and will terminate soon-ish because srv.quit
  439. // is closed.
  440. glog.V(logger.Detail).Infof("ignoring %d pending tasks at spindown", len(tasks))
  441. for len(peers) > 0 {
  442. p := <-srv.delpeer
  443. glog.V(logger.Detail).Infoln("<-delpeer (spindown):", p)
  444. delete(peers, p.ID())
  445. }
  446. }
  447. func (srv *Server) protoHandshakeChecks(peers map[discover.NodeID]*Peer, c *conn) error {
  448. // Drop connections with no matching protocols.
  449. if len(srv.Protocols) > 0 && countMatchingProtocols(srv.Protocols, c.caps) == 0 {
  450. return DiscUselessPeer
  451. }
  452. // Repeat the encryption handshake checks because the
  453. // peer set might have changed between the handshakes.
  454. return srv.encHandshakeChecks(peers, c)
  455. }
  456. func (srv *Server) encHandshakeChecks(peers map[discover.NodeID]*Peer, c *conn) error {
  457. switch {
  458. case !c.is(trustedConn|staticDialedConn) && len(peers) >= srv.MaxPeers:
  459. return DiscTooManyPeers
  460. case peers[c.id] != nil:
  461. return DiscAlreadyConnected
  462. case c.id == srv.Self().ID:
  463. return DiscSelf
  464. default:
  465. return nil
  466. }
  467. }
  468. // listenLoop runs in its own goroutine and accepts
  469. // inbound connections.
  470. func (srv *Server) listenLoop() {
  471. defer srv.loopWG.Done()
  472. glog.V(logger.Info).Infoln("Listening on", srv.listener.Addr())
  473. // This channel acts as a semaphore limiting
  474. // active inbound connections that are lingering pre-handshake.
  475. // If all slots are taken, no further connections are accepted.
  476. tokens := maxAcceptConns
  477. if srv.MaxPendingPeers > 0 {
  478. tokens = srv.MaxPendingPeers
  479. }
  480. slots := make(chan struct{}, tokens)
  481. for i := 0; i < tokens; i++ {
  482. slots <- struct{}{}
  483. }
  484. for {
  485. <-slots
  486. fd, err := srv.listener.Accept()
  487. if err != nil {
  488. return
  489. }
  490. glog.V(logger.Debug).Infof("Accepted conn %v\n", fd.RemoteAddr())
  491. go func() {
  492. srv.setupConn(fd, inboundConn, nil)
  493. slots <- struct{}{}
  494. }()
  495. }
  496. }
  497. // setupConn runs the handshakes and attempts to add the connection
  498. // as a peer. It returns when the connection has been added as a peer
  499. // or the handshakes have failed.
  500. func (srv *Server) setupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) {
  501. // Prevent leftover pending conns from entering the handshake.
  502. srv.lock.Lock()
  503. running := srv.running
  504. srv.lock.Unlock()
  505. c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
  506. if !running {
  507. c.close(errServerStopped)
  508. return
  509. }
  510. // Run the encryption handshake.
  511. var err error
  512. if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil {
  513. glog.V(logger.Debug).Infof("%v faild enc handshake: %v", c, err)
  514. c.close(err)
  515. return
  516. }
  517. // For dialed connections, check that the remote public key matches.
  518. if dialDest != nil && c.id != dialDest.ID {
  519. c.close(DiscUnexpectedIdentity)
  520. glog.V(logger.Debug).Infof("%v dialed identity mismatch, want %x", c, dialDest.ID[:8])
  521. return
  522. }
  523. if err := srv.checkpoint(c, srv.posthandshake); err != nil {
  524. glog.V(logger.Debug).Infof("%v failed checkpoint posthandshake: %v", c, err)
  525. c.close(err)
  526. return
  527. }
  528. // Run the protocol handshake
  529. phs, err := c.doProtoHandshake(srv.ourHandshake)
  530. if err != nil {
  531. glog.V(logger.Debug).Infof("%v failed proto handshake: %v", c, err)
  532. c.close(err)
  533. return
  534. }
  535. if phs.ID != c.id {
  536. glog.V(logger.Debug).Infof("%v wrong proto handshake identity: %x", c, phs.ID[:8])
  537. c.close(DiscUnexpectedIdentity)
  538. return
  539. }
  540. c.caps, c.name = phs.Caps, phs.Name
  541. if err := srv.checkpoint(c, srv.addpeer); err != nil {
  542. glog.V(logger.Debug).Infof("%v failed checkpoint addpeer: %v", c, err)
  543. c.close(err)
  544. return
  545. }
  546. // If the checks completed successfully, runPeer has now been
  547. // launched by run.
  548. }
  549. // checkpoint sends the conn to run, which performs the
  550. // post-handshake checks for the stage (posthandshake, addpeer).
  551. func (srv *Server) checkpoint(c *conn, stage chan<- *conn) error {
  552. select {
  553. case stage <- c:
  554. case <-srv.quit:
  555. return errServerStopped
  556. }
  557. select {
  558. case err := <-c.cont:
  559. return err
  560. case <-srv.quit:
  561. return errServerStopped
  562. }
  563. }
  564. // runPeer runs in its own goroutine for each peer.
  565. // it waits until the Peer logic returns and removes
  566. // the peer.
  567. func (srv *Server) runPeer(p *Peer) {
  568. glog.V(logger.Debug).Infof("Added %v\n", p)
  569. srvjslog.LogJson(&logger.P2PConnected{
  570. RemoteId: p.ID().String(),
  571. RemoteAddress: p.RemoteAddr().String(),
  572. RemoteVersionString: p.Name(),
  573. NumConnections: srv.PeerCount(),
  574. })
  575. if srv.newPeerHook != nil {
  576. srv.newPeerHook(p)
  577. }
  578. discreason := p.run()
  579. // Note: run waits for existing peers to be sent on srv.delpeer
  580. // before returning, so this send should not select on srv.quit.
  581. srv.delpeer <- p
  582. glog.V(logger.Debug).Infof("Removed %v (%v)\n", p, discreason)
  583. srvjslog.LogJson(&logger.P2PDisconnected{
  584. RemoteId: p.ID().String(),
  585. NumConnections: srv.PeerCount(),
  586. })
  587. }