server.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. package p2p
  2. import (
  3. "bytes"
  4. "crypto/ecdsa"
  5. "crypto/rand"
  6. "errors"
  7. "fmt"
  8. "net"
  9. "sync"
  10. "time"
  11. "github.com/ethereum/go-ethereum/logger"
  12. "github.com/ethereum/go-ethereum/logger/glog"
  13. "github.com/ethereum/go-ethereum/p2p/discover"
  14. "github.com/ethereum/go-ethereum/p2p/nat"
  15. "github.com/ethereum/go-ethereum/rlp"
  16. )
  17. const (
  18. defaultDialTimeout = 10 * time.Second
  19. refreshPeersInterval = 30 * time.Second
  20. // total timeout for encryption handshake and protocol
  21. // handshake in both directions.
  22. handshakeTimeout = 5 * time.Second
  23. // maximum time allowed for reading a complete message.
  24. // this is effectively the amount of time a connection can be idle.
  25. frameReadTimeout = 1 * time.Minute
  26. // maximum amount of time allowed for writing a complete message.
  27. frameWriteTimeout = 5 * time.Second
  28. )
  29. var srvjslog = logger.NewJsonLogger()
  30. // Server manages all peer connections.
  31. //
  32. // The fields of Server are used as configuration parameters.
  33. // You should set them before starting the Server. Fields may not be
  34. // modified while the server is running.
  35. type Server struct {
  36. // This field must be set to a valid secp256k1 private key.
  37. PrivateKey *ecdsa.PrivateKey
  38. // MaxPeers is the maximum number of peers that can be
  39. // connected. It must be greater than zero.
  40. MaxPeers int
  41. // Name sets the node name of this server.
  42. // Use common.MakeName to create a name that follows existing conventions.
  43. Name string
  44. // Bootstrap nodes are used to establish connectivity
  45. // with the rest of the network.
  46. BootstrapNodes []*discover.Node
  47. // Protocols should contain the protocols supported
  48. // by the server. Matching protocols are launched for
  49. // each peer.
  50. Protocols []Protocol
  51. // If ListenAddr is set to a non-nil address, the server
  52. // will listen for incoming connections.
  53. //
  54. // If the port is zero, the operating system will pick a port. The
  55. // ListenAddr field will be updated with the actual address when
  56. // the server is started.
  57. ListenAddr string
  58. // If set to a non-nil value, the given NAT port mapper
  59. // is used to make the listening port available to the
  60. // Internet.
  61. NAT nat.Interface
  62. // If Dialer is set to a non-nil value, the given Dialer
  63. // is used to dial outbound peer connections.
  64. Dialer *net.Dialer
  65. // If NoDial is true, the server will not dial any peers.
  66. NoDial bool
  67. // Hooks for testing. These are useful because we can inhibit
  68. // the whole protocol stack.
  69. setupFunc
  70. newPeerHook
  71. ourHandshake *protoHandshake
  72. lock sync.RWMutex
  73. running bool
  74. listener net.Listener
  75. peers map[discover.NodeID]*Peer
  76. ntab *discover.Table
  77. quit chan struct{}
  78. loopWG sync.WaitGroup // {dial,listen,nat}Loop
  79. peerWG sync.WaitGroup // active peer goroutines
  80. peerConnect chan *discover.Node
  81. }
  82. type setupFunc func(net.Conn, *ecdsa.PrivateKey, *protoHandshake, *discover.Node) (*conn, error)
  83. type newPeerHook func(*Peer)
  84. // Peers returns all connected peers.
  85. func (srv *Server) Peers() (peers []*Peer) {
  86. srv.lock.RLock()
  87. defer srv.lock.RUnlock()
  88. for _, peer := range srv.peers {
  89. if peer != nil {
  90. peers = append(peers, peer)
  91. }
  92. }
  93. return
  94. }
  95. // PeerCount returns the number of connected peers.
  96. func (srv *Server) PeerCount() int {
  97. srv.lock.RLock()
  98. n := len(srv.peers)
  99. srv.lock.RUnlock()
  100. return n
  101. }
  102. // SuggestPeer creates a connection to the given Node if it
  103. // is not already connected.
  104. func (srv *Server) SuggestPeer(n *discover.Node) {
  105. srv.peerConnect <- n
  106. }
  107. // Broadcast sends an RLP-encoded message to all connected peers.
  108. // This method is deprecated and will be removed later.
  109. func (srv *Server) Broadcast(protocol string, code uint64, data interface{}) error {
  110. var payload []byte
  111. if data != nil {
  112. var err error
  113. payload, err = rlp.EncodeToBytes(data)
  114. if err != nil {
  115. return err
  116. }
  117. }
  118. srv.lock.RLock()
  119. defer srv.lock.RUnlock()
  120. for _, peer := range srv.peers {
  121. if peer != nil {
  122. var msg = Msg{Code: code}
  123. if data != nil {
  124. msg.Payload = bytes.NewReader(payload)
  125. msg.Size = uint32(len(payload))
  126. }
  127. peer.writeProtoMsg(protocol, msg)
  128. }
  129. }
  130. return nil
  131. }
  132. // Start starts running the server.
  133. // Servers can be re-used and started again after stopping.
  134. func (srv *Server) Start() (err error) {
  135. srv.lock.Lock()
  136. defer srv.lock.Unlock()
  137. if srv.running {
  138. return errors.New("server already running")
  139. }
  140. glog.V(logger.Info).Infoln("Starting Server")
  141. // static fields
  142. if srv.PrivateKey == nil {
  143. return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
  144. }
  145. if srv.MaxPeers <= 0 {
  146. return fmt.Errorf("Server.MaxPeers must be > 0")
  147. }
  148. srv.quit = make(chan struct{})
  149. srv.peers = make(map[discover.NodeID]*Peer)
  150. srv.peerConnect = make(chan *discover.Node)
  151. if srv.setupFunc == nil {
  152. srv.setupFunc = setupConn
  153. }
  154. // node table
  155. ntab, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr, srv.NAT)
  156. if err != nil {
  157. return err
  158. }
  159. srv.ntab = ntab
  160. // handshake
  161. srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: ntab.Self().ID}
  162. for _, p := range srv.Protocols {
  163. srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
  164. }
  165. // listen/dial
  166. if srv.ListenAddr != "" {
  167. if err := srv.startListening(); err != nil {
  168. return err
  169. }
  170. }
  171. if srv.Dialer == nil {
  172. srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout}
  173. }
  174. if !srv.NoDial {
  175. srv.loopWG.Add(1)
  176. go srv.dialLoop()
  177. }
  178. if srv.NoDial && srv.ListenAddr == "" {
  179. glog.V(logger.Warn).Infoln("I will be kind-of useless, neither dialing nor listening.")
  180. }
  181. srv.running = true
  182. return nil
  183. }
  184. func (srv *Server) startListening() error {
  185. listener, err := net.Listen("tcp", srv.ListenAddr)
  186. if err != nil {
  187. return err
  188. }
  189. laddr := listener.Addr().(*net.TCPAddr)
  190. srv.ListenAddr = laddr.String()
  191. srv.listener = listener
  192. srv.loopWG.Add(1)
  193. go srv.listenLoop()
  194. if !laddr.IP.IsLoopback() && srv.NAT != nil {
  195. srv.loopWG.Add(1)
  196. go func() {
  197. nat.Map(srv.NAT, srv.quit, "tcp", laddr.Port, laddr.Port, "ethereum p2p")
  198. srv.loopWG.Done()
  199. }()
  200. }
  201. return nil
  202. }
  203. // Stop terminates the server and all active peer connections.
  204. // It blocks until all active connections have been closed.
  205. func (srv *Server) Stop() {
  206. srv.lock.Lock()
  207. if !srv.running {
  208. srv.lock.Unlock()
  209. return
  210. }
  211. srv.running = false
  212. srv.lock.Unlock()
  213. glog.V(logger.Info).Infoln("Stopping Server")
  214. srv.ntab.Close()
  215. if srv.listener != nil {
  216. // this unblocks listener Accept
  217. srv.listener.Close()
  218. }
  219. close(srv.quit)
  220. srv.loopWG.Wait()
  221. // No new peers can be added at this point because dialLoop and
  222. // listenLoop are down. It is safe to call peerWG.Wait because
  223. // peerWG.Add is not called outside of those loops.
  224. for _, peer := range srv.peers {
  225. peer.Disconnect(DiscQuitting)
  226. }
  227. srv.peerWG.Wait()
  228. }
  229. // main loop for adding connections via listening
  230. func (srv *Server) listenLoop() {
  231. defer srv.loopWG.Done()
  232. glog.V(logger.Info).Infoln("Listening on", srv.listener.Addr())
  233. for {
  234. conn, err := srv.listener.Accept()
  235. if err != nil {
  236. return
  237. }
  238. glog.V(logger.Debug).Infof("Accepted conn %v\n", conn.RemoteAddr())
  239. srv.peerWG.Add(1)
  240. go srv.startPeer(conn, nil)
  241. }
  242. }
  243. func (srv *Server) dialLoop() {
  244. var (
  245. dialed = make(chan *discover.Node)
  246. dialing = make(map[discover.NodeID]bool)
  247. findresults = make(chan []*discover.Node)
  248. refresh = time.NewTimer(0)
  249. )
  250. defer srv.loopWG.Done()
  251. defer refresh.Stop()
  252. // TODO: maybe limit number of active dials
  253. dial := func(dest *discover.Node) {
  254. srv.lock.Lock()
  255. ok, _ := srv.checkPeer(dest.ID)
  256. srv.lock.Unlock()
  257. // Don't dial nodes that would fail the checks in addPeer.
  258. // This is important because the connection handshake is a lot
  259. // of work and we'd rather avoid doing that work for peers
  260. // that can't be added.
  261. if !ok || dialing[dest.ID] {
  262. return
  263. }
  264. dialing[dest.ID] = true
  265. srv.peerWG.Add(1)
  266. go func() {
  267. srv.dialNode(dest)
  268. dialed <- dest
  269. }()
  270. }
  271. srv.ntab.Bootstrap(srv.BootstrapNodes)
  272. for {
  273. select {
  274. case <-refresh.C:
  275. srv.lock.Lock()
  276. needpeers := len(srv.peers) < srv.MaxPeers
  277. srv.lock.Unlock()
  278. if needpeers {
  279. go func() {
  280. var target discover.NodeID
  281. rand.Read(target[:])
  282. findresults <- srv.ntab.Lookup(target)
  283. }()
  284. refresh.Stop()
  285. }
  286. case dest := <-srv.peerConnect:
  287. dial(dest)
  288. case dests := <-findresults:
  289. for _, dest := range dests {
  290. dial(dest)
  291. }
  292. refresh.Reset(refreshPeersInterval)
  293. case dest := <-dialed:
  294. delete(dialing, dest.ID)
  295. case <-srv.quit:
  296. // TODO: maybe wait for active dials
  297. return
  298. }
  299. }
  300. }
  301. func (srv *Server) dialNode(dest *discover.Node) {
  302. addr := &net.TCPAddr{IP: dest.IP, Port: dest.TCPPort}
  303. glog.V(logger.Debug).Infof("Dialing %v\n", dest)
  304. conn, err := srv.Dialer.Dial("tcp", addr.String())
  305. if err != nil {
  306. glog.V(logger.Detail).Infof("dial error: %v", err)
  307. return
  308. }
  309. srv.startPeer(conn, dest)
  310. }
  311. func (srv *Server) Self() *discover.Node {
  312. return srv.ntab.Self()
  313. }
  314. func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
  315. // TODO: handle/store session token
  316. fd.SetDeadline(time.Now().Add(handshakeTimeout))
  317. conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest)
  318. if err != nil {
  319. fd.Close()
  320. glog.V(logger.Debug).Infof("Handshake with %v failed: %v", fd.RemoteAddr(), err)
  321. return
  322. }
  323. conn.MsgReadWriter = &netWrapper{
  324. wrapped: conn.MsgReadWriter,
  325. conn: fd, rtimeout: frameReadTimeout, wtimeout: frameWriteTimeout,
  326. }
  327. p := newPeer(fd, conn, srv.Protocols)
  328. if ok, reason := srv.addPeer(conn.ID, p); !ok {
  329. glog.V(logger.Detail).Infof("Not adding %v (%v)\n", p, reason)
  330. p.politeDisconnect(reason)
  331. return
  332. }
  333. // The handshakes are done and it passed all checks.
  334. // Spawn the Peer loops.
  335. go srv.runPeer(p)
  336. }
  337. func (srv *Server) runPeer(p *Peer) {
  338. glog.V(logger.Debug).Infof("Added %v\n", p)
  339. srvjslog.LogJson(&logger.P2PConnected{
  340. RemoteId: p.ID().String(),
  341. RemoteAddress: p.RemoteAddr().String(),
  342. RemoteVersionString: p.Name(),
  343. NumConnections: srv.PeerCount(),
  344. })
  345. if srv.newPeerHook != nil {
  346. srv.newPeerHook(p)
  347. }
  348. discreason := p.run()
  349. srv.removePeer(p)
  350. glog.V(logger.Debug).Infof("Removed %v (%v)\n", p, discreason)
  351. srvjslog.LogJson(&logger.P2PDisconnected{
  352. RemoteId: p.ID().String(),
  353. NumConnections: srv.PeerCount(),
  354. })
  355. }
  356. func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) {
  357. srv.lock.Lock()
  358. defer srv.lock.Unlock()
  359. if ok, reason := srv.checkPeer(id); !ok {
  360. return false, reason
  361. }
  362. srv.peers[id] = p
  363. return true, 0
  364. }
  365. func (srv *Server) checkPeer(id discover.NodeID) (bool, DiscReason) {
  366. switch {
  367. case !srv.running:
  368. return false, DiscQuitting
  369. case len(srv.peers) >= srv.MaxPeers:
  370. return false, DiscTooManyPeers
  371. case srv.peers[id] != nil:
  372. return false, DiscAlreadyConnected
  373. case id == srv.Self().ID:
  374. return false, DiscSelf
  375. default:
  376. return true, 0
  377. }
  378. }
  379. func (srv *Server) removePeer(p *Peer) {
  380. srv.lock.Lock()
  381. delete(srv.peers, p.ID())
  382. srv.lock.Unlock()
  383. srv.peerWG.Done()
  384. }