server.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568
  1. package p2p
  2. import (
  3. "crypto/ecdsa"
  4. "crypto/rand"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "sync"
  9. "time"
  10. "github.com/ethereum/go-ethereum/logger"
  11. "github.com/ethereum/go-ethereum/logger/glog"
  12. "github.com/ethereum/go-ethereum/p2p/discover"
  13. "github.com/ethereum/go-ethereum/p2p/nat"
  14. )
  // Timing parameters for dialing and for peer-set maintenance.
const (
	defaultDialTimeout      time.Duration = 15 * time.Second
	refreshPeersInterval    time.Duration = 30 * time.Second
	staticPeerCheckInterval time.Duration = 15 * time.Second
)

// Per-connection deadlines.
const (
	// handshakeTimeout is the total budget for the encryption handshake
	// and the protocol handshake, in both directions combined.
	handshakeTimeout time.Duration = 5 * time.Second

	// frameReadTimeout is the longest a single message read may take,
	// which effectively is how long a connection may sit idle.
	frameReadTimeout time.Duration = 1 * time.Minute

	// frameWriteTimeout is the longest a single message write may take.
	frameWriteTimeout time.Duration = 5 * time.Second
)

// Default caps on connections that are still pre-handshake. A positive
// Server.MaxPendingPeers replaces both.
const (
	maxAcceptConns  = 50 // concurrently handshaking inbound connections
	maxDialingConns = 10 // concurrently dialing outbound connections
)
  32. var srvjslog = logger.NewJsonLogger()
  33. // Server manages all peer connections.
  34. //
  35. // The fields of Server are used as configuration parameters.
  36. // You should set them before starting the Server. Fields may not be
  37. // modified while the server is running.
  38. type Server struct {
  39. // This field must be set to a valid secp256k1 private key.
  40. PrivateKey *ecdsa.PrivateKey
  41. // MaxPeers is the maximum number of peers that can be
  42. // connected. It must be greater than zero.
  43. MaxPeers int
  44. // MaxPendingPeers is the maximum number of peers that can be pending in the
  45. // handshake phase, counted separately for inbound and outbound connections.
  46. // Zero defaults to preset values.
  47. MaxPendingPeers int
  48. // Name sets the node name of this server.
  49. // Use common.MakeName to create a name that follows existing conventions.
  50. Name string
  51. // Bootstrap nodes are used to establish connectivity
  52. // with the rest of the network.
  53. BootstrapNodes []*discover.Node
  54. // Static nodes are used as pre-configured connections which are always
  55. // maintained and re-connected on disconnects.
  56. StaticNodes []*discover.Node
  57. // Trusted nodes are used as pre-configured connections which are always
  58. // allowed to connect, even above the peer limit.
  59. TrustedNodes []*discover.Node
  60. // NodeDatabase is the path to the database containing the previously seen
  61. // live nodes in the network.
  62. NodeDatabase string
  63. // Protocols should contain the protocols supported
  64. // by the server. Matching protocols are launched for
  65. // each peer.
  66. Protocols []Protocol
  67. // If ListenAddr is set to a non-nil address, the server
  68. // will listen for incoming connections.
  69. //
  70. // If the port is zero, the operating system will pick a port. The
  71. // ListenAddr field will be updated with the actual address when
  72. // the server is started.
  73. ListenAddr string
  74. // If set to a non-nil value, the given NAT port mapper
  75. // is used to make the listening port available to the
  76. // Internet.
  77. NAT nat.Interface
  78. // If Dialer is set to a non-nil value, the given Dialer
  79. // is used to dial outbound peer connections.
  80. Dialer *net.Dialer
  81. // If NoDial is true, the server will not dial any peers.
  82. NoDial bool
  83. // Hooks for testing. These are useful because we can inhibit
  84. // the whole protocol stack.
  85. setupFunc
  86. newPeerHook
  87. ourHandshake *protoHandshake
  88. lock sync.RWMutex // protects running, peers and the trust fields
  89. running bool
  90. peers map[discover.NodeID]*Peer
  91. staticNodes map[discover.NodeID]*discover.Node // Map of currently maintained static remote nodes
  92. staticDial chan *discover.Node // Dial request channel reserved for the static nodes
  93. staticCycle time.Duration // Overrides staticPeerCheckInterval, used for testing
  94. trustedNodes map[discover.NodeID]bool // Set of currently trusted remote nodes
  95. ntab *discover.Table
  96. listener net.Listener
  97. quit chan struct{}
  98. loopWG sync.WaitGroup // {dial,listen,nat}Loop
  99. peerWG sync.WaitGroup // active peer goroutines
  100. }
  101. type setupFunc func(net.Conn, *ecdsa.PrivateKey, *protoHandshake, *discover.Node, func(discover.NodeID) bool) (*conn, error)
  102. type newPeerHook func(*Peer)
  103. // Peers returns all connected peers.
  104. func (srv *Server) Peers() (peers []*Peer) {
  105. srv.lock.RLock()
  106. defer srv.lock.RUnlock()
  107. for _, peer := range srv.peers {
  108. if peer != nil {
  109. peers = append(peers, peer)
  110. }
  111. }
  112. return
  113. }
  114. // PeerCount returns the number of connected peers.
  115. func (srv *Server) PeerCount() int {
  116. srv.lock.RLock()
  117. n := len(srv.peers)
  118. srv.lock.RUnlock()
  119. return n
  120. }
  121. // AddPeer connects to the given node and maintains the connection until the
  122. // server is shut down. If the connection fails for any reason, the server will
  123. // attempt to reconnect the peer.
  124. func (srv *Server) AddPeer(node *discover.Node) {
  125. srv.lock.Lock()
  126. defer srv.lock.Unlock()
  127. srv.staticNodes[node.ID] = node
  128. }
  129. // Start starts running the server.
  130. // Servers can be re-used and started again after stopping.
  131. func (srv *Server) Start() (err error) {
  132. srv.lock.Lock()
  133. defer srv.lock.Unlock()
  134. if srv.running {
  135. return errors.New("server already running")
  136. }
  137. glog.V(logger.Info).Infoln("Starting Server")
  138. // static fields
  139. if srv.PrivateKey == nil {
  140. return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
  141. }
  142. if srv.MaxPeers <= 0 {
  143. return fmt.Errorf("Server.MaxPeers must be > 0")
  144. }
  145. srv.quit = make(chan struct{})
  146. srv.peers = make(map[discover.NodeID]*Peer)
  147. // Create the current trust maps, and the associated dialing channel
  148. srv.trustedNodes = make(map[discover.NodeID]bool)
  149. for _, node := range srv.TrustedNodes {
  150. srv.trustedNodes[node.ID] = true
  151. }
  152. srv.staticNodes = make(map[discover.NodeID]*discover.Node)
  153. for _, node := range srv.StaticNodes {
  154. srv.staticNodes[node.ID] = node
  155. }
  156. srv.staticDial = make(chan *discover.Node)
  157. if srv.setupFunc == nil {
  158. srv.setupFunc = setupConn
  159. }
  160. // node table
  161. ntab, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr, srv.NAT, srv.NodeDatabase)
  162. if err != nil {
  163. return err
  164. }
  165. srv.ntab = ntab
  166. // handshake
  167. srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: ntab.Self().ID}
  168. for _, p := range srv.Protocols {
  169. srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
  170. }
  171. // listen/dial
  172. if srv.ListenAddr != "" {
  173. if err := srv.startListening(); err != nil {
  174. return err
  175. }
  176. }
  177. if srv.Dialer == nil {
  178. srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout}
  179. }
  180. if !srv.NoDial {
  181. srv.loopWG.Add(1)
  182. go srv.dialLoop()
  183. }
  184. if srv.NoDial && srv.ListenAddr == "" {
  185. glog.V(logger.Warn).Infoln("I will be kind-of useless, neither dialing nor listening.")
  186. }
  187. // maintain the static peers
  188. go srv.staticNodesLoop()
  189. srv.running = true
  190. return nil
  191. }
  192. func (srv *Server) startListening() error {
  193. listener, err := net.Listen("tcp", srv.ListenAddr)
  194. if err != nil {
  195. return err
  196. }
  197. laddr := listener.Addr().(*net.TCPAddr)
  198. srv.ListenAddr = laddr.String()
  199. srv.listener = listener
  200. srv.loopWG.Add(1)
  201. go srv.listenLoop()
  202. if !laddr.IP.IsLoopback() && srv.NAT != nil {
  203. srv.loopWG.Add(1)
  204. go func() {
  205. nat.Map(srv.NAT, srv.quit, "tcp", laddr.Port, laddr.Port, "ethereum p2p")
  206. srv.loopWG.Done()
  207. }()
  208. }
  209. return nil
  210. }
  211. // Stop terminates the server and all active peer connections.
  212. // It blocks until all active connections have been closed.
  213. func (srv *Server) Stop() {
  214. srv.lock.Lock()
  215. if !srv.running {
  216. srv.lock.Unlock()
  217. return
  218. }
  219. srv.running = false
  220. srv.lock.Unlock()
  221. glog.V(logger.Info).Infoln("Stopping Server")
  222. srv.ntab.Close()
  223. if srv.listener != nil {
  224. // this unblocks listener Accept
  225. srv.listener.Close()
  226. }
  227. close(srv.quit)
  228. srv.loopWG.Wait()
  229. // No new peers can be added at this point because dialLoop and
  230. // listenLoop are down. It is safe to call peerWG.Wait because
  231. // peerWG.Add is not called outside of those loops.
  232. srv.lock.Lock()
  233. for _, peer := range srv.peers {
  234. peer.Disconnect(DiscQuitting)
  235. }
  236. srv.lock.Unlock()
  237. srv.peerWG.Wait()
  238. }
  239. // Self returns the local node's endpoint information.
  240. func (srv *Server) Self() *discover.Node {
  241. srv.lock.RLock()
  242. defer srv.lock.RUnlock()
  243. if !srv.running {
  244. return &discover.Node{IP: net.ParseIP("0.0.0.0")}
  245. }
  246. return srv.ntab.Self()
  247. }
  248. // main loop for adding connections via listening
  249. func (srv *Server) listenLoop() {
  250. defer srv.loopWG.Done()
  251. // This channel acts as a semaphore limiting
  252. // active inbound connections that are lingering pre-handshake.
  253. // If all slots are taken, no further connections are accepted.
  254. tokens := maxAcceptConns
  255. if srv.MaxPendingPeers > 0 {
  256. tokens = srv.MaxPendingPeers
  257. }
  258. slots := make(chan struct{}, tokens)
  259. for i := 0; i < tokens; i++ {
  260. slots <- struct{}{}
  261. }
  262. glog.V(logger.Info).Infoln("Listening on", srv.listener.Addr())
  263. for {
  264. <-slots
  265. conn, err := srv.listener.Accept()
  266. if err != nil {
  267. return
  268. }
  269. glog.V(logger.Debug).Infof("Accepted conn %v\n", conn.RemoteAddr())
  270. srv.peerWG.Add(1)
  271. go func() {
  272. srv.startPeer(conn, nil)
  273. slots <- struct{}{}
  274. }()
  275. }
  276. }
  277. // staticNodesLoop is responsible for periodically checking that static
  278. // connections are actually live, and requests dialing if not.
  279. func (srv *Server) staticNodesLoop() {
  280. // Create a default maintenance ticker, but override it requested
  281. cycle := staticPeerCheckInterval
  282. if srv.staticCycle != 0 {
  283. cycle = srv.staticCycle
  284. }
  285. tick := time.NewTicker(cycle)
  286. for {
  287. select {
  288. case <-srv.quit:
  289. return
  290. case <-tick.C:
  291. // Collect all the non-connected static nodes
  292. needed := []*discover.Node{}
  293. srv.lock.RLock()
  294. for id, node := range srv.staticNodes {
  295. if _, ok := srv.peers[id]; !ok {
  296. needed = append(needed, node)
  297. }
  298. }
  299. srv.lock.RUnlock()
  300. // Try to dial each of them (don't hang if server terminates)
  301. for _, node := range needed {
  302. glog.V(logger.Debug).Infof("Dialing static peer %v", node)
  303. select {
  304. case srv.staticDial <- node:
  305. case <-srv.quit:
  306. return
  307. }
  308. }
  309. }
  310. }
  311. }
  312. func (srv *Server) dialLoop() {
  313. var (
  314. dialed = make(chan *discover.Node)
  315. dialing = make(map[discover.NodeID]bool)
  316. findresults = make(chan []*discover.Node)
  317. refresh = time.NewTimer(0)
  318. )
  319. defer srv.loopWG.Done()
  320. defer refresh.Stop()
  321. // Limit the number of concurrent dials
  322. tokens := maxDialingConns
  323. if srv.MaxPendingPeers > 0 {
  324. tokens = srv.MaxPendingPeers
  325. }
  326. slots := make(chan struct{}, tokens)
  327. for i := 0; i < tokens; i++ {
  328. slots <- struct{}{}
  329. }
  330. dial := func(dest *discover.Node) {
  331. // Don't dial nodes that would fail the checks in addPeer.
  332. // This is important because the connection handshake is a lot
  333. // of work and we'd rather avoid doing that work for peers
  334. // that can't be added.
  335. srv.lock.RLock()
  336. ok, _ := srv.checkPeer(dest.ID)
  337. srv.lock.RUnlock()
  338. if !ok || dialing[dest.ID] {
  339. return
  340. }
  341. // Request a dial slot to prevent CPU exhaustion
  342. <-slots
  343. dialing[dest.ID] = true
  344. srv.peerWG.Add(1)
  345. go func() {
  346. srv.dialNode(dest)
  347. slots <- struct{}{}
  348. dialed <- dest
  349. }()
  350. }
  351. srv.ntab.Bootstrap(srv.BootstrapNodes)
  352. for {
  353. select {
  354. case <-refresh.C:
  355. // Grab some nodes to connect to if we're not at capacity.
  356. srv.lock.RLock()
  357. needpeers := len(srv.peers) < srv.MaxPeers/2
  358. srv.lock.RUnlock()
  359. if needpeers {
  360. go func() {
  361. var target discover.NodeID
  362. rand.Read(target[:])
  363. findresults <- srv.ntab.Lookup(target)
  364. }()
  365. } else {
  366. // Make sure we check again if the peer count falls
  367. // below MaxPeers.
  368. refresh.Reset(refreshPeersInterval)
  369. }
  370. case dest := <-srv.staticDial:
  371. dial(dest)
  372. case dests := <-findresults:
  373. for _, dest := range dests {
  374. dial(dest)
  375. }
  376. refresh.Reset(refreshPeersInterval)
  377. case dest := <-dialed:
  378. delete(dialing, dest.ID)
  379. if len(dialing) == 0 {
  380. // Check again immediately after dialing all current candidates.
  381. refresh.Reset(0)
  382. }
  383. case <-srv.quit:
  384. // TODO: maybe wait for active dials
  385. return
  386. }
  387. }
  388. }
  389. func (srv *Server) dialNode(dest *discover.Node) {
  390. addr := &net.TCPAddr{IP: dest.IP, Port: int(dest.TCP)}
  391. glog.V(logger.Debug).Infof("Dialing %v\n", dest)
  392. conn, err := srv.Dialer.Dial("tcp", addr.String())
  393. if err != nil {
  394. // dialLoop adds to the wait group counter when launching
  395. // dialNode, so we need to count it down again. startPeer also
  396. // does that when an error occurs.
  397. srv.peerWG.Done()
  398. glog.V(logger.Detail).Infof("dial error: %v", err)
  399. return
  400. }
  401. srv.startPeer(conn, dest)
  402. }
  403. func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
  404. // TODO: handle/store session token
  405. // Run setupFunc, which should create an authenticated connection
  406. // and run the capability exchange. Note that any early error
  407. // returns during that exchange need to call peerWG.Done because
  408. // the callers of startPeer added the peer to the wait group already.
  409. fd.SetDeadline(time.Now().Add(handshakeTimeout))
  410. conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest, srv.keepconn)
  411. if err != nil {
  412. fd.Close()
  413. glog.V(logger.Debug).Infof("Handshake with %v failed: %v", fd.RemoteAddr(), err)
  414. srv.peerWG.Done()
  415. return
  416. }
  417. conn.MsgReadWriter = &netWrapper{
  418. wrapped: conn.MsgReadWriter,
  419. conn: fd, rtimeout: frameReadTimeout, wtimeout: frameWriteTimeout,
  420. }
  421. p := newPeer(fd, conn, srv.Protocols)
  422. if ok, reason := srv.addPeer(conn, p); !ok {
  423. glog.V(logger.Detail).Infof("Not adding %v (%v)\n", p, reason)
  424. p.politeDisconnect(reason)
  425. srv.peerWG.Done()
  426. return
  427. }
  428. // The handshakes are done and it passed all checks.
  429. // Spawn the Peer loops.
  430. go srv.runPeer(p)
  431. }
  432. // preflight checks whether a connection should be kept. it runs
  433. // after the encryption handshake, as soon as the remote identity is
  434. // known.
  435. func (srv *Server) keepconn(id discover.NodeID) bool {
  436. srv.lock.RLock()
  437. defer srv.lock.RUnlock()
  438. if _, ok := srv.staticNodes[id]; ok {
  439. return true // static nodes are always allowed
  440. }
  441. if _, ok := srv.trustedNodes[id]; ok {
  442. return true // trusted nodes are always allowed
  443. }
  444. return len(srv.peers) < srv.MaxPeers
  445. }
  446. func (srv *Server) runPeer(p *Peer) {
  447. glog.V(logger.Debug).Infof("Added %v\n", p)
  448. srvjslog.LogJson(&logger.P2PConnected{
  449. RemoteId: p.ID().String(),
  450. RemoteAddress: p.RemoteAddr().String(),
  451. RemoteVersionString: p.Name(),
  452. NumConnections: srv.PeerCount(),
  453. })
  454. if srv.newPeerHook != nil {
  455. srv.newPeerHook(p)
  456. }
  457. discreason := p.run()
  458. srv.removePeer(p)
  459. glog.V(logger.Debug).Infof("Removed %v (%v)\n", p, discreason)
  460. srvjslog.LogJson(&logger.P2PDisconnected{
  461. RemoteId: p.ID().String(),
  462. NumConnections: srv.PeerCount(),
  463. })
  464. }
  465. func (srv *Server) addPeer(conn *conn, p *Peer) (bool, DiscReason) {
  466. // drop connections with no matching protocols.
  467. if len(srv.Protocols) > 0 && countMatchingProtocols(srv.Protocols, conn.protoHandshake.Caps) == 0 {
  468. return false, DiscUselessPeer
  469. }
  470. // add the peer if it passes the other checks.
  471. srv.lock.Lock()
  472. defer srv.lock.Unlock()
  473. if ok, reason := srv.checkPeer(conn.ID); !ok {
  474. return false, reason
  475. }
  476. srv.peers[conn.ID] = p
  477. return true, 0
  478. }
  479. // checkPeer verifies whether a peer looks promising and should be allowed/kept
  480. // in the pool, or if it's of no use.
  481. func (srv *Server) checkPeer(id discover.NodeID) (bool, DiscReason) {
  482. // First up, figure out if the peer is static or trusted
  483. _, static := srv.staticNodes[id]
  484. trusted := srv.trustedNodes[id]
  485. // Make sure the peer passes all required checks
  486. switch {
  487. case !srv.running:
  488. return false, DiscQuitting
  489. case !static && !trusted && len(srv.peers) >= srv.MaxPeers:
  490. return false, DiscTooManyPeers
  491. case srv.peers[id] != nil:
  492. return false, DiscAlreadyConnected
  493. case id == srv.ntab.Self().ID:
  494. return false, DiscSelf
  495. default:
  496. return true, 0
  497. }
  498. }
  499. func (srv *Server) removePeer(p *Peer) {
  500. srv.lock.Lock()
  501. delete(srv.peers, p.ID())
  502. srv.lock.Unlock()
  503. srv.peerWG.Done()
  504. }