server.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446
  1. package p2p
  2. import (
  3. "bytes"
  4. "crypto/ecdsa"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net"
  9. "runtime"
  10. "sync"
  11. "time"
  12. "github.com/ethereum/go-ethereum/logger"
  13. "github.com/ethereum/go-ethereum/p2p/discover"
  14. "github.com/ethereum/go-ethereum/p2p/nat"
  15. )
const (
	// defaultDialTimeout is the Timeout of the net.Dialer that Start
	// installs when Server.Dialer has been left nil.
	defaultDialTimeout = 10 * time.Second
	// refreshPeersInterval is the period of the ticker in dialLoop that
	// kicks off another findPeers run.
	refreshPeersInterval = 30 * time.Second
)

// srvlog is the logger used by the server code in this file.
var srvlog = logger.NewLogger("P2P Server")
  21. // MakeName creates a node name that follows the ethereum convention
  22. // for such names. It adds the operation system name and Go runtime version
  23. // the name.
  24. func MakeName(name, version string) string {
  25. return fmt.Sprintf("%s/v%s/%s/%s", name, version, runtime.GOOS, runtime.Version())
  26. }
// Server manages all peer connections.
//
// The exported fields of Server are used as configuration parameters.
// You should set them before starting the Server. Fields may not be
// modified while the server is running.
type Server struct {
	// This field must be set to a valid secp256k1 private key.
	// Start returns an error if it is nil.
	PrivateKey *ecdsa.PrivateKey

	// MaxPeers is the maximum number of peers that can be
	// connected. It must be greater than zero; Start returns an
	// error otherwise.
	MaxPeers int

	// Name sets the node name of this server.
	// Use MakeName to create a name that follows existing conventions.
	Name string

	// Bootstrap nodes are used to establish connectivity
	// with the rest of the network. They are handed to the
	// discovery table when the dial loop starts.
	BootstrapNodes []*discover.Node

	// Protocols should contain the protocols supported
	// by the server. Matching protocols are launched for
	// each peer.
	Protocols []Protocol

	// If Blacklist is set to a non-nil value, the given Blacklist
	// is used to verify peer connections. If it is nil, Start
	// installs an empty in-memory blacklist (see NewBlacklist).
	Blacklist Blacklist

	// If ListenAddr is set to a non-empty address, the server
	// will listen for incoming TCP connections on it.
	//
	// If the port is zero, the operating system will pick a port. The
	// ListenAddr field will be updated with the actual address when
	// the server is started.
	ListenAddr string

	// If set to a non-nil value, the given NAT port mapper
	// is used to make the listening port available to the
	// Internet. No mapping is attempted for loopback listen addresses.
	NAT nat.Interface

	// If Dialer is set to a non-nil value, the given Dialer
	// is used to dial outbound peer connections. If it is nil,
	// Start installs a dialer with a timeout of defaultDialTimeout.
	Dialer *net.Dialer

	// If NoDial is true, the server will not dial any peers
	// (the dial loop is not started).
	NoDial bool

	// Hooks for testing. These are useful because we can inhibit
	// the whole protocol stack.
	//
	// handshakeFunc defaults to encHandshake when left nil.
	// newPeerHook, if non-nil, is called for each peer that has been
	// added, right before the peer is run.
	handshakeFunc
	newPeerHook

	lock     sync.RWMutex                // taken around accesses to running and peers
	running  bool                        // true between a successful Start and Stop
	listener net.Listener                // TCP listener, set by startListening
	peers    map[discover.NodeID]*Peer   // connected peers by node ID
	ntab     *discover.Table             // discovery table, created in Start

	quit        chan struct{}       // closed by Stop to terminate the loops
	loopWG      sync.WaitGroup      // {dial,listen,nat}Loop
	peerWG      sync.WaitGroup      // active peer goroutines
	peerConnect chan *discover.Node // nodes to dial, received by dialLoop
}
// handshakeFunc performs the handshake on a freshly established connection.
// It is given the connection, the server's private key and the node that was
// dialed (nil for inbound connections), and returns the remote node ID.
// The second return value is ignored by startPeer in this file.
type handshakeFunc func(io.ReadWriter, *ecdsa.PrivateKey, *discover.Node) (discover.NodeID, []byte, error)

