server.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467
  1. package p2p
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "sync"
  8. "time"
  9. "github.com/ethereum/go-ethereum/logger"
  10. )
  11. const (
  12. outboundAddressPoolSize = 500
  13. defaultDialTimeout = 10 * time.Second
  14. portMappingUpdateInterval = 15 * time.Minute
  15. portMappingTimeout = 20 * time.Minute
  16. )
  17. var srvlog = logger.NewLogger("P2P Server")
  18. // Server manages all peer connections.
  19. //
  20. // The fields of Server are used as configuration parameters.
  21. // You should set them before starting the Server. Fields may not be
  22. // modified while the server is running.
  23. type Server struct {
  24. // This field must be set to a valid client identity.
  25. Identity ClientIdentity
  26. // MaxPeers is the maximum number of peers that can be
  27. // connected. It must be greater than zero.
  28. MaxPeers int
  29. // Protocols should contain the protocols supported
  30. // by the server. Matching protocols are launched for
  31. // each peer.
  32. Protocols []Protocol
  33. // If Blacklist is set to a non-nil value, the given Blacklist
  34. // is used to verify peer connections.
  35. Blacklist Blacklist
  36. // If ListenAddr is set to a non-nil address, the server
  37. // will listen for incoming connections.
  38. //
  39. // If the port is zero, the operating system will pick a port. The
  40. // ListenAddr field will be updated with the actual address when
  41. // the server is started.
  42. ListenAddr string
  43. // If set to a non-nil value, the given NAT port mapper
  44. // is used to make the listening port available to the
  45. // Internet.
  46. NAT NAT
  47. // If Dialer is set to a non-nil value, the given Dialer
  48. // is used to dial outbound peer connections.
  49. Dialer *net.Dialer
  50. // If NoDial is true, the server will not dial any peers.
  51. NoDial bool
  52. // Hook for testing. This is useful because we can inhibit
  53. // the whole protocol stack.
  54. newPeerFunc peerFunc
  55. lock sync.RWMutex
  56. running bool
  57. listener net.Listener
  58. laddr *net.TCPAddr // real listen addr
  59. peers []*Peer
  60. peerSlots chan int
  61. peerCount int
  62. quit chan struct{}
  63. wg sync.WaitGroup
  64. peerConnect chan *peerAddr
  65. peerDisconnect chan *Peer
  66. }
  67. // NAT is implemented by NAT traversal methods.
  68. type NAT interface {
  69. GetExternalAddress() (net.IP, error)
  70. AddPortMapping(protocol string, extport, intport int, name string, lifetime time.Duration) error
  71. DeletePortMapping(protocol string, extport, intport int) error
  72. // Should return name of the method.
  73. String() string
  74. }
  75. type peerFunc func(srv *Server, c net.Conn, dialAddr *peerAddr) *Peer
  76. // Peers returns all connected peers.
  77. func (srv *Server) Peers() (peers []*Peer) {
  78. srv.lock.RLock()
  79. defer srv.lock.RUnlock()
  80. for _, peer := range srv.peers {
  81. if peer != nil {
  82. peers = append(peers, peer)
  83. }
  84. }
  85. return
  86. }
  87. // PeerCount returns the number of connected peers.
  88. func (srv *Server) PeerCount() int {
  89. srv.lock.RLock()
  90. defer srv.lock.RUnlock()
  91. return srv.peerCount
  92. }
  93. // SuggestPeer injects an address into the outbound address pool.
  94. func (srv *Server) SuggestPeer(ip net.IP, port int, nodeID []byte) {
  95. addr := &peerAddr{ip, uint64(port), nodeID}
  96. select {
  97. case srv.peerConnect <- addr:
  98. default: // don't block
  99. srvlog.Warnf("peer suggestion %v ignored", addr)
  100. }
  101. }
  102. // Broadcast sends an RLP-encoded message to all connected peers.
  103. // This method is deprecated and will be removed later.
  104. func (srv *Server) Broadcast(protocol string, code uint64, data ...interface{}) {
  105. var payload []byte
  106. if data != nil {
  107. payload = encodePayload(data...)
  108. }
  109. srv.lock.RLock()
  110. defer srv.lock.RUnlock()
  111. for _, peer := range srv.peers {
  112. if peer != nil {
  113. var msg = Msg{Code: code}
  114. if data != nil {
  115. msg.Payload = bytes.NewReader(payload)
  116. msg.Size = uint32(len(payload))
  117. }
  118. peer.writeProtoMsg(protocol, msg)
  119. }
  120. }
  121. }
  122. // Start starts running the server.
  123. // Servers can be re-used and started again after stopping.
  124. func (srv *Server) Start() (err error) {
  125. srv.lock.Lock()
  126. defer srv.lock.Unlock()
  127. if srv.running {
  128. return errors.New("server already running")
  129. }
  130. srvlog.Infoln("Starting Server")
  131. // initialize fields
  132. if srv.Identity == nil {
  133. return fmt.Errorf("Server.Identity must be set to a non-nil identity")
  134. }
  135. if srv.MaxPeers <= 0 {
  136. return fmt.Errorf("Server.MaxPeers must be > 0")
  137. }
  138. srv.quit = make(chan struct{})
  139. srv.peers = make([]*Peer, srv.MaxPeers)
  140. srv.peerSlots = make(chan int, srv.MaxPeers)
  141. srv.peerConnect = make(chan *peerAddr, outboundAddressPoolSize)
  142. srv.peerDisconnect = make(chan *Peer)
  143. if srv.newPeerFunc == nil {
  144. srv.newPeerFunc = newServerPeer
  145. }
  146. if srv.Blacklist == nil {
  147. srv.Blacklist = NewBlacklist()
  148. }
  149. if srv.Dialer == nil {
  150. srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout}
  151. }
  152. if srv.ListenAddr != "" {
  153. if err := srv.startListening(); err != nil {
  154. return err
  155. }
  156. }
  157. if !srv.NoDial {
  158. srv.wg.Add(1)
  159. go srv.dialLoop()
  160. }
  161. if srv.NoDial && srv.ListenAddr == "" {
  162. srvlog.Warnln("I will be kind-of useless, neither dialing nor listening.")
  163. }
  164. // make all slots available
  165. for i := range srv.peers {
  166. srv.peerSlots <- i
  167. }
  168. // note: discLoop is not part of WaitGroup
  169. go srv.discLoop()
  170. srv.running = true
  171. return nil
  172. }
  173. func (srv *Server) startListening() error {
  174. listener, err := net.Listen("tcp", srv.ListenAddr)
  175. if err != nil {
  176. return err
  177. }
  178. srv.ListenAddr = listener.Addr().String()
  179. srv.laddr = listener.Addr().(*net.TCPAddr)
  180. srv.listener = listener
  181. srv.wg.Add(1)
  182. go srv.listenLoop()
  183. if !srv.laddr.IP.IsLoopback() && srv.NAT != nil {
  184. srv.wg.Add(1)
  185. go srv.natLoop(srv.laddr.Port)
  186. }
  187. return nil
  188. }
  189. // Stop terminates the server and all active peer connections.
  190. // It blocks until all active connections have been closed.
  191. func (srv *Server) Stop() {
  192. srv.lock.Lock()
  193. if !srv.running {
  194. srv.lock.Unlock()
  195. return
  196. }
  197. srv.running = false
  198. srv.lock.Unlock()
  199. srvlog.Infoln("Stopping server")
  200. if srv.listener != nil {
  201. // this unblocks listener Accept
  202. srv.listener.Close()
  203. }
  204. close(srv.quit)
  205. for _, peer := range srv.Peers() {
  206. peer.Disconnect(DiscQuitting)
  207. }
  208. srv.wg.Wait()
  209. // wait till they actually disconnect
  210. // this is checked by claiming all peerSlots.
  211. // slots become available as the peers disconnect.
  212. for i := 0; i < cap(srv.peerSlots); i++ {
  213. <-srv.peerSlots
  214. }
  215. // terminate discLoop
  216. close(srv.peerDisconnect)
  217. }
  218. func (srv *Server) discLoop() {
  219. for peer := range srv.peerDisconnect {
  220. srv.removePeer(peer)
  221. }
  222. }
  223. // main loop for adding connections via listening
  224. func (srv *Server) listenLoop() {
  225. defer srv.wg.Done()
  226. srvlog.Infoln("Listening on", srv.listener.Addr())
  227. for {
  228. select {
  229. case slot := <-srv.peerSlots:
  230. srvlog.Debugf("grabbed slot %v for listening", slot)
  231. conn, err := srv.listener.Accept()
  232. if err != nil {
  233. srv.peerSlots <- slot
  234. return
  235. }
  236. srvlog.Debugf("Accepted conn %v (slot %d)\n", conn.RemoteAddr(), slot)
  237. srv.addPeer(conn, nil, slot)
  238. case <-srv.quit:
  239. return
  240. }
  241. }
  242. }
  243. func (srv *Server) natLoop(port int) {
  244. defer srv.wg.Done()
  245. for {
  246. srv.updatePortMapping(port)
  247. select {
  248. case <-time.After(portMappingUpdateInterval):
  249. // one more round
  250. case <-srv.quit:
  251. srv.removePortMapping(port)
  252. return
  253. }
  254. }
  255. }
  256. func (srv *Server) updatePortMapping(port int) {
  257. srvlog.Infoln("Attempting to map port", port, "with", srv.NAT)
  258. err := srv.NAT.AddPortMapping("tcp", port, port, "ethereum p2p", portMappingTimeout)
  259. if err != nil {
  260. srvlog.Errorln("Port mapping error:", err)
  261. return
  262. }
  263. extip, err := srv.NAT.GetExternalAddress()
  264. if err != nil {
  265. srvlog.Errorln("Error getting external IP:", err)
  266. return
  267. }
  268. srv.lock.Lock()
  269. extaddr := *(srv.listener.Addr().(*net.TCPAddr))
  270. extaddr.IP = extip
  271. srvlog.Infoln("Mapped port, external addr is", &extaddr)
  272. srv.laddr = &extaddr
  273. srv.lock.Unlock()
  274. }
  275. func (srv *Server) removePortMapping(port int) {
  276. srvlog.Infoln("Removing port mapping for", port, "with", srv.NAT)
  277. srv.NAT.DeletePortMapping("tcp", port, port)
  278. }
  279. func (srv *Server) dialLoop() {
  280. defer srv.wg.Done()
  281. var (
  282. suggest chan *peerAddr
  283. slot *int
  284. slots = srv.peerSlots
  285. )
  286. for {
  287. select {
  288. case i := <-slots:
  289. // we need a peer in slot i, slot reserved
  290. slot = &i
  291. // now we can watch for candidate peers in the next loop
  292. suggest = srv.peerConnect
  293. // do not consume more until candidate peer is found
  294. slots = nil
  295. case desc := <-suggest:
  296. // candidate peer found, will dial out asyncronously
  297. // if connection fails slot will be released
  298. srvlog.Infof("dial %v (%v)", desc, *slot)
  299. go srv.dialPeer(desc, *slot)
  300. // we can watch if more peers needed in the next loop
  301. slots = srv.peerSlots
  302. // until then we dont care about candidate peers
  303. suggest = nil
  304. case <-srv.quit:
  305. // give back the currently reserved slot
  306. if slot != nil {
  307. srv.peerSlots <- *slot
  308. }
  309. return
  310. }
  311. }
  312. }
  313. // connect to peer via dial out
  314. func (srv *Server) dialPeer(desc *peerAddr, slot int) {
  315. srvlog.Debugf("Dialing %v (slot %d)\n", desc, slot)
  316. conn, err := srv.Dialer.Dial(desc.Network(), desc.String())
  317. if err != nil {
  318. srvlog.Errorf("Dial error: %v", err)
  319. srv.peerSlots <- slot
  320. return
  321. }
  322. go srv.addPeer(conn, desc, slot)
  323. }
  324. // creates the new peer object and inserts it into its slot
  325. func (srv *Server) addPeer(conn net.Conn, desc *peerAddr, slot int) *Peer {
  326. srv.lock.Lock()
  327. defer srv.lock.Unlock()
  328. if !srv.running {
  329. conn.Close()
  330. srv.peerSlots <- slot // release slot
  331. return nil
  332. }
  333. peer := srv.newPeerFunc(srv, conn, desc)
  334. peer.slot = slot
  335. srv.peers[slot] = peer
  336. srv.peerCount++
  337. go func() { peer.loop(); srv.peerDisconnect <- peer }()
  338. return peer
  339. }
  340. // removes peer: sending disconnect msg, stop peer, remove rom list/table, release slot
  341. func (srv *Server) removePeer(peer *Peer) {
  342. srv.lock.Lock()
  343. defer srv.lock.Unlock()
  344. srvlog.Debugf("Removing %v (slot %v)\n", peer, peer.slot)
  345. if srv.peers[peer.slot] != peer {
  346. srvlog.Warnln("Invalid peer to remove:", peer)
  347. return
  348. }
  349. // remove from list and index
  350. srv.peerCount--
  351. srv.peers[peer.slot] = nil
  352. // release slot to signal need for a new peer, last!
  353. srv.peerSlots <- peer.slot
  354. }
  355. func (srv *Server) verifyPeer(addr *peerAddr) error {
  356. if srv.Blacklist.Exists(addr.Pubkey) {
  357. return errors.New("blacklisted")
  358. }
  359. if bytes.Equal(srv.Identity.Pubkey()[1:], addr.Pubkey) {
  360. return newPeerError(errPubkeyForbidden, "not allowed to connect to srv")
  361. }
  362. srv.lock.RLock()
  363. defer srv.lock.RUnlock()
  364. for _, peer := range srv.peers {
  365. if peer != nil {
  366. id := peer.Identity()
  367. if id != nil && bytes.Equal(id.Pubkey(), addr.Pubkey) {
  368. return errors.New("already connected")
  369. }
  370. }
  371. }
  372. return nil
  373. }
  374. // TODO replace with "Set"
  375. type Blacklist interface {
  376. Get([]byte) (bool, error)
  377. Put([]byte) error
  378. Delete([]byte) error
  379. Exists(pubkey []byte) (ok bool)
  380. }
  381. type BlacklistMap struct {
  382. blacklist map[string]bool
  383. lock sync.RWMutex
  384. }
  385. func NewBlacklist() *BlacklistMap {
  386. return &BlacklistMap{
  387. blacklist: make(map[string]bool),
  388. }
  389. }
  390. func (self *BlacklistMap) Get(pubkey []byte) (bool, error) {
  391. self.lock.RLock()
  392. defer self.lock.RUnlock()
  393. v, ok := self.blacklist[string(pubkey)]
  394. var err error
  395. if !ok {
  396. err = fmt.Errorf("not found")
  397. }
  398. return v, err
  399. }
  400. func (self *BlacklistMap) Exists(pubkey []byte) (ok bool) {
  401. self.lock.RLock()
  402. defer self.lock.RUnlock()
  403. _, ok = self.blacklist[string(pubkey)]
  404. return
  405. }
  406. func (self *BlacklistMap) Put(pubkey []byte) error {
  407. self.lock.RLock()
  408. defer self.lock.RUnlock()
  409. self.blacklist[string(pubkey)] = true
  410. return nil
  411. }
  412. func (self *BlacklistMap) Delete(pubkey []byte) error {
  413. self.lock.RLock()
  414. defer self.lock.RUnlock()
  415. delete(self.blacklist, string(pubkey))
  416. return nil
  417. }