server.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484
  1. package p2p
  2. import (
  3. "bytes"
  4. "crypto/ecdsa"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "runtime"
  9. "sync"
  10. "time"
  11. "github.com/ethereum/go-ethereum/ethutil"
  12. "github.com/ethereum/go-ethereum/logger"
  13. "github.com/ethereum/go-ethereum/p2p/discover"
  14. "github.com/ethereum/go-ethereum/p2p/nat"
  15. )
  16. const (
  17. defaultDialTimeout = 10 * time.Second
  18. refreshPeersInterval = 30 * time.Second
  19. // total timeout for encryption handshake and protocol
  20. // handshake in both directions.
  21. handshakeTimeout = 5 * time.Second
  22. // maximum time allowed for reading a complete message.
  23. // this is effectively the amount of time a connection can be idle.
  24. frameReadTimeout = 1 * time.Minute
  25. // maximum amount of time allowed for writing a complete message.
  26. frameWriteTimeout = 5 * time.Second
  27. )
  28. var srvlog = logger.NewLogger("P2P Server")
  29. var srvjslog = logger.NewJsonLogger()
  30. // MakeName creates a node name that follows the ethereum convention
  31. // for such names. It adds the operation system name and Go runtime version
  32. // the name.
  33. func MakeName(name, version string) string {
  34. return fmt.Sprintf("%s/v%s/%s/%s", name, version, runtime.GOOS, runtime.Version())
  35. }
  36. // Server manages all peer connections.
  37. //
  38. // The fields of Server are used as configuration parameters.
  39. // You should set them before starting the Server. Fields may not be
  40. // modified while the server is running.
  41. type Server struct {
  42. // This field must be set to a valid secp256k1 private key.
  43. PrivateKey *ecdsa.PrivateKey
  44. // MaxPeers is the maximum number of peers that can be
  45. // connected. It must be greater than zero.
  46. MaxPeers int
  47. // Name sets the node name of this server.
  48. // Use MakeName to create a name that follows existing conventions.
  49. Name string
  50. // Bootstrap nodes are used to establish connectivity
  51. // with the rest of the network.
  52. BootstrapNodes []*discover.Node
  53. // Protocols should contain the protocols supported
  54. // by the server. Matching protocols are launched for
  55. // each peer.
  56. Protocols []Protocol
  57. // If Blacklist is set to a non-nil value, the given Blacklist
  58. // is used to verify peer connections.
  59. Blacklist Blacklist
  60. // If ListenAddr is set to a non-nil address, the server
  61. // will listen for incoming connections.
  62. //
  63. // If the port is zero, the operating system will pick a port. The
  64. // ListenAddr field will be updated with the actual address when
  65. // the server is started.
  66. ListenAddr string
  67. // If set to a non-nil value, the given NAT port mapper
  68. // is used to make the listening port available to the
  69. // Internet.
  70. NAT nat.Interface
  71. // If Dialer is set to a non-nil value, the given Dialer
  72. // is used to dial outbound peer connections.
  73. Dialer *net.Dialer
  74. // If NoDial is true, the server will not dial any peers.
  75. NoDial bool
  76. // Hooks for testing. These are useful because we can inhibit
  77. // the whole protocol stack.
  78. setupFunc
  79. newPeerHook
  80. ourHandshake *protoHandshake
  81. lock sync.RWMutex
  82. running bool
  83. listener net.Listener
  84. peers map[discover.NodeID]*Peer
  85. ntab *discover.Table
  86. quit chan struct{}
  87. loopWG sync.WaitGroup // {dial,listen,nat}Loop
  88. peerWG sync.WaitGroup // active peer goroutines
  89. peerConnect chan *discover.Node
  90. }
  91. type setupFunc func(net.Conn, *ecdsa.PrivateKey, *protoHandshake, *discover.Node) (*conn, error)
  92. type newPeerHook func(*Peer)
  93. // Peers returns all connected peers.
  94. func (srv *Server) Peers() (peers []*Peer) {
  95. srv.lock.RLock()
  96. defer srv.lock.RUnlock()
  97. for _, peer := range srv.peers {
  98. if peer != nil {
  99. peers = append(peers, peer)
  100. }
  101. }
  102. return
  103. }
  104. // PeerCount returns the number of connected peers.
  105. func (srv *Server) PeerCount() int {
  106. srv.lock.RLock()
  107. n := len(srv.peers)
  108. srv.lock.RUnlock()
  109. return n
  110. }
  111. // SuggestPeer creates a connection to the given Node if it
  112. // is not already connected.
  113. func (srv *Server) SuggestPeer(n *discover.Node) {
  114. srv.peerConnect <- n
  115. }
  116. // Broadcast sends an RLP-encoded message to all connected peers.
  117. // This method is deprecated and will be removed later.
  118. func (srv *Server) Broadcast(protocol string, code uint64, data ...interface{}) {
  119. var payload []byte
  120. if data != nil {
  121. payload = ethutil.Encode(data)
  122. }
  123. srv.lock.RLock()
  124. defer srv.lock.RUnlock()
  125. for _, peer := range srv.peers {
  126. if peer != nil {
  127. var msg = Msg{Code: code}
  128. if data != nil {
  129. msg.Payload = bytes.NewReader(payload)
  130. msg.Size = uint32(len(payload))
  131. }
  132. peer.writeProtoMsg(protocol, msg)
  133. }
  134. }
  135. }
  136. // Start starts running the server.
  137. // Servers can be re-used and started again after stopping.
  138. func (srv *Server) Start() (err error) {
  139. srv.lock.Lock()
  140. defer srv.lock.Unlock()
  141. if srv.running {
  142. return errors.New("server already running")
  143. }
  144. srvlog.Infoln("Starting Server")
  145. // static fields
  146. if srv.PrivateKey == nil {
  147. return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
  148. }
  149. if srv.MaxPeers <= 0 {
  150. return fmt.Errorf("Server.MaxPeers must be > 0")
  151. }
  152. srv.quit = make(chan struct{})
  153. srv.peers = make(map[discover.NodeID]*Peer)
  154. srv.peerConnect = make(chan *discover.Node)
  155. if srv.setupFunc == nil {
  156. srv.setupFunc = setupConn
  157. }
  158. if srv.Blacklist == nil {
  159. srv.Blacklist = NewBlacklist()
  160. }
  161. // node table
  162. ntab, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr, srv.NAT)
  163. if err != nil {
  164. return err
  165. }
  166. srv.ntab = ntab
  167. // handshake
  168. srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: ntab.Self()}
  169. for _, p := range srv.Protocols {
  170. srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
  171. }
  172. // listen/dial
  173. if srv.ListenAddr != "" {
  174. if err := srv.startListening(); err != nil {
  175. return err
  176. }
  177. }
  178. if srv.Dialer == nil {
  179. srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout}
  180. }
  181. if !srv.NoDial {
  182. srv.loopWG.Add(1)
  183. go srv.dialLoop()
  184. }
  185. if srv.NoDial && srv.ListenAddr == "" {
  186. srvlog.Warnln("I will be kind-of useless, neither dialing nor listening.")
  187. }
  188. srv.running = true
  189. return nil
  190. }
  191. func (srv *Server) startListening() error {
  192. listener, err := net.Listen("tcp", srv.ListenAddr)
  193. if err != nil {
  194. return err
  195. }
  196. laddr := listener.Addr().(*net.TCPAddr)
  197. srv.ListenAddr = laddr.String()
  198. srv.listener = listener
  199. srv.loopWG.Add(1)
  200. go srv.listenLoop()
  201. if !laddr.IP.IsLoopback() && srv.NAT != nil {
  202. srv.loopWG.Add(1)
  203. go func() {
  204. nat.Map(srv.NAT, srv.quit, "tcp", laddr.Port, laddr.Port, "ethereum p2p")
  205. srv.loopWG.Done()
  206. }()
  207. }
  208. return nil
  209. }
  210. // Stop terminates the server and all active peer connections.
  211. // It blocks until all active connections have been closed.
  212. func (srv *Server) Stop() {
  213. srv.lock.Lock()
  214. if !srv.running {
  215. srv.lock.Unlock()
  216. return
  217. }
  218. srv.running = false
  219. srv.lock.Unlock()
  220. srvlog.Infoln("Stopping Server")
  221. srv.ntab.Close()
  222. if srv.listener != nil {
  223. // this unblocks listener Accept
  224. srv.listener.Close()
  225. }
  226. close(srv.quit)
  227. srv.loopWG.Wait()
  228. // No new peers can be added at this point because dialLoop and
  229. // listenLoop are down. It is safe to call peerWG.Wait because
  230. // peerWG.Add is not called outside of those loops.
  231. for _, peer := range srv.peers {
  232. peer.Disconnect(DiscQuitting)
  233. }
  234. srv.peerWG.Wait()
  235. }
  236. // main loop for adding connections via listening
  237. func (srv *Server) listenLoop() {
  238. defer srv.loopWG.Done()
  239. srvlog.Infoln("Listening on", srv.listener.Addr())
  240. for {
  241. conn, err := srv.listener.Accept()
  242. if err != nil {
  243. return
  244. }
  245. srvlog.Debugf("Accepted conn %v\n", conn.RemoteAddr())
  246. srv.peerWG.Add(1)
  247. go srv.startPeer(conn, nil)
  248. }
  249. }
  250. func (srv *Server) dialLoop() {
  251. defer srv.loopWG.Done()
  252. refresh := time.NewTicker(refreshPeersInterval)
  253. defer refresh.Stop()
  254. srv.ntab.Bootstrap(srv.BootstrapNodes)
  255. go srv.findPeers()
  256. dialed := make(chan *discover.Node)
  257. dialing := make(map[discover.NodeID]bool)
  258. // TODO: limit number of active dials
  259. // TODO: ensure only one findPeers goroutine is running
  260. // TODO: pause findPeers when we're at capacity
  261. for {
  262. select {
  263. case <-refresh.C:
  264. go srv.findPeers()
  265. case dest := <-srv.peerConnect:
  266. // avoid dialing nodes that are already connected.
  267. // there is another check for this in addPeer,
  268. // which runs after the handshake.
  269. srv.lock.Lock()
  270. _, isconnected := srv.peers[dest.ID]
  271. srv.lock.Unlock()
  272. if isconnected || dialing[dest.ID] || dest.ID == srv.ntab.Self() {
  273. continue
  274. }
  275. dialing[dest.ID] = true
  276. srv.peerWG.Add(1)
  277. go func() {
  278. srv.dialNode(dest)
  279. // at this point, the peer has been added
  280. // or discarded. either way, we're not dialing it anymore.
  281. dialed <- dest
  282. }()
  283. case dest := <-dialed:
  284. delete(dialing, dest.ID)
  285. case <-srv.quit:
  286. // TODO: maybe wait for active dials
  287. return
  288. }
  289. }
  290. }
  291. func (srv *Server) dialNode(dest *discover.Node) {
  292. addr := &net.TCPAddr{IP: dest.IP, Port: dest.TCPPort}
  293. srvlog.Debugf("Dialing %v\n", dest)
  294. conn, err := srv.Dialer.Dial("tcp", addr.String())
  295. if err != nil {
  296. srvlog.DebugDetailf("dial error: %v", err)
  297. return
  298. }
  299. srv.startPeer(conn, dest)
  300. }
  301. func (srv *Server) findPeers() {
  302. far := srv.ntab.Self()
  303. for i := range far {
  304. far[i] = ^far[i]
  305. }
  306. closeToSelf := srv.ntab.Lookup(srv.ntab.Self())
  307. farFromSelf := srv.ntab.Lookup(far)
  308. for i := 0; i < len(closeToSelf) || i < len(farFromSelf); i++ {
  309. if i < len(closeToSelf) {
  310. srv.peerConnect <- closeToSelf[i]
  311. }
  312. if i < len(farFromSelf) {
  313. srv.peerConnect <- farFromSelf[i]
  314. }
  315. }
  316. }
  317. func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
  318. // TODO: handle/store session token
  319. fd.SetDeadline(time.Now().Add(handshakeTimeout))
  320. conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest)
  321. if err != nil {
  322. fd.Close()
  323. srvlog.Debugf("Handshake with %v failed: %v", fd.RemoteAddr(), err)
  324. return
  325. }
  326. conn.MsgReadWriter = &netWrapper{
  327. wrapped: conn.MsgReadWriter,
  328. conn: fd, rtimeout: frameReadTimeout, wtimeout: frameWriteTimeout,
  329. }
  330. p := newPeer(fd, conn, srv.Protocols)
  331. if ok, reason := srv.addPeer(conn.ID, p); !ok {
  332. srvlog.DebugDetailf("Not adding %v (%v)\n", p, reason)
  333. p.politeDisconnect(reason)
  334. return
  335. }
  336. srvlog.Debugf("Added %v\n", p)
  337. srvjslog.LogJson(&logger.P2PConnected{
  338. RemoteId: fmt.Sprintf("%x", conn.ID[:]),
  339. RemoteAddress: fd.RemoteAddr().String(),
  340. RemoteVersionString: conn.Name,
  341. NumConnections: srv.PeerCount(),
  342. })
  343. if srv.newPeerHook != nil {
  344. srv.newPeerHook(p)
  345. }
  346. discreason := p.run()
  347. srv.removePeer(p)
  348. srvlog.Debugf("Removed %v (%v)\n", p, discreason)
  349. srvjslog.LogJson(&logger.P2PDisconnected{
  350. RemoteId: fmt.Sprintf("%x", conn.ID[:]),
  351. NumConnections: srv.PeerCount(),
  352. })
  353. }
  354. func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) {
  355. srv.lock.Lock()
  356. defer srv.lock.Unlock()
  357. switch {
  358. case !srv.running:
  359. return false, DiscQuitting
  360. case len(srv.peers) >= srv.MaxPeers:
  361. return false, DiscTooManyPeers
  362. case srv.peers[id] != nil:
  363. return false, DiscAlreadyConnected
  364. case srv.Blacklist.Exists(id[:]):
  365. return false, DiscUselessPeer
  366. case id == srv.ntab.Self():
  367. return false, DiscSelf
  368. }
  369. srv.peers[id] = p
  370. return true, 0
  371. }
  372. func (srv *Server) removePeer(p *Peer) {
  373. srv.lock.Lock()
  374. delete(srv.peers, p.ID())
  375. srv.lock.Unlock()
  376. srv.peerWG.Done()
  377. }
  378. type Blacklist interface {
  379. Get([]byte) (bool, error)
  380. Put([]byte) error
  381. Delete([]byte) error
  382. Exists(pubkey []byte) (ok bool)
  383. }
  384. type BlacklistMap struct {
  385. blacklist map[string]bool
  386. lock sync.RWMutex
  387. }
  388. func NewBlacklist() *BlacklistMap {
  389. return &BlacklistMap{
  390. blacklist: make(map[string]bool),
  391. }
  392. }
  393. func (self *BlacklistMap) Get(pubkey []byte) (bool, error) {
  394. self.lock.RLock()
  395. defer self.lock.RUnlock()
  396. v, ok := self.blacklist[string(pubkey)]
  397. var err error
  398. if !ok {
  399. err = fmt.Errorf("not found")
  400. }
  401. return v, err
  402. }
  403. func (self *BlacklistMap) Exists(pubkey []byte) (ok bool) {
  404. self.lock.RLock()
  405. defer self.lock.RUnlock()
  406. _, ok = self.blacklist[string(pubkey)]
  407. return
  408. }
  409. func (self *BlacklistMap) Put(pubkey []byte) error {
  410. self.lock.Lock()
  411. defer self.lock.Unlock()
  412. self.blacklist[string(pubkey)] = true
  413. return nil
  414. }
  415. func (self *BlacklistMap) Delete(pubkey []byte) error {
  416. self.lock.Lock()
  417. defer self.lock.Unlock()
  418. delete(self.blacklist, string(pubkey))
  419. return nil
  420. }