// newPeerHook is a testing hook that startPeer invokes with every peer
// that has been added to the server, right before the peer is run.
type newPeerHook func(*Peer)
  83. // Peers returns all connected peers.
  84. func (srv *Server) Peers() (peers []*Peer) {
  85. srv.lock.RLock()
  86. defer srv.lock.RUnlock()
  87. for _, peer := range srv.peers {
  88. if peer != nil {
  89. peers = append(peers, peer)
  90. }
  91. }
  92. return
  93. }
  94. // PeerCount returns the number of connected peers.
  95. func (srv *Server) PeerCount() int {
  96. srv.lock.RLock()
  97. n := len(srv.peers)
  98. srv.lock.RUnlock()
  99. return n
  100. }
  101. // SuggestPeer creates a connection to the given Node if it
  102. // is not already connected.
  103. func (srv *Server) SuggestPeer(n *discover.Node) {
  104. srv.peerConnect <- n
  105. }
  106. // Broadcast sends an RLP-encoded message to all connected peers.
  107. // This method is deprecated and will be removed later.
  108. func (srv *Server) Broadcast(protocol string, code uint64, data ...interface{}) {
  109. var payload []byte
  110. if data != nil {
  111. payload = encodePayload(data...)
  112. }
  113. srv.lock.RLock()
  114. defer srv.lock.RUnlock()
  115. for _, peer := range srv.peers {
  116. if peer != nil {
  117. var msg = Msg{Code: code}
  118. if data != nil {
  119. msg.Payload = bytes.NewReader(payload)
  120. msg.Size = uint32(len(payload))
  121. }
  122. peer.writeProtoMsg(protocol, msg)
  123. }
  124. }
  125. }
  126. // Start starts running the server.
  127. // Servers can be re-used and started again after stopping.
  128. func (srv *Server) Start() (err error) {
  129. srv.lock.Lock()
  130. defer srv.lock.Unlock()
  131. if srv.running {
  132. return errors.New("server already running")
  133. }
  134. srvlog.Infoln("Starting Server")
  135. // initialize all the fields
  136. if srv.PrivateKey == nil {
  137. return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
  138. }
  139. if srv.MaxPeers <= 0 {
  140. return fmt.Errorf("Server.MaxPeers must be > 0")
  141. }
  142. srv.quit = make(chan struct{})
  143. srv.peers = make(map[discover.NodeID]*Peer)
  144. srv.peerConnect = make(chan *discover.Node)
  145. if srv.handshakeFunc == nil {
  146. srv.handshakeFunc = encHandshake
  147. }
  148. if srv.Blacklist == nil {
  149. srv.Blacklist = NewBlacklist()
  150. }
  151. if srv.ListenAddr != "" {
  152. if err := srv.startListening(); err != nil {
  153. return err
  154. }
  155. }
  156. // dial stuff
  157. dt, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr, srv.NAT)
  158. if err != nil {
  159. return err
  160. }
  161. srv.ntab = dt
  162. if srv.Dialer == nil {
  163. srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout}
  164. }
  165. if !srv.NoDial {
  166. srv.loopWG.Add(1)
  167. go srv.dialLoop()
  168. }
  169. if srv.NoDial && srv.ListenAddr == "" {
  170. srvlog.Warnln("I will be kind-of useless, neither dialing nor listening.")
  171. }
  172. srv.running = true
  173. return nil
  174. }
  175. func (srv *Server) startListening() error {
  176. listener, err := net.Listen("tcp", srv.ListenAddr)
  177. if err != nil {
  178. return err
  179. }
  180. laddr := listener.Addr().(*net.TCPAddr)
  181. srv.ListenAddr = laddr.String()
  182. srv.listener = listener
  183. srv.loopWG.Add(1)
  184. go srv.listenLoop()
  185. if !laddr.IP.IsLoopback() && srv.NAT != nil {
  186. srv.loopWG.Add(1)
  187. go func() {
  188. nat.Map(srv.NAT, srv.quit, "tcp", laddr.Port, laddr.Port, "ethereum p2p")
  189. srv.loopWG.Done()
  190. }()
  191. }
  192. return nil
  193. }
  194. // Stop terminates the server and all active peer connections.
  195. // It blocks until all active connections have been closed.
  196. func (srv *Server) Stop() {
  197. srv.lock.Lock()
  198. if !srv.running {
  199. srv.lock.Unlock()
  200. return
  201. }
  202. srv.running = false
  203. srv.lock.Unlock()
  204. srvlog.Infoln("Stopping Server")
  205. srv.ntab.Close()
  206. if srv.listener != nil {
  207. // this unblocks listener Accept
  208. srv.listener.Close()
  209. }
  210. close(srv.quit)
  211. srv.loopWG.Wait()
  212. // No new peers can be added at this point because dialLoop and
  213. // listenLoop are down. It is safe to call peerWG.Wait because
  214. // peerWG.Add is not called outside of those loops.
  215. for _, peer := range srv.peers {
  216. peer.Disconnect(DiscQuitting)
  217. }
  218. srv.peerWG.Wait()
  219. }
  220. // main loop for adding connections via listening
  221. func (srv *Server) listenLoop() {
  222. defer srv.loopWG.Done()
  223. srvlog.Infoln("Listening on", srv.listener.Addr())
  224. for {
  225. conn, err := srv.listener.Accept()
  226. if err != nil {
  227. return
  228. }
  229. srvlog.Debugf("Accepted conn %v\n", conn.RemoteAddr())
  230. srv.peerWG.Add(1)
  231. go srv.startPeer(conn, nil)
  232. }
  233. }
  234. func (srv *Server) dialLoop() {
  235. defer srv.loopWG.Done()
  236. refresh := time.NewTicker(refreshPeersInterval)
  237. defer refresh.Stop()
  238. srv.ntab.Bootstrap(srv.BootstrapNodes)
  239. go srv.findPeers()
  240. dialed := make(chan *discover.Node)
  241. dialing := make(map[discover.NodeID]bool)
  242. // TODO: limit number of active dials
  243. // TODO: ensure only one findPeers goroutine is running
  244. // TODO: pause findPeers when we're at capacity
  245. for {
  246. select {
  247. case <-refresh.C:
  248. go srv.findPeers()
  249. case dest := <-srv.peerConnect:
  250. srv.lock.Lock()
  251. _, isconnected := srv.peers[dest.ID]
  252. srv.lock.Unlock()
  253. if isconnected || dialing[dest.ID] {
  254. continue
  255. }
  256. dialing[dest.ID] = true
  257. srv.peerWG.Add(1)
  258. go func() {
  259. srv.dialNode(dest)
  260. // at this point, the peer has been added
  261. // or discarded. either way, we're not dialing it anymore.
  262. dialed <- dest
  263. }()
  264. case dest := <-dialed:
  265. delete(dialing, dest.ID)
  266. case <-srv.quit:
  267. // TODO: maybe wait for active dials
  268. return
  269. }
  270. }
  271. }
  272. func (srv *Server) dialNode(dest *discover.Node) {
  273. addr := &net.TCPAddr{IP: dest.IP, Port: dest.TCPPort}
  274. srvlog.Debugf("Dialing %v\n", dest)
  275. conn, err := srv.Dialer.Dial("tcp", addr.String())
  276. if err != nil {
  277. srvlog.DebugDetailf("dial error: %v", err)
  278. return
  279. }
  280. srv.startPeer(conn, dest)
  281. }
  282. func (srv *Server) findPeers() {
  283. far := srv.ntab.Self()
  284. for i := range far {
  285. far[i] = ^far[i]
  286. }
  287. closeToSelf := srv.ntab.Lookup(srv.ntab.Self())
  288. farFromSelf := srv.ntab.Lookup(far)
  289. for i := 0; i < len(closeToSelf) || i < len(farFromSelf); i++ {
  290. if i < len(closeToSelf) {
  291. srv.peerConnect <- closeToSelf[i]
  292. }
  293. if i < len(farFromSelf) {
  294. srv.peerConnect <- farFromSelf[i]
  295. }
  296. }
  297. }
  298. func (srv *Server) startPeer(conn net.Conn, dest *discover.Node) {
  299. // TODO: I/O timeout, handle/store session token
  300. remoteID, _, err := srv.handshakeFunc(conn, srv.PrivateKey, dest)
  301. if err != nil {
  302. conn.Close()
  303. srvlog.Debugf("Encryption Handshake with %v failed: %v", conn.RemoteAddr(), err)
  304. return
  305. }
  306. ourID := srv.ntab.Self()
  307. p := newPeer(conn, srv.Protocols, srv.Name, &ourID, &remoteID)
  308. if ok, reason := srv.addPeer(remoteID, p); !ok {
  309. p.Disconnect(reason)
  310. return
  311. }
  312. if srv.newPeerHook != nil {
  313. srv.newPeerHook(p)
  314. }
  315. p.run()
  316. srv.removePeer(p)
  317. }
  318. func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) {
  319. srv.lock.Lock()
  320. defer srv.lock.Unlock()
  321. switch {
  322. case !srv.running:
  323. return false, DiscQuitting
  324. case len(srv.peers) >= srv.MaxPeers:
  325. return false, DiscTooManyPeers
  326. case srv.peers[id] != nil:
  327. return false, DiscAlreadyConnected
  328. case srv.Blacklist.Exists(id[:]):
  329. return false, DiscUselessPeer
  330. case id == srv.ntab.Self():
  331. return false, DiscSelf
  332. }
  333. srvlog.Debugf("Adding %v\n", p)
  334. srv.peers[id] = p
  335. return true, 0
  336. }
  337. // removes peer: sending disconnect msg, stop peer, remove rom list/table, release slot
  338. func (srv *Server) removePeer(p *Peer) {
  339. srvlog.Debugf("Removing %v\n", p)
  340. srv.lock.Lock()
  341. delete(srv.peers, *p.remoteID)
  342. srv.lock.Unlock()
  343. srv.peerWG.Done()
  344. }
