server.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624
  1. package p2p
  2. import (
  3. "crypto/ecdsa"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "sync"
  8. "time"
  9. "github.com/ethereum/go-ethereum/logger"
  10. "github.com/ethereum/go-ethereum/logger/glog"
  11. "github.com/ethereum/go-ethereum/p2p/discover"
  12. "github.com/ethereum/go-ethereum/p2p/nat"
  13. )
  14. const (
  15. defaultDialTimeout = 15 * time.Second
  16. refreshPeersInterval = 30 * time.Second
  17. staticPeerCheckInterval = 15 * time.Second
  18. // Maximum number of concurrently handshaking inbound connections.
  19. maxAcceptConns = 50
  20. // Maximum number of concurrently dialing outbound connections.
  21. maxActiveDialTasks = 16
  22. // maximum time allowed for reading a complete message.
  23. // this is effectively the amount of time a connection can be idle.
  24. frameReadTimeout = 1 * time.Minute
  25. // maximum amount of time allowed for writing a complete message.
  26. frameWriteTimeout = 5 * time.Second
  27. )
  28. var errServerStopped = errors.New("server stopped")
  29. var srvjslog = logger.NewJsonLogger()
  30. // Server manages all peer connections.
  31. //
  32. // The fields of Server are used as configuration parameters.
  33. // You should set them before starting the Server. Fields may not be
  34. // modified while the server is running.
  35. type Server struct {
  36. // This field must be set to a valid secp256k1 private key.
  37. PrivateKey *ecdsa.PrivateKey
  38. // MaxPeers is the maximum number of peers that can be
  39. // connected. It must be greater than zero.
  40. MaxPeers int
  41. // MaxPendingPeers is the maximum number of peers that can be pending in the
  42. // handshake phase, counted separately for inbound and outbound connections.
  43. // Zero defaults to preset values.
  44. MaxPendingPeers int
  45. // Name sets the node name of this server.
  46. // Use common.MakeName to create a name that follows existing conventions.
  47. Name string
  48. // Bootstrap nodes are used to establish connectivity
  49. // with the rest of the network.
  50. BootstrapNodes []*discover.Node
  51. // Static nodes are used as pre-configured connections which are always
  52. // maintained and re-connected on disconnects.
  53. StaticNodes []*discover.Node
  54. // Trusted nodes are used as pre-configured connections which are always
  55. // allowed to connect, even above the peer limit.
  56. TrustedNodes []*discover.Node
  57. // NodeDatabase is the path to the database containing the previously seen
  58. // live nodes in the network.
  59. NodeDatabase string
  60. // Protocols should contain the protocols supported
  61. // by the server. Matching protocols are launched for
  62. // each peer.
  63. Protocols []Protocol
  64. // If ListenAddr is set to a non-nil address, the server
  65. // will listen for incoming connections.
  66. //
  67. // If the port is zero, the operating system will pick a port. The
  68. // ListenAddr field will be updated with the actual address when
  69. // the server is started.
  70. ListenAddr string
  71. // If set to a non-nil value, the given NAT port mapper
  72. // is used to make the listening port available to the
  73. // Internet.
  74. NAT nat.Interface
  75. // If Dialer is set to a non-nil value, the given Dialer
  76. // is used to dial outbound peer connections.
  77. Dialer *net.Dialer
  78. // If NoDial is true, the server will not dial any peers.
  79. NoDial bool
  80. // Hooks for testing. These are useful because we can inhibit
  81. // the whole protocol stack.
  82. newTransport func(net.Conn) transport
  83. newPeerHook func(*Peer)
  84. lock sync.Mutex // protects running
  85. running bool
  86. ntab discoverTable
  87. listener net.Listener
  88. ourHandshake *protoHandshake
  89. // These are for Peers, PeerCount (and nothing else).
  90. peerOp chan peerOpFunc
  91. peerOpDone chan struct{}
  92. quit chan struct{}
  93. addstatic chan *discover.Node
  94. posthandshake chan *conn
  95. addpeer chan *conn
  96. delpeer chan *Peer
  97. loopWG sync.WaitGroup // loop, listenLoop
  98. }
  99. type peerOpFunc func(map[discover.NodeID]*Peer)
  100. type connFlag int
  101. const (
  102. dynDialedConn connFlag = 1 << iota
  103. staticDialedConn
  104. inboundConn
  105. trustedConn
  106. )
  107. // conn wraps a network connection with information gathered
  108. // during the two handshakes.
  109. type conn struct {
  110. fd net.Conn
  111. transport
  112. flags connFlag
  113. cont chan error // The run loop uses cont to signal errors to setupConn.
  114. id discover.NodeID // valid after the encryption handshake
  115. caps []Cap // valid after the protocol handshake
  116. name string // valid after the protocol handshake
  117. }
  118. type transport interface {
  119. // The two handshakes.
  120. doEncHandshake(prv *ecdsa.PrivateKey, dialDest *discover.Node) (discover.NodeID, error)
  121. doProtoHandshake(our *protoHandshake) (*protoHandshake, error)
  122. // The MsgReadWriter can only be used after the encryption
  123. // handshake has completed. The code uses conn.id to track this
  124. // by setting it to a non-nil value after the encryption handshake.
  125. MsgReadWriter
  126. // transports must provide Close because we use MsgPipe in some of
  127. // the tests. Closing the actual network connection doesn't do
  128. // anything in those tests because NsgPipe doesn't use it.
  129. close(err error)
  130. }
  131. func (c *conn) String() string {
  132. s := c.flags.String() + " conn"
  133. if (c.id != discover.NodeID{}) {
  134. s += fmt.Sprintf(" %x", c.id[:8])
  135. }
  136. s += " " + c.fd.RemoteAddr().String()
  137. return s
  138. }
  139. func (f connFlag) String() string {
  140. s := ""
  141. if f&trustedConn != 0 {
  142. s += " trusted"
  143. }
  144. if f&dynDialedConn != 0 {
  145. s += " dyn dial"
  146. }
  147. if f&staticDialedConn != 0 {
  148. s += " static dial"
  149. }
  150. if f&inboundConn != 0 {
  151. s += " inbound"
  152. }
  153. if s != "" {
  154. s = s[1:]
  155. }
  156. return s
  157. }
  158. func (c *conn) is(f connFlag) bool {
  159. return c.flags&f != 0
  160. }
  161. // Peers returns all connected peers.
  162. func (srv *Server) Peers() []*Peer {
  163. var ps []*Peer
  164. select {
  165. // Note: We'd love to put this function into a variable but
  166. // that seems to cause a weird compiler error in some
  167. // environments.
  168. case srv.peerOp <- func(peers map[discover.NodeID]*Peer) {
  169. for _, p := range peers {
  170. ps = append(ps, p)
  171. }
  172. }:
  173. <-srv.peerOpDone
  174. case <-srv.quit:
  175. }
  176. return ps
  177. }
  178. // PeerCount returns the number of connected peers.
  179. func (srv *Server) PeerCount() int {
  180. var count int
  181. select {
  182. case srv.peerOp <- func(ps map[discover.NodeID]*Peer) { count = len(ps) }:
  183. <-srv.peerOpDone
  184. case <-srv.quit:
  185. }
  186. return count
  187. }
  188. // AddPeer connects to the given node and maintains the connection until the
  189. // server is shut down. If the connection fails for any reason, the server will
  190. // attempt to reconnect the peer.
  191. func (srv *Server) AddPeer(node *discover.Node) {
  192. select {
  193. case srv.addstatic <- node:
  194. case <-srv.quit:
  195. }
  196. }
  197. // Self returns the local node's endpoint information.
  198. func (srv *Server) Self() *discover.Node {
  199. srv.lock.Lock()
  200. defer srv.lock.Unlock()
  201. if !srv.running {
  202. return &discover.Node{IP: net.ParseIP("0.0.0.0")}
  203. }
  204. return srv.ntab.Self()
  205. }
  206. // Stop terminates the server and all active peer connections.
  207. // It blocks until all active connections have been closed.
  208. func (srv *Server) Stop() {
  209. srv.lock.Lock()
  210. defer srv.lock.Unlock()
  211. if !srv.running {
  212. return
  213. }
  214. srv.running = false
  215. if srv.listener != nil {
  216. // this unblocks listener Accept
  217. srv.listener.Close()
  218. }
  219. close(srv.quit)
  220. srv.loopWG.Wait()
  221. }
  222. // Start starts running the server.
  223. // Servers can not be re-used after stopping.
  224. func (srv *Server) Start() (err error) {
  225. srv.lock.Lock()
  226. defer srv.lock.Unlock()
  227. if srv.running {
  228. return errors.New("server already running")
  229. }
  230. srv.running = true
  231. glog.V(logger.Info).Infoln("Starting Server")
  232. // static fields
  233. if srv.PrivateKey == nil {
  234. return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
  235. }
  236. if srv.MaxPeers <= 0 {
  237. return fmt.Errorf("Server.MaxPeers must be > 0")
  238. }
  239. if srv.newTransport == nil {
  240. srv.newTransport = newRLPX
  241. }
  242. if srv.Dialer == nil {
  243. srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout}
  244. }
  245. srv.quit = make(chan struct{})
  246. srv.addpeer = make(chan *conn)
  247. srv.delpeer = make(chan *Peer)
  248. srv.posthandshake = make(chan *conn)
  249. srv.addstatic = make(chan *discover.Node)
  250. srv.peerOp = make(chan peerOpFunc)
  251. srv.peerOpDone = make(chan struct{})
  252. // node table
  253. ntab, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr, srv.NAT, srv.NodeDatabase)
  254. if err != nil {
  255. return err
  256. }
  257. srv.ntab = ntab
  258. dialer := newDialState(srv.StaticNodes, srv.ntab, srv.MaxPeers/2)
  259. // handshake
  260. srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: ntab.Self().ID}
  261. for _, p := range srv.Protocols {
  262. srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
  263. }
  264. // listen/dial
  265. if srv.ListenAddr != "" {
  266. if err := srv.startListening(); err != nil {
  267. return err
  268. }
  269. }
  270. if srv.NoDial && srv.ListenAddr == "" {
  271. glog.V(logger.Warn).Infoln("I will be kind-of useless, neither dialing nor listening.")
  272. }
  273. srv.loopWG.Add(1)
  274. go srv.run(dialer)
  275. srv.running = true
  276. return nil
  277. }
  278. func (srv *Server) startListening() error {
  279. // Launch the TCP listener.
  280. listener, err := net.Listen("tcp", srv.ListenAddr)
  281. if err != nil {
  282. return err
  283. }
  284. laddr := listener.Addr().(*net.TCPAddr)
  285. srv.ListenAddr = laddr.String()
  286. srv.listener = listener
  287. srv.loopWG.Add(1)
  288. go srv.listenLoop()
  289. // Map the TCP listening port if NAT is configured.
  290. if !laddr.IP.IsLoopback() && srv.NAT != nil {
  291. srv.loopWG.Add(1)
  292. go func() {
  293. nat.Map(srv.NAT, srv.quit, "tcp", laddr.Port, laddr.Port, "ethereum p2p")
  294. srv.loopWG.Done()
  295. }()
  296. }
  297. return nil
  298. }
  299. type dialer interface {
  300. newTasks(running int, peers map[discover.NodeID]*Peer, now time.Time) []task
  301. taskDone(task, time.Time)
  302. addStatic(*discover.Node)
  303. }
  304. func (srv *Server) run(dialstate dialer) {
  305. defer srv.loopWG.Done()
  306. var (
  307. peers = make(map[discover.NodeID]*Peer)
  308. trusted = make(map[discover.NodeID]bool, len(srv.TrustedNodes))
  309. tasks []task
  310. pendingTasks []task
  311. taskdone = make(chan task, maxActiveDialTasks)
  312. )
  313. // Put trusted nodes into a map to speed up checks.
  314. // Trusted peers are loaded on startup and cannot be
  315. // modified while the server is running.
  316. for _, n := range srv.TrustedNodes {
  317. trusted[n.ID] = true
  318. }
  319. // Some task list helpers.
  320. delTask := func(t task) {
  321. for i := range tasks {
  322. if tasks[i] == t {
  323. tasks = append(tasks[:i], tasks[i+1:]...)
  324. break
  325. }
  326. }
  327. }
  328. scheduleTasks := func(new []task) {
  329. pt := append(pendingTasks, new...)
  330. start := maxActiveDialTasks - len(tasks)
  331. if len(pt) < start {
  332. start = len(pt)
  333. }
  334. if start > 0 {
  335. tasks = append(tasks, pt[:start]...)
  336. for _, t := range pt[:start] {
  337. t := t
  338. glog.V(logger.Detail).Infoln("new task:", t)
  339. go func() { t.Do(srv); taskdone <- t }()
  340. }
  341. copy(pt, pt[start:])
  342. pendingTasks = pt[:len(pt)-start]
  343. }
  344. }
  345. running:
  346. for {
  347. // Query the dialer for new tasks and launch them.
  348. now := time.Now()
  349. nt := dialstate.newTasks(len(pendingTasks)+len(tasks), peers, now)
  350. scheduleTasks(nt)
  351. select {
  352. case <-srv.quit:
  353. // The server was stopped. Run the cleanup logic.
  354. glog.V(logger.Detail).Infoln("<-quit: spinning down")
  355. break running
  356. case n := <-srv.addstatic:
  357. // This channel is used by AddPeer to add to the
  358. // ephemeral static peer list. Add it to the dialer,
  359. // it will keep the node connected.
  360. glog.V(logger.Detail).Infoln("<-addstatic:", n)
  361. dialstate.addStatic(n)
  362. case op := <-srv.peerOp:
  363. // This channel is used by Peers and PeerCount.
  364. op(peers)
  365. srv.peerOpDone <- struct{}{}
  366. case t := <-taskdone:
  367. // A task got done. Tell dialstate about it so it
  368. // can update its state and remove it from the active
  369. // tasks list.
  370. glog.V(logger.Detail).Infoln("<-taskdone:", t)
  371. dialstate.taskDone(t, now)
  372. delTask(t)
  373. case c := <-srv.posthandshake:
  374. // A connection has passed the encryption handshake so
  375. // the remote identity is known (but hasn't been verified yet).
  376. if trusted[c.id] {
  377. // Ensure that the trusted flag is set before checking against MaxPeers.
  378. c.flags |= trustedConn
  379. }
  380. glog.V(logger.Detail).Infoln("<-posthandshake:", c)
  381. // TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
  382. c.cont <- srv.encHandshakeChecks(peers, c)
  383. case c := <-srv.addpeer:
  384. // At this point the connection is past the protocol handshake.
  385. // Its capabilities are known and the remote identity is verified.
  386. glog.V(logger.Detail).Infoln("<-addpeer:", c)
  387. err := srv.protoHandshakeChecks(peers, c)
  388. if err != nil {
  389. glog.V(logger.Detail).Infof("Not adding %v as peer: %v", c, err)
  390. } else {
  391. // The handshakes are done and it passed all checks.
  392. p := newPeer(c, srv.Protocols)
  393. peers[c.id] = p
  394. go srv.runPeer(p)
  395. }
  396. // The dialer logic relies on the assumption that
  397. // dial tasks complete after the peer has been added or
  398. // discarded. Unblock the task last.
  399. c.cont <- err
  400. case p := <-srv.delpeer:
  401. // A peer disconnected.
  402. glog.V(logger.Detail).Infoln("<-delpeer:", p)
  403. delete(peers, p.ID())
  404. }
  405. }
  406. // Terminate discovery. If there is a running lookup it will terminate soon.
  407. srv.ntab.Close()
  408. // Disconnect all peers.
  409. for _, p := range peers {
  410. p.Disconnect(DiscQuitting)
  411. }
  412. // Wait for peers to shut down. Pending connections and tasks are
  413. // not handled here and will terminate soon-ish because srv.quit
  414. // is closed.
  415. glog.V(logger.Detail).Infof("ignoring %d pending tasks at spindown", len(tasks))
  416. for len(peers) > 0 {
  417. p := <-srv.delpeer
  418. glog.V(logger.Detail).Infoln("<-delpeer (spindown):", p)
  419. delete(peers, p.ID())
  420. }
  421. }
  422. func (srv *Server) protoHandshakeChecks(peers map[discover.NodeID]*Peer, c *conn) error {
  423. // Drop connections with no matching protocols.
  424. if len(srv.Protocols) > 0 && countMatchingProtocols(srv.Protocols, c.caps) == 0 {
  425. return DiscUselessPeer
  426. }
  427. // Repeat the encryption handshake checks because the
  428. // peer set might have changed between the handshakes.
  429. return srv.encHandshakeChecks(peers, c)
  430. }
  431. func (srv *Server) encHandshakeChecks(peers map[discover.NodeID]*Peer, c *conn) error {
  432. switch {
  433. case !c.is(trustedConn|staticDialedConn) && len(peers) >= srv.MaxPeers:
  434. return DiscTooManyPeers
  435. case peers[c.id] != nil:
  436. return DiscAlreadyConnected
  437. case c.id == srv.ntab.Self().ID:
  438. return DiscSelf
  439. default:
  440. return nil
  441. }
  442. }
  443. // listenLoop runs in its own goroutine and accepts
  444. // inbound connections.
  445. func (srv *Server) listenLoop() {
  446. defer srv.loopWG.Done()
  447. glog.V(logger.Info).Infoln("Listening on", srv.listener.Addr())
  448. // This channel acts as a semaphore limiting
  449. // active inbound connections that are lingering pre-handshake.
  450. // If all slots are taken, no further connections are accepted.
  451. tokens := maxAcceptConns
  452. if srv.MaxPendingPeers > 0 {
  453. tokens = srv.MaxPendingPeers
  454. }
  455. slots := make(chan struct{}, tokens)
  456. for i := 0; i < tokens; i++ {
  457. slots <- struct{}{}
  458. }
  459. for {
  460. <-slots
  461. fd, err := srv.listener.Accept()
  462. if err != nil {
  463. return
  464. }
  465. glog.V(logger.Debug).Infof("Accepted conn %v\n", fd.RemoteAddr())
  466. go func() {
  467. srv.setupConn(fd, inboundConn, nil)
  468. slots <- struct{}{}
  469. }()
  470. }
  471. }
  472. // setupConn runs the handshakes and attempts to add the connection
  473. // as a peer. It returns when the connection has been added as a peer
  474. // or the handshakes have failed.
  475. func (srv *Server) setupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) {
  476. // Prevent leftover pending conns from entering the handshake.
  477. srv.lock.Lock()
  478. running := srv.running
  479. srv.lock.Unlock()
  480. c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
  481. if !running {
  482. c.close(errServerStopped)
  483. return
  484. }
  485. // Run the encryption handshake.
  486. var err error
  487. if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil {
  488. glog.V(logger.Debug).Infof("%v faild enc handshake: %v", c, err)
  489. c.close(err)
  490. return
  491. }
  492. // For dialed connections, check that the remote public key matches.
  493. if dialDest != nil && c.id != dialDest.ID {
  494. c.close(DiscUnexpectedIdentity)
  495. glog.V(logger.Debug).Infof("%v dialed identity mismatch, want %x", c, dialDest.ID[:8])
  496. return
  497. }
  498. if err := srv.checkpoint(c, srv.posthandshake); err != nil {
  499. glog.V(logger.Debug).Infof("%v failed checkpoint posthandshake: %v", c, err)
  500. c.close(err)
  501. return
  502. }
  503. // Run the protocol handshake
  504. phs, err := c.doProtoHandshake(srv.ourHandshake)
  505. if err != nil {
  506. glog.V(logger.Debug).Infof("%v failed proto handshake: %v", c, err)
  507. c.close(err)
  508. return
  509. }
  510. if phs.ID != c.id {
  511. glog.V(logger.Debug).Infof("%v wrong proto handshake identity: %x", c, phs.ID[:8])
  512. c.close(DiscUnexpectedIdentity)
  513. return
  514. }
  515. c.caps, c.name = phs.Caps, phs.Name
  516. if err := srv.checkpoint(c, srv.addpeer); err != nil {
  517. glog.V(logger.Debug).Infof("%v failed checkpoint addpeer: %v", c, err)
  518. c.close(err)
  519. return
  520. }
  521. // If the checks completed successfully, runPeer has now been
  522. // launched by run.
  523. }
  524. // checkpoint sends the conn to run, which performs the
  525. // post-handshake checks for the stage (posthandshake, addpeer).
  526. func (srv *Server) checkpoint(c *conn, stage chan<- *conn) error {
  527. select {
  528. case stage <- c:
  529. case <-srv.quit:
  530. return errServerStopped
  531. }
  532. select {
  533. case err := <-c.cont:
  534. return err
  535. case <-srv.quit:
  536. return errServerStopped
  537. }
  538. }
  539. // runPeer runs in its own goroutine for each peer.
  540. // it waits until the Peer logic returns and removes
  541. // the peer.
  542. func (srv *Server) runPeer(p *Peer) {
  543. glog.V(logger.Debug).Infof("Added %v\n", p)
  544. srvjslog.LogJson(&logger.P2PConnected{
  545. RemoteId: p.ID().String(),
  546. RemoteAddress: p.RemoteAddr().String(),
  547. RemoteVersionString: p.Name(),
  548. NumConnections: srv.PeerCount(),
  549. })
  550. if srv.newPeerHook != nil {
  551. srv.newPeerHook(p)
  552. }
  553. discreason := p.run()
  554. // Note: run waits for existing peers to be sent on srv.delpeer
  555. // before returning, so this send should not select on srv.quit.
  556. srv.delpeer <- p
  557. glog.V(logger.Debug).Infof("Removed %v (%v)\n", p, discreason)
  558. srvjslog.LogJson(&logger.P2PDisconnected{
  559. RemoteId: p.ID().String(),
  560. NumConnections: srv.PeerCount(),
  561. })
  562. }