server.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495
  1. package p2p
  2. import (
  3. "bytes"
  4. "crypto/ecdsa"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net"
  9. "runtime"
  10. "sync"
  11. "time"
  12. "github.com/ethereum/go-ethereum/logger"
  13. "github.com/ethereum/go-ethereum/p2p/discover"
  14. )
  15. const (
  16. defaultDialTimeout = 10 * time.Second
  17. refreshPeersInterval = 30 * time.Second
  18. portMappingUpdateInterval = 15 * time.Minute
  19. portMappingTimeout = 20 * time.Minute
  20. )
  21. var srvlog = logger.NewLogger("P2P Server")
  22. // MakeName creates a node name that follows the ethereum convention
  23. // for such names. It adds the operation system name and Go runtime version
  24. // the name.
  25. func MakeName(name, version string) string {
  26. return fmt.Sprintf("%s/v%s/%s/%s", name, version, runtime.GOOS, runtime.Version())
  27. }
  28. // Server manages all peer connections.
  29. //
  30. // The fields of Server are used as configuration parameters.
  31. // You should set them before starting the Server. Fields may not be
  32. // modified while the server is running.
  33. type Server struct {
  34. // This field must be set to a valid secp256k1 private key.
  35. PrivateKey *ecdsa.PrivateKey
  36. // MaxPeers is the maximum number of peers that can be
  37. // connected. It must be greater than zero.
  38. MaxPeers int
  39. // Name sets the node name of this server.
  40. // Use MakeName to create a name that follows existing conventions.
  41. Name string
  42. // Bootstrap nodes are used to establish connectivity
  43. // with the rest of the network.
  44. BootstrapNodes []discover.Node
  45. // Protocols should contain the protocols supported
  46. // by the server. Matching protocols are launched for
  47. // each peer.
  48. Protocols []Protocol
  49. // If Blacklist is set to a non-nil value, the given Blacklist
  50. // is used to verify peer connections.
  51. Blacklist Blacklist
  52. // If ListenAddr is set to a non-nil address, the server
  53. // will listen for incoming connections.
  54. //
  55. // If the port is zero, the operating system will pick a port. The
  56. // ListenAddr field will be updated with the actual address when
  57. // the server is started.
  58. ListenAddr string
  59. // If set to a non-nil value, the given NAT port mapper
  60. // is used to make the listening port available to the
  61. // Internet.
  62. NAT NAT
  63. // If Dialer is set to a non-nil value, the given Dialer
  64. // is used to dial outbound peer connections.
  65. Dialer *net.Dialer
  66. // If NoDial is true, the server will not dial any peers.
  67. NoDial bool
  68. // Hooks for testing. These are useful because we can inhibit
  69. // the whole protocol stack.
  70. handshakeFunc
  71. newPeerHook
  72. lock sync.RWMutex
  73. running bool
  74. listener net.Listener
  75. laddr *net.TCPAddr // real listen addr
  76. peers map[discover.NodeID]*Peer
  77. ntab *discover.Table
  78. quit chan struct{}
  79. loopWG sync.WaitGroup // {dial,listen,nat}Loop
  80. peerWG sync.WaitGroup // active peer goroutines
  81. peerConnect chan *discover.Node
  82. }
  83. // NAT is implemented by NAT traversal methods.
  84. type NAT interface {
  85. GetExternalAddress() (net.IP, error)
  86. AddPortMapping(protocol string, extport, intport int, name string, lifetime time.Duration) error
  87. DeletePortMapping(protocol string, extport, intport int) error
  88. // Should return name of the method.
  89. String() string
  90. }
  91. type handshakeFunc func(io.ReadWriter, *ecdsa.PrivateKey, *discover.Node) (discover.NodeID, []byte, error)
  92. type newPeerHook func(*Peer)
  93. // Peers returns all connected peers.
  94. func (srv *Server) Peers() (peers []*Peer) {
  95. srv.lock.RLock()
  96. defer srv.lock.RUnlock()
  97. for _, peer := range srv.peers {
  98. if peer != nil {
  99. peers = append(peers, peer)
  100. }
  101. }
  102. return
  103. }
  104. // PeerCount returns the number of connected peers.
  105. func (srv *Server) PeerCount() int {
  106. srv.lock.RLock()
  107. n := len(srv.peers)
  108. srv.lock.RUnlock()
  109. return n
  110. }
  111. // SuggestPeer creates a connection to the given Node if it
  112. // is not already connected.
  113. func (srv *Server) SuggestPeer(ip net.IP, port int, id discover.NodeID) {
  114. srv.peerConnect <- &discover.Node{ID: id, IP: ip, TCPPort: port}
  115. }
  116. // Broadcast sends an RLP-encoded message to all connected peers.
  117. // This method is deprecated and will be removed later.
  118. func (srv *Server) Broadcast(protocol string, code uint64, data ...interface{}) {
  119. var payload []byte
  120. if data != nil {
  121. payload = encodePayload(data...)
  122. }
  123. srv.lock.RLock()
  124. defer srv.lock.RUnlock()
  125. for _, peer := range srv.peers {
  126. if peer != nil {
  127. var msg = Msg{Code: code}
  128. if data != nil {
  129. msg.Payload = bytes.NewReader(payload)
  130. msg.Size = uint32(len(payload))
  131. }
  132. peer.writeProtoMsg(protocol, msg)
  133. }
  134. }
  135. }
  136. // Start starts running the server.
  137. // Servers can be re-used and started again after stopping.
  138. func (srv *Server) Start() (err error) {
  139. srv.lock.Lock()
  140. defer srv.lock.Unlock()
  141. if srv.running {
  142. return errors.New("server already running")
  143. }
  144. srvlog.Infoln("Starting Server")
  145. // initialize all the fields
  146. if srv.PrivateKey == nil {
  147. return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
  148. }
  149. if srv.MaxPeers <= 0 {
  150. return fmt.Errorf("Server.MaxPeers must be > 0")
  151. }
  152. srv.quit = make(chan struct{})
  153. srv.peers = make(map[discover.NodeID]*Peer)
  154. srv.peerConnect = make(chan *discover.Node)
  155. if srv.handshakeFunc == nil {
  156. srv.handshakeFunc = encHandshake
  157. }
  158. if srv.Blacklist == nil {
  159. srv.Blacklist = NewBlacklist()
  160. }
  161. if srv.ListenAddr != "" {
  162. if err := srv.startListening(); err != nil {
  163. return err
  164. }
  165. }
  166. // dial stuff
  167. dt, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr)
  168. if err != nil {
  169. return err
  170. }
  171. srv.ntab = dt
  172. if srv.Dialer == nil {
  173. srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout}
  174. }
  175. if !srv.NoDial {
  176. srv.loopWG.Add(1)
  177. go srv.dialLoop()
  178. }
  179. if srv.NoDial && srv.ListenAddr == "" {
  180. srvlog.Warnln("I will be kind-of useless, neither dialing nor listening.")
  181. }
  182. srv.running = true
  183. return nil
  184. }
  185. func (srv *Server) startListening() error {
  186. listener, err := net.Listen("tcp", srv.ListenAddr)
  187. if err != nil {
  188. return err
  189. }
  190. srv.ListenAddr = listener.Addr().String()
  191. srv.laddr = listener.Addr().(*net.TCPAddr)
  192. srv.listener = listener
  193. srv.loopWG.Add(1)
  194. go srv.listenLoop()
  195. if !srv.laddr.IP.IsLoopback() && srv.NAT != nil {
  196. srv.loopWG.Add(1)
  197. go srv.natLoop(srv.laddr.Port)
  198. }
  199. return nil
  200. }
  201. // Stop terminates the server and all active peer connections.
  202. // It blocks until all active connections have been closed.
  203. func (srv *Server) Stop() {
  204. srv.lock.Lock()
  205. if !srv.running {
  206. srv.lock.Unlock()
  207. return
  208. }
  209. srv.running = false
  210. srv.lock.Unlock()
  211. srvlog.Infoln("Stopping Server")
  212. srv.ntab.Close()
  213. if srv.listener != nil {
  214. // this unblocks listener Accept
  215. srv.listener.Close()
  216. }
  217. close(srv.quit)
  218. srv.loopWG.Wait()
  219. // No new peers can be added at this point because dialLoop and
  220. // listenLoop are down. It is safe to call peerWG.Wait because
  221. // peerWG.Add is not called outside of those loops.
  222. for _, peer := range srv.peers {
  223. peer.Disconnect(DiscQuitting)
  224. }
  225. srv.peerWG.Wait()
  226. }
  227. // main loop for adding connections via listening
  228. func (srv *Server) listenLoop() {
  229. defer srv.loopWG.Done()
  230. srvlog.Infoln("Listening on", srv.listener.Addr())
  231. for {
  232. conn, err := srv.listener.Accept()
  233. if err != nil {
  234. return
  235. }
  236. srvlog.Debugf("Accepted conn %v\n", conn.RemoteAddr())
  237. srv.peerWG.Add(1)
  238. go srv.startPeer(conn, nil)
  239. }
  240. }
  241. func (srv *Server) natLoop(port int) {
  242. defer srv.loopWG.Done()
  243. for {
  244. srv.updatePortMapping(port)
  245. select {
  246. case <-time.After(portMappingUpdateInterval):
  247. // one more round
  248. case <-srv.quit:
  249. srv.removePortMapping(port)
  250. return
  251. }
  252. }
  253. }
  254. func (srv *Server) updatePortMapping(port int) {
  255. srvlog.Infoln("Attempting to map port", port, "with", srv.NAT)
  256. err := srv.NAT.AddPortMapping("tcp", port, port, "ethereum p2p", portMappingTimeout)
  257. if err != nil {
  258. srvlog.Errorln("Port mapping error:", err)
  259. return
  260. }
  261. extip, err := srv.NAT.GetExternalAddress()
  262. if err != nil {
  263. srvlog.Errorln("Error getting external IP:", err)
  264. return
  265. }
  266. srv.lock.Lock()
  267. extaddr := *(srv.listener.Addr().(*net.TCPAddr))
  268. extaddr.IP = extip
  269. srvlog.Infoln("Mapped port, external addr is", &extaddr)
  270. srv.laddr = &extaddr
  271. srv.lock.Unlock()
  272. }
  273. func (srv *Server) removePortMapping(port int) {
  274. srvlog.Infoln("Removing port mapping for", port, "with", srv.NAT)
  275. srv.NAT.DeletePortMapping("tcp", port, port)
  276. }
  277. func (srv *Server) dialLoop() {
  278. defer srv.loopWG.Done()
  279. refresh := time.NewTicker(refreshPeersInterval)
  280. defer refresh.Stop()
  281. srv.ntab.Bootstrap(srv.BootstrapNodes)
  282. go srv.findPeers()
  283. dialed := make(chan *discover.Node)
  284. dialing := make(map[discover.NodeID]bool)
  285. // TODO: limit number of active dials
  286. // TODO: ensure only one findPeers goroutine is running
  287. // TODO: pause findPeers when we're at capacity
  288. for {
  289. select {
  290. case <-refresh.C:
  291. go srv.findPeers()
  292. case dest := <-srv.peerConnect:
  293. srv.lock.Lock()
  294. _, isconnected := srv.peers[dest.ID]
  295. srv.lock.Unlock()
  296. if isconnected || dialing[dest.ID] {
  297. continue
  298. }
  299. dialing[dest.ID] = true
  300. srv.peerWG.Add(1)
  301. go func() {
  302. srv.dialNode(dest)
  303. // at this point, the peer has been added
  304. // or discarded. either way, we're not dialing it anymore.
  305. dialed <- dest
  306. }()
  307. case dest := <-dialed:
  308. delete(dialing, dest.ID)
  309. case <-srv.quit:
  310. // TODO: maybe wait for active dials
  311. return
  312. }
  313. }
  314. }
  315. func (srv *Server) dialNode(dest *discover.Node) {
  316. addr := &net.TCPAddr{IP: dest.IP, Port: dest.TCPPort}
  317. srvlog.Debugf("Dialing %v\n", dest)
  318. conn, err := srv.Dialer.Dial("tcp", addr.String())
  319. if err != nil {
  320. srvlog.DebugDetailf("dial error: %v", err)
  321. return
  322. }
  323. srv.startPeer(conn, dest)
  324. }
  325. func (srv *Server) findPeers() {
  326. far := srv.ntab.Self()
  327. for i := range far {
  328. far[i] = ^far[i]
  329. }
  330. closeToSelf := srv.ntab.Lookup(srv.ntab.Self())
  331. farFromSelf := srv.ntab.Lookup(far)
  332. for i := 0; i < len(closeToSelf) || i < len(farFromSelf); i++ {
  333. if i < len(closeToSelf) {
  334. srv.peerConnect <- closeToSelf[i]
  335. }
  336. if i < len(farFromSelf) {
  337. srv.peerConnect <- farFromSelf[i]
  338. }
  339. }
  340. }
  341. func (srv *Server) startPeer(conn net.Conn, dest *discover.Node) {
  342. // TODO: I/O timeout, handle/store session token
  343. remoteID, _, err := srv.handshakeFunc(conn, srv.PrivateKey, dest)
  344. if err != nil {
  345. conn.Close()
  346. srvlog.Debugf("Encryption Handshake with %v failed: %v", conn.RemoteAddr(), err)
  347. return
  348. }
  349. ourID := srv.ntab.Self()
  350. p := newPeer(conn, srv.Protocols, srv.Name, &ourID, &remoteID)
  351. if ok, reason := srv.addPeer(remoteID, p); !ok {
  352. p.Disconnect(reason)
  353. return
  354. }
  355. if srv.newPeerHook != nil {
  356. srv.newPeerHook(p)
  357. }
  358. p.run()
  359. srv.removePeer(p)
  360. }
  361. func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) {
  362. srv.lock.Lock()
  363. defer srv.lock.Unlock()
  364. switch {
  365. case !srv.running:
  366. return false, DiscQuitting
  367. case len(srv.peers) >= srv.MaxPeers:
  368. return false, DiscTooManyPeers
  369. case srv.peers[id] != nil:
  370. return false, DiscAlreadyConnected
  371. case srv.Blacklist.Exists(id[:]):
  372. return false, DiscUselessPeer
  373. case id == srv.ntab.Self():
  374. return false, DiscSelf
  375. }
  376. srvlog.Debugf("Adding %v\n", p)
  377. srv.peers[id] = p
  378. return true, 0
  379. }
  380. // removes peer: sending disconnect msg, stop peer, remove rom list/table, release slot
  381. func (srv *Server) removePeer(p *Peer) {
  382. srvlog.Debugf("Removing %v\n", p)
  383. srv.lock.Lock()
  384. delete(srv.peers, *p.remoteID)
  385. srv.lock.Unlock()
  386. srv.peerWG.Done()
  387. }
  388. type Blacklist interface {
  389. Get([]byte) (bool, error)
  390. Put([]byte) error
  391. Delete([]byte) error
  392. Exists(pubkey []byte) (ok bool)
  393. }
  394. type BlacklistMap struct {
  395. blacklist map[string]bool
  396. lock sync.RWMutex
  397. }
  398. func NewBlacklist() *BlacklistMap {
  399. return &BlacklistMap{
  400. blacklist: make(map[string]bool),
  401. }
  402. }
  403. func (self *BlacklistMap) Get(pubkey []byte) (bool, error) {
  404. self.lock.RLock()
  405. defer self.lock.RUnlock()
  406. v, ok := self.blacklist[string(pubkey)]
  407. var err error
  408. if !ok {
  409. err = fmt.Errorf("not found")
  410. }
  411. return v, err
  412. }
  413. func (self *BlacklistMap) Exists(pubkey []byte) (ok bool) {
  414. self.lock.RLock()
  415. defer self.lock.RUnlock()
  416. _, ok = self.blacklist[string(pubkey)]
  417. return
  418. }
  419. func (self *BlacklistMap) Put(pubkey []byte) error {
  420. self.lock.RLock()
  421. defer self.lock.RUnlock()
  422. self.blacklist[string(pubkey)] = true
  423. return nil
  424. }
  425. func (self *BlacklistMap) Delete(pubkey []byte) error {
  426. self.lock.RLock()
  427. defer self.lock.RUnlock()
  428. delete(self.blacklist, string(pubkey))
  429. return nil
  430. }