server.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. package p2p
  2. import (
  3. "blockchain-go/common/gopool"
  4. "blockchain-go/p2p/enode"
  5. "blockchain-go/p2p/enr"
  6. "blockchain-go/p2p/nat"
  7. "blockchain-go/p2p/rlpx"
  8. "crypto/ecdsa"
  9. "errors"
  10. "fmt"
  11. "github.com/ethereum/go-ethereum/crypto"
  12. "github.com/ethereum/go-ethereum/p2p/netutil"
  13. "net"
  14. "sync"
  15. "time"
  16. )
  17. var (
  18. ErrServerStopped = errors.New("server stopped")
  19. )
  20. // Server manages all peer connections.
  21. type Server struct {
  22. // Config fields may not be modified while the server is running.
  23. Config
  24. // Hooks for testing. These are useful because we can inhibit
  25. // the whole protocol stack.
  26. //newTransport func(net.Conn, *ecdsa.PublicKey) transport
  27. //newPeerHook func(*Peer)
  28. //listenFunc func(network, addr string) (net.Listener, error)
  29. lock sync.Mutex // protects running
  30. running bool
  31. listener net.Listener
  32. ourHandshake *protoHandshake
  33. loopWG sync.WaitGroup // loop, listenLoop
  34. //peerFeed event.Feed
  35. //log log.Logger
  36. //nodedb *enode.DB
  37. localnode *enode.LocalNode
  38. //ntab *discover.UDPv4
  39. //DiscV5 *discover.UDPv5
  40. //discmix *enode.FairMix
  41. //dialsched *dialScheduler
  42. // Channels into the run loop.
  43. quit chan struct{}
  44. //addtrusted chan *enode.Node
  45. //removetrusted chan *enode.Node
  46. //peerOp chan peerOpFunc
  47. //peerOpDone chan struct{}
  48. //delpeer chan peerDrop
  49. checkpointPostHandshake chan *conn
  50. checkpointAddPeer chan *conn
  51. // State of run loop and listenLoop.
  52. //inboundHistory expHeap
  53. }
  54. func (server *Server) Start() (err error) {
  55. server.quit = make(chan struct{})
  56. server.checkpointPostHandshake = make(chan *conn)
  57. server.checkpointAddPeer = make(chan *conn)
  58. if err := server.setupLocalNode(); err != nil {
  59. return err
  60. }
  61. if err := server.setupListening(); err != nil {
  62. return err
  63. }
  64. return nil
  65. }
  66. // 配置本地节点
  67. func (server *Server) setupLocalNode() (err error) {
  68. // 创建握手所需对象
  69. publicKey := crypto.FromECDSAPub(&server.PrivateKey.PublicKey)
  70. server.ourHandshake = &protoHandshake{
  71. Version: baseProtocolVersion,
  72. Name: server.Name,
  73. ID: publicKey[1:],
  74. }
  75. // 创建本地节点
  76. server.localnode = enode.NewLocalNode(server.PrivateKey)
  77. server.localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
  78. // 配置本地静态IP
  79. ip, _ := server.NAT.ExternalIP()
  80. server.localnode.SetStaticIP(ip)
  81. return nil
  82. }
  83. // 监听器
  84. func (server *Server) setupListening() (err error) {
  85. listener, err := net.Listen("tcp", server.ListenAddr)
  86. if err != nil {
  87. return err
  88. }
  89. server.listener = listener
  90. server.ListenAddr = listener.Addr().String()
  91. if tcp, ok := listener.Addr().(*net.TCPAddr); ok {
  92. server.localnode.Set(enr.TCP(tcp.Port))
  93. if !tcp.IP.IsLoopback() && server.NAT != nil {
  94. server.loopWG.Add(1)
  95. gopool.Submit(func() {
  96. nat.Map(server.NAT, server.quit, "tcp", tcp.Port, tcp.Port, "ethereum p2p")
  97. server.loopWG.Done()
  98. })
  99. }
  100. }
  101. server.loopWG.Add(1)
  102. go server.listenLoop()
  103. return nil
  104. }
  105. func (server *Server) listenLoop() {
  106. fmt.Printf("TCP Listener up, addr: %v.", server.listener.Addr())
  107. tokens := defaultMaxPendingPeers
  108. slots := make(chan struct{}, tokens)
  109. for i := 0; i < tokens; i++ {
  110. slots <- struct{}{}
  111. }
  112. defer server.loopWG.Done()
  113. defer func() {
  114. for i := 0; i < cap(slots); i++ {
  115. <-slots
  116. }
  117. }()
  118. for {
  119. <-slots
  120. var (
  121. fd net.Conn
  122. err error
  123. lastLogTime time.Time
  124. )
  125. // accept处理
  126. for {
  127. fd, err = server.listener.Accept()
  128. if netutil.IsTemporaryError(err) {
  129. if time.Since(lastLogTime) > 1*time.Second {
  130. fmt.Errorf("temporary read error, err: %v", err)
  131. lastLogTime = time.Now()
  132. }
  133. time.Sleep(time.Millisecond * 200)
  134. continue
  135. } else if err != nil {
  136. fmt.Errorf("read error, err: %v", err)
  137. slots <- struct{}{}
  138. return
  139. }
  140. break
  141. }
  142. // accept成功的处理
  143. remoteIP := netutil.AddrIP(fd.RemoteAddr())
  144. // TODO 检查此IP是是否能加入本地节点的链接
  145. //if err := server.checkInboundConn(remoteIP); err != nil {
  146. // srv.log.Debug("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err)
  147. // fd.Close()
  148. // slots <- struct{}{}
  149. // continue
  150. //}
  151. if remoteIP != nil {
  152. var addr *net.TCPAddr
  153. if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
  154. addr = tcp
  155. }
  156. fd = newMeteredConn(fd, true, addr)
  157. fmt.Printf("Accepted connection, addr: %v.", fd.RemoteAddr())
  158. }
  159. gopool.Submit(func() {
  160. server.SetupConn(fd, inboundConn, nil)
  161. slots <- struct{}{}
  162. })
  163. }
  164. }
  165. func (server *Server) newRLPX(conn net.Conn, dialDest *ecdsa.PublicKey) transport {
  166. return &rlpxTransport{conn: rlpx.NewConn(conn, dialDest)}
  167. }
  168. func (server *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
  169. c := &conn{fd: fd, flags: flags, cont: make(chan error)}
  170. if dialDest == nil {
  171. c.transport = server.newRLPX(fd, nil)
  172. } else {
  173. c.transport = server.newRLPX(fd, dialDest.Pubkey())
  174. }
  175. err := server.setupConn(c, dialDest)
  176. if err != nil {
  177. c.close(err)
  178. }
  179. return err
  180. }
  181. func (server *Server) setupConn(c *conn, dialDest *enode.Node) error {
  182. remotePubkey, err := c.doEncHandshake(server.PrivateKey)
  183. if err != nil {
  184. return err
  185. }
  186. // 将connection转换成node
  187. c.node = enode.NodeFromConn(remotePubkey, c.fd)
  188. fmt.Printf("id: %v, addr: %v, conn: %v", c.node.ID(), c.fd.RemoteAddr(), c.flags)
  189. // 检查是否需要握手
  190. err = server.checkpoint(c, server.checkpointPostHandshake)
  191. if err != nil {
  192. return err
  193. }
  194. // 进行握手
  195. phs, err := c.doProtoHandshake(server.ourHandshake)
  196. if err != nil {
  197. return err
  198. }
  199. c.caps, c.name = phs.Caps, phs.Name
  200. // 将此链接放入addPeer的检查点
  201. err = server.checkpoint(c, server.checkpointAddPeer)
  202. if err != nil {
  203. return err
  204. }
  205. return nil
  206. }
  207. func (server *Server) checkpoint(c *conn, stage chan<- *conn) error {
  208. select {
  209. case stage <- c:
  210. case <-server.quit:
  211. return ErrServerStopped
  212. }
  213. return <-c.cont
  214. }
  215. // 配置节点发现逻辑
  216. func (server *Server) setupDiscovery() (err error) {
  217. return nil
  218. }
  219. // 设置拨号调度器
  220. func (server *Server) setupDialScheduler() (err error) {
  221. return nil
  222. }