// Blacklist is a set of node public keys that the server refuses to
// keep connections with. Exists is the only method the server code in
// this file calls (in addPeer); keys are passed as raw node ID bytes.
type Blacklist interface {
	// Get looks up the entry for the given key. In the BlacklistMap
	// implementation it returns the stored flag, or an error if the key
	// is not present.
	Get([]byte) (bool, error)
	// Put adds the given key to the blacklist.
	Put([]byte) error
	// Delete removes the given key from the blacklist.
	Delete([]byte) error
	// Exists reports whether the given public key is blacklisted.
	Exists(pubkey []byte) (ok bool)
}
  351. type BlacklistMap struct {
  352. blacklist map[string]bool
  353. lock sync.RWMutex
  354. }
  355. func NewBlacklist() *BlacklistMap {
  356. return &BlacklistMap{
  357. blacklist: make(map[string]bool),
  358. }
  359. }
  360. func (self *BlacklistMap) Get(pubkey []byte) (bool, error) {
  361. self.lock.RLock()
  362. defer self.lock.RUnlock()
  363. v, ok := self.blacklist[string(pubkey)]
  364. var err error
  365. if !ok {
  366. err = fmt.Errorf("not found")
  367. }
  368. return v, err
  369. }
  370. func (self *BlacklistMap) Exists(pubkey []byte) (ok bool) {
  371. self.lock.RLock()
  372. defer self.lock.RUnlock()
  373. _, ok = self.blacklist[string(pubkey)]
  374. return
  375. }
  376. func (self *BlacklistMap) Put(pubkey []byte) error {
  377. self.lock.RLock()
  378. defer self.lock.RUnlock()
  379. self.blacklist[string(pubkey)] = true
  380. return nil
  381. }
  382. func (self *BlacklistMap) Delete(pubkey []byte) error {
  383. self.lock.RLock()
  384. defer self.lock.RUnlock()
  385. delete(self.blacklist, string(pubkey))
  386. return nil
  387. }