server.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758
  1. // Copyright 2014 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package p2p implements the Ethereum p2p network protocols.
  17. package p2p
  18. import (
  19. "crypto/ecdsa"
  20. "errors"
  21. "fmt"
  22. "net"
  23. "sync"
  24. "time"
  25. "github.com/ethereum/go-ethereum/logger"
  26. "github.com/ethereum/go-ethereum/logger/glog"
  27. "github.com/ethereum/go-ethereum/p2p/discover"
  28. "github.com/ethereum/go-ethereum/p2p/nat"
  29. )
  30. const (
  31. defaultDialTimeout = 15 * time.Second
  32. refreshPeersInterval = 30 * time.Second
  33. staticPeerCheckInterval = 15 * time.Second
  34. // Maximum number of concurrently handshaking inbound connections.
  35. maxAcceptConns = 50
  36. // Maximum number of concurrently dialing outbound connections.
  37. maxActiveDialTasks = 16
  38. // Maximum time allowed for reading a complete message.
  39. // This is effectively the amount of time a connection can be idle.
  40. frameReadTimeout = 30 * time.Second
  41. // Maximum amount of time allowed for writing a complete message.
  42. frameWriteTimeout = 20 * time.Second
  43. )
  44. var errServerStopped = errors.New("server stopped")
  45. var srvjslog = logger.NewJsonLogger()
  46. // Config holds Server options.
  47. type Config struct {
  48. // This field must be set to a valid secp256k1 private key.
  49. PrivateKey *ecdsa.PrivateKey
  50. // MaxPeers is the maximum number of peers that can be
  51. // connected. It must be greater than zero.
  52. MaxPeers int
  53. // MaxPendingPeers is the maximum number of peers that can be pending in the
  54. // handshake phase, counted separately for inbound and outbound connections.
  55. // Zero defaults to preset values.
  56. MaxPendingPeers int
  57. // Discovery specifies whether the peer discovery mechanism should be started
  58. // or not. Disabling is usually useful for protocol debugging (manual topology).
  59. Discovery bool
  60. // Name sets the node name of this server.
  61. // Use common.MakeName to create a name that follows existing conventions.
  62. Name string
  63. // Bootstrap nodes are used to establish connectivity
  64. // with the rest of the network.
  65. BootstrapNodes []*discover.Node
  66. // Static nodes are used as pre-configured connections which are always
  67. // maintained and re-connected on disconnects.
  68. StaticNodes []*discover.Node
  69. // Trusted nodes are used as pre-configured connections which are always
  70. // allowed to connect, even above the peer limit.
  71. TrustedNodes []*discover.Node
  72. // NodeDatabase is the path to the database containing the previously seen
  73. // live nodes in the network.
  74. NodeDatabase string
  75. // Protocols should contain the protocols supported
  76. // by the server. Matching protocols are launched for
  77. // each peer.
  78. Protocols []Protocol
  79. // If ListenAddr is set to a non-nil address, the server
  80. // will listen for incoming connections.
  81. //
  82. // If the port is zero, the operating system will pick a port. The
  83. // ListenAddr field will be updated with the actual address when
  84. // the server is started.
  85. ListenAddr string
  86. // If set to a non-nil value, the given NAT port mapper
  87. // is used to make the listening port available to the
  88. // Internet.
  89. NAT nat.Interface
  90. // If Dialer is set to a non-nil value, the given Dialer
  91. // is used to dial outbound peer connections.
  92. Dialer *net.Dialer
  93. // If NoDial is true, the server will not dial any peers.
  94. NoDial bool
  95. }
  96. // Server manages all peer connections.
  97. type Server struct {
  98. // Config fields may not be modified while the server is running.
  99. Config
  100. // Hooks for testing. These are useful because we can inhibit
  101. // the whole protocol stack.
  102. newTransport func(net.Conn) transport
  103. newPeerHook func(*Peer)
  104. lock sync.Mutex // protects running
  105. running bool
  106. ntab discoverTable
  107. listener net.Listener
  108. ourHandshake *protoHandshake
  109. lastLookup time.Time
  110. // These are for Peers, PeerCount (and nothing else).
  111. peerOp chan peerOpFunc
  112. peerOpDone chan struct{}
  113. quit chan struct{}
  114. addstatic chan *discover.Node
  115. posthandshake chan *conn
  116. addpeer chan *conn
  117. delpeer chan *Peer
  118. loopWG sync.WaitGroup // loop, listenLoop
  119. }
  120. type peerOpFunc func(map[discover.NodeID]*Peer)
  121. type connFlag int
  122. const (
  123. dynDialedConn connFlag = 1 << iota
  124. staticDialedConn
  125. inboundConn
  126. trustedConn
  127. )
  128. // conn wraps a network connection with information gathered
  129. // during the two handshakes.
  130. type conn struct {
  131. fd net.Conn
  132. transport
  133. flags connFlag
  134. cont chan error // The run loop uses cont to signal errors to setupConn.
  135. id discover.NodeID // valid after the encryption handshake
  136. caps []Cap // valid after the protocol handshake
  137. name string // valid after the protocol handshake
  138. }
  139. type transport interface {
  140. // The two handshakes.
  141. doEncHandshake(prv *ecdsa.PrivateKey, dialDest *discover.Node) (discover.NodeID, error)
  142. doProtoHandshake(our *protoHandshake) (*protoHandshake, error)
  143. // The MsgReadWriter can only be used after the encryption
  144. // handshake has completed. The code uses conn.id to track this
  145. // by setting it to a non-nil value after the encryption handshake.
  146. MsgReadWriter
  147. // transports must provide Close because we use MsgPipe in some of
  148. // the tests. Closing the actual network connection doesn't do
  149. // anything in those tests because NsgPipe doesn't use it.
  150. close(err error)
  151. }
  152. func (c *conn) String() string {
  153. s := c.flags.String() + " conn"
  154. if (c.id != discover.NodeID{}) {
  155. s += fmt.Sprintf(" %x", c.id[:8])
  156. }
  157. s += " " + c.fd.RemoteAddr().String()
  158. return s
  159. }
  160. func (f connFlag) String() string {
  161. s := ""
  162. if f&trustedConn != 0 {
  163. s += " trusted"
  164. }
  165. if f&dynDialedConn != 0 {
  166. s += " dyn dial"
  167. }
  168. if f&staticDialedConn != 0 {
  169. s += " static dial"
  170. }
  171. if f&inboundConn != 0 {
  172. s += " inbound"
  173. }
  174. if s != "" {
  175. s = s[1:]
  176. }
  177. return s
  178. }
  179. func (c *conn) is(f connFlag) bool {
  180. return c.flags&f != 0
  181. }
  182. // Peers returns all connected peers.
  183. func (srv *Server) Peers() []*Peer {
  184. var ps []*Peer
  185. select {
  186. // Note: We'd love to put this function into a variable but
  187. // that seems to cause a weird compiler error in some
  188. // environments.
  189. case srv.peerOp <- func(peers map[discover.NodeID]*Peer) {
  190. for _, p := range peers {
  191. ps = append(ps, p)
  192. }
  193. }:
  194. <-srv.peerOpDone
  195. case <-srv.quit:
  196. }
  197. return ps
  198. }
  199. // PeerCount returns the number of connected peers.
  200. func (srv *Server) PeerCount() int {
  201. var count int
  202. select {
  203. case srv.peerOp <- func(ps map[discover.NodeID]*Peer) { count = len(ps) }:
  204. <-srv.peerOpDone
  205. case <-srv.quit:
  206. }
  207. return count
  208. }
  209. // AddPeer connects to the given node and maintains the connection until the
  210. // server is shut down. If the connection fails for any reason, the server will
  211. // attempt to reconnect the peer.
  212. func (srv *Server) AddPeer(node *discover.Node) {
  213. select {
  214. case srv.addstatic <- node:
  215. case <-srv.quit:
  216. }
  217. }
  218. // Self returns the local node's endpoint information.
  219. func (srv *Server) Self() *discover.Node {
  220. srv.lock.Lock()
  221. defer srv.lock.Unlock()
  222. // If the server's not running, return an empty node
  223. if !srv.running {
  224. return &discover.Node{IP: net.ParseIP("0.0.0.0")}
  225. }
  226. // If the node is running but discovery is off, manually assemble the node infos
  227. if srv.ntab == nil {
  228. // Inbound connections disabled, use zero address
  229. if srv.listener == nil {
  230. return &discover.Node{IP: net.ParseIP("0.0.0.0"), ID: discover.PubkeyID(&srv.PrivateKey.PublicKey)}
  231. }
  232. // Otherwise inject the listener address too
  233. addr := srv.listener.Addr().(*net.TCPAddr)
  234. return &discover.Node{
  235. ID: discover.PubkeyID(&srv.PrivateKey.PublicKey),
  236. IP: addr.IP,
  237. TCP: uint16(addr.Port),
  238. }
  239. }
  240. // Otherwise return the live node infos
  241. return srv.ntab.Self()
  242. }
  243. // Stop terminates the server and all active peer connections.
  244. // It blocks until all active connections have been closed.
  245. func (srv *Server) Stop() {
  246. srv.lock.Lock()
  247. defer srv.lock.Unlock()
  248. if !srv.running {
  249. return
  250. }
  251. srv.running = false
  252. if srv.listener != nil {
  253. // this unblocks listener Accept
  254. srv.listener.Close()
  255. }
  256. close(srv.quit)
  257. srv.loopWG.Wait()
  258. }
  259. // Start starts running the server.
  260. // Servers can not be re-used after stopping.
  261. func (srv *Server) Start() (err error) {
  262. srv.lock.Lock()
  263. defer srv.lock.Unlock()
  264. if srv.running {
  265. return errors.New("server already running")
  266. }
  267. srv.running = true
  268. glog.V(logger.Info).Infoln("Starting Server")
  269. // static fields
  270. if srv.PrivateKey == nil {
  271. return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
  272. }
  273. if srv.newTransport == nil {
  274. srv.newTransport = newRLPX
  275. }
  276. if srv.Dialer == nil {
  277. srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout}
  278. }
  279. srv.quit = make(chan struct{})
  280. srv.addpeer = make(chan *conn)
  281. srv.delpeer = make(chan *Peer)
  282. srv.posthandshake = make(chan *conn)
  283. srv.addstatic = make(chan *discover.Node)
  284. srv.peerOp = make(chan peerOpFunc)
  285. srv.peerOpDone = make(chan struct{})
  286. // node table
  287. if srv.Discovery {
  288. ntab, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr, srv.NAT, srv.NodeDatabase)
  289. if err != nil {
  290. return err
  291. }
  292. if err := ntab.SetFallbackNodes(srv.BootstrapNodes); err != nil {
  293. return err
  294. }
  295. srv.ntab = ntab
  296. }
  297. dynPeers := (srv.MaxPeers + 1) / 2
  298. if !srv.Discovery {
  299. dynPeers = 0
  300. }
  301. dialer := newDialState(srv.StaticNodes, srv.ntab, dynPeers)
  302. // handshake
  303. srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: discover.PubkeyID(&srv.PrivateKey.PublicKey)}
  304. for _, p := range srv.Protocols {
  305. srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
  306. }
  307. // listen/dial
  308. if srv.ListenAddr != "" {
  309. if err := srv.startListening(); err != nil {
  310. return err
  311. }
  312. }
  313. if srv.NoDial && srv.ListenAddr == "" {
  314. glog.V(logger.Warn).Infoln("I will be kind-of useless, neither dialing nor listening.")
  315. }
  316. srv.loopWG.Add(1)
  317. go srv.run(dialer)
  318. srv.running = true
  319. return nil
  320. }
  321. func (srv *Server) startListening() error {
  322. // Launch the TCP listener.
  323. listener, err := net.Listen("tcp", srv.ListenAddr)
  324. if err != nil {
  325. return err
  326. }
  327. laddr := listener.Addr().(*net.TCPAddr)
  328. srv.ListenAddr = laddr.String()
  329. srv.listener = listener
  330. srv.loopWG.Add(1)
  331. go srv.listenLoop()
  332. // Map the TCP listening port if NAT is configured.
  333. if !laddr.IP.IsLoopback() && srv.NAT != nil {
  334. srv.loopWG.Add(1)
  335. go func() {
  336. nat.Map(srv.NAT, srv.quit, "tcp", laddr.Port, laddr.Port, "ethereum p2p")
  337. srv.loopWG.Done()
  338. }()
  339. }
  340. return nil
  341. }
  342. type dialer interface {
  343. newTasks(running int, peers map[discover.NodeID]*Peer, now time.Time) []task
  344. taskDone(task, time.Time)
  345. addStatic(*discover.Node)
  346. }
  347. func (srv *Server) run(dialstate dialer) {
  348. defer srv.loopWG.Done()
  349. var (
  350. peers = make(map[discover.NodeID]*Peer)
  351. trusted = make(map[discover.NodeID]bool, len(srv.TrustedNodes))
  352. taskdone = make(chan task, maxActiveDialTasks)
  353. runningTasks []task
  354. queuedTasks []task // tasks that can't run yet
  355. )
  356. // Put trusted nodes into a map to speed up checks.
  357. // Trusted peers are loaded on startup and cannot be
  358. // modified while the server is running.
  359. for _, n := range srv.TrustedNodes {
  360. trusted[n.ID] = true
  361. }
  362. // removes t from runningTasks
  363. delTask := func(t task) {
  364. for i := range runningTasks {
  365. if runningTasks[i] == t {
  366. runningTasks = append(runningTasks[:i], runningTasks[i+1:]...)
  367. break
  368. }
  369. }
  370. }
  371. // starts until max number of active tasks is satisfied
  372. startTasks := func(ts []task) (rest []task) {
  373. i := 0
  374. for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
  375. t := ts[i]
  376. glog.V(logger.Detail).Infoln("new task:", t)
  377. go func() { t.Do(srv); taskdone <- t }()
  378. runningTasks = append(runningTasks, t)
  379. }
  380. return ts[i:]
  381. }
  382. scheduleTasks := func() {
  383. // Start from queue first.
  384. queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...)
  385. // Query dialer for new tasks and start as many as possible now.
  386. if len(runningTasks) < maxActiveDialTasks {
  387. nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now())
  388. queuedTasks = append(queuedTasks, startTasks(nt)...)
  389. }
  390. }
  391. running:
  392. for {
  393. scheduleTasks()
  394. select {
  395. case <-srv.quit:
  396. // The server was stopped. Run the cleanup logic.
  397. glog.V(logger.Detail).Infoln("<-quit: spinning down")
  398. break running
  399. case n := <-srv.addstatic:
  400. // This channel is used by AddPeer to add to the
  401. // ephemeral static peer list. Add it to the dialer,
  402. // it will keep the node connected.
  403. glog.V(logger.Detail).Infoln("<-addstatic:", n)
  404. dialstate.addStatic(n)
  405. case op := <-srv.peerOp:
  406. // This channel is used by Peers and PeerCount.
  407. op(peers)
  408. srv.peerOpDone <- struct{}{}
  409. case t := <-taskdone:
  410. // A task got done. Tell dialstate about it so it
  411. // can update its state and remove it from the active
  412. // tasks list.
  413. glog.V(logger.Detail).Infoln("<-taskdone:", t)
  414. dialstate.taskDone(t, time.Now())
  415. delTask(t)
  416. case c := <-srv.posthandshake:
  417. // A connection has passed the encryption handshake so
  418. // the remote identity is known (but hasn't been verified yet).
  419. if trusted[c.id] {
  420. // Ensure that the trusted flag is set before checking against MaxPeers.
  421. c.flags |= trustedConn
  422. }
  423. glog.V(logger.Detail).Infoln("<-posthandshake:", c)
  424. // TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
  425. c.cont <- srv.encHandshakeChecks(peers, c)
  426. case c := <-srv.addpeer:
  427. // At this point the connection is past the protocol handshake.
  428. // Its capabilities are known and the remote identity is verified.
  429. glog.V(logger.Detail).Infoln("<-addpeer:", c)
  430. err := srv.protoHandshakeChecks(peers, c)
  431. if err != nil {
  432. glog.V(logger.Detail).Infof("Not adding %v as peer: %v", c, err)
  433. } else {
  434. // The handshakes are done and it passed all checks.
  435. p := newPeer(c, srv.Protocols)
  436. peers[c.id] = p
  437. go srv.runPeer(p)
  438. }
  439. // The dialer logic relies on the assumption that
  440. // dial tasks complete after the peer has been added or
  441. // discarded. Unblock the task last.
  442. c.cont <- err
  443. case p := <-srv.delpeer:
  444. // A peer disconnected.
  445. glog.V(logger.Detail).Infoln("<-delpeer:", p)
  446. delete(peers, p.ID())
  447. }
  448. }
  449. // Terminate discovery. If there is a running lookup it will terminate soon.
  450. if srv.ntab != nil {
  451. srv.ntab.Close()
  452. }
  453. // Disconnect all peers.
  454. for _, p := range peers {
  455. p.Disconnect(DiscQuitting)
  456. }
  457. // Wait for peers to shut down. Pending connections and tasks are
  458. // not handled here and will terminate soon-ish because srv.quit
  459. // is closed.
  460. glog.V(logger.Detail).Infof("ignoring %d pending tasks at spindown", len(runningTasks))
  461. for len(peers) > 0 {
  462. p := <-srv.delpeer
  463. glog.V(logger.Detail).Infoln("<-delpeer (spindown):", p)
  464. delete(peers, p.ID())
  465. }
  466. }
  467. func (srv *Server) protoHandshakeChecks(peers map[discover.NodeID]*Peer, c *conn) error {
  468. // Drop connections with no matching protocols.
  469. if len(srv.Protocols) > 0 && countMatchingProtocols(srv.Protocols, c.caps) == 0 {
  470. return DiscUselessPeer
  471. }
  472. // Repeat the encryption handshake checks because the
  473. // peer set might have changed between the handshakes.
  474. return srv.encHandshakeChecks(peers, c)
  475. }
  476. func (srv *Server) encHandshakeChecks(peers map[discover.NodeID]*Peer, c *conn) error {
  477. switch {
  478. case !c.is(trustedConn|staticDialedConn) && len(peers) >= srv.MaxPeers:
  479. return DiscTooManyPeers
  480. case peers[c.id] != nil:
  481. return DiscAlreadyConnected
  482. case c.id == srv.Self().ID:
  483. return DiscSelf
  484. default:
  485. return nil
  486. }
  487. }
  488. type tempError interface {
  489. Temporary() bool
  490. }
  491. // listenLoop runs in its own goroutine and accepts
  492. // inbound connections.
  493. func (srv *Server) listenLoop() {
  494. defer srv.loopWG.Done()
  495. glog.V(logger.Info).Infoln("Listening on", srv.listener.Addr())
  496. // This channel acts as a semaphore limiting
  497. // active inbound connections that are lingering pre-handshake.
  498. // If all slots are taken, no further connections are accepted.
  499. tokens := maxAcceptConns
  500. if srv.MaxPendingPeers > 0 {
  501. tokens = srv.MaxPendingPeers
  502. }
  503. slots := make(chan struct{}, tokens)
  504. for i := 0; i < tokens; i++ {
  505. slots <- struct{}{}
  506. }
  507. for {
  508. // Wait for a handshake slot before accepting.
  509. <-slots
  510. var (
  511. fd net.Conn
  512. err error
  513. )
  514. for {
  515. fd, err = srv.listener.Accept()
  516. if tempErr, ok := err.(tempError); ok && tempErr.Temporary() {
  517. glog.V(logger.Debug).Infof("Temporary read error: %v", err)
  518. continue
  519. } else if err != nil {
  520. glog.V(logger.Debug).Infof("Read error: %v", err)
  521. return
  522. }
  523. break
  524. }
  525. fd = newMeteredConn(fd, true)
  526. glog.V(logger.Debug).Infof("Accepted conn %v\n", fd.RemoteAddr())
  527. // Spawn the handler. It will give the slot back when the connection
  528. // has been established.
  529. go func() {
  530. srv.setupConn(fd, inboundConn, nil)
  531. slots <- struct{}{}
  532. }()
  533. }
  534. }
  535. // setupConn runs the handshakes and attempts to add the connection
  536. // as a peer. It returns when the connection has been added as a peer
  537. // or the handshakes have failed.
  538. func (srv *Server) setupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) {
  539. // Prevent leftover pending conns from entering the handshake.
  540. srv.lock.Lock()
  541. running := srv.running
  542. srv.lock.Unlock()
  543. c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
  544. if !running {
  545. c.close(errServerStopped)
  546. return
  547. }
  548. // Run the encryption handshake.
  549. var err error
  550. if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil {
  551. glog.V(logger.Debug).Infof("%v faild enc handshake: %v", c, err)
  552. c.close(err)
  553. return
  554. }
  555. // For dialed connections, check that the remote public key matches.
  556. if dialDest != nil && c.id != dialDest.ID {
  557. c.close(DiscUnexpectedIdentity)
  558. glog.V(logger.Debug).Infof("%v dialed identity mismatch, want %x", c, dialDest.ID[:8])
  559. return
  560. }
  561. if err := srv.checkpoint(c, srv.posthandshake); err != nil {
  562. glog.V(logger.Debug).Infof("%v failed checkpoint posthandshake: %v", c, err)
  563. c.close(err)
  564. return
  565. }
  566. // Run the protocol handshake
  567. phs, err := c.doProtoHandshake(srv.ourHandshake)
  568. if err != nil {
  569. glog.V(logger.Debug).Infof("%v failed proto handshake: %v", c, err)
  570. c.close(err)
  571. return
  572. }
  573. if phs.ID != c.id {
  574. glog.V(logger.Debug).Infof("%v wrong proto handshake identity: %x", c, phs.ID[:8])
  575. c.close(DiscUnexpectedIdentity)
  576. return
  577. }
  578. c.caps, c.name = phs.Caps, phs.Name
  579. if err := srv.checkpoint(c, srv.addpeer); err != nil {
  580. glog.V(logger.Debug).Infof("%v failed checkpoint addpeer: %v", c, err)
  581. c.close(err)
  582. return
  583. }
  584. // If the checks completed successfully, runPeer has now been
  585. // launched by run.
  586. }
  587. // checkpoint sends the conn to run, which performs the
  588. // post-handshake checks for the stage (posthandshake, addpeer).
  589. func (srv *Server) checkpoint(c *conn, stage chan<- *conn) error {
  590. select {
  591. case stage <- c:
  592. case <-srv.quit:
  593. return errServerStopped
  594. }
  595. select {
  596. case err := <-c.cont:
  597. return err
  598. case <-srv.quit:
  599. return errServerStopped
  600. }
  601. }
  602. // runPeer runs in its own goroutine for each peer.
  603. // it waits until the Peer logic returns and removes
  604. // the peer.
  605. func (srv *Server) runPeer(p *Peer) {
  606. glog.V(logger.Debug).Infof("Added %v\n", p)
  607. srvjslog.LogJson(&logger.P2PConnected{
  608. RemoteId: p.ID().String(),
  609. RemoteAddress: p.RemoteAddr().String(),
  610. RemoteVersionString: p.Name(),
  611. NumConnections: srv.PeerCount(),
  612. })
  613. if srv.newPeerHook != nil {
  614. srv.newPeerHook(p)
  615. }
  616. discreason := p.run()
  617. // Note: run waits for existing peers to be sent on srv.delpeer
  618. // before returning, so this send should not select on srv.quit.
  619. srv.delpeer <- p
  620. glog.V(logger.Debug).Infof("Removed %v (%v)\n", p, discreason)
  621. srvjslog.LogJson(&logger.P2PDisconnected{
  622. RemoteId: p.ID().String(),
  623. NumConnections: srv.PeerCount(),
  624. })
  625. }
  626. // NodeInfo represents a short summary of the information known about the host.
  627. type NodeInfo struct {
  628. ID string `json:"id"` // Unique node identifier (also the encryption key)
  629. Name string `json:"name"` // Name of the node, including client type, version, OS, custom data
  630. Enode string `json:"enode"` // Enode URL for adding this peer from remote peers
  631. IP string `json:"ip"` // IP address of the node
  632. Ports struct {
  633. Discovery int `json:"discovery"` // UDP listening port for discovery protocol
  634. Listener int `json:"listener"` // TCP listening port for RLPx
  635. } `json:"ports"`
  636. ListenAddr string `json:"listenAddr"`
  637. Protocols map[string]interface{} `json:"protocols"`
  638. }
  639. // Info gathers and returns a collection of metadata known about the host.
  640. func (srv *Server) NodeInfo() *NodeInfo {
  641. node := srv.Self()
  642. // Gather and assemble the generic node infos
  643. info := &NodeInfo{
  644. Name: srv.Name,
  645. Enode: node.String(),
  646. ID: node.ID.String(),
  647. IP: node.IP.String(),
  648. ListenAddr: srv.ListenAddr,
  649. Protocols: make(map[string]interface{}),
  650. }
  651. info.Ports.Discovery = int(node.UDP)
  652. info.Ports.Listener = int(node.TCP)
  653. // Gather all the running protocol infos (only once per protocol type)
  654. for _, proto := range srv.Protocols {
  655. if _, ok := info.Protocols[proto.Name]; !ok {
  656. nodeInfo := interface{}("unknown")
  657. if query := proto.NodeInfo; query != nil {
  658. nodeInfo = proto.NodeInfo()
  659. }
  660. info.Protocols[proto.Name] = nodeInfo
  661. }
  662. }
  663. return info
  664. }
  665. // PeersInfo returns an array of metadata objects describing connected peers.
  666. func (srv *Server) PeersInfo() []*PeerInfo {
  667. // Gather all the generic and sub-protocol specific infos
  668. infos := make([]*PeerInfo, 0, srv.PeerCount())
  669. for _, peer := range srv.Peers() {
  670. if peer != nil {
  671. infos = append(infos, peer.Info())
  672. }
  673. }
  674. // Sort the result array alphabetically by node identifier
  675. for i := 0; i < len(infos); i++ {
  676. for j := i + 1; j < len(infos); j++ {
  677. if infos[i].ID > infos[j].ID {
  678. infos[i], infos[j] = infos[j], infos[i]
  679. }
  680. }
  681. }
  682. return infos
  683. }