server.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. package p2p
  2. import (
  3. "blockchain-go/common/gopool"
  4. "blockchain-go/p2p/discover"
  5. "blockchain-go/p2p/enode"
  6. "blockchain-go/p2p/enr"
  7. "blockchain-go/p2p/nat"
  8. "blockchain-go/p2p/rlpx"
  9. "crypto/ecdsa"
  10. "errors"
  11. "fmt"
  12. "github.com/ethereum/go-ethereum/crypto"
  13. "github.com/ethereum/go-ethereum/p2p/netutil"
  14. "net"
  15. "sync"
  16. "time"
  17. )
  // ErrServerStopped is reported when an operation is abandoned because the
  // server's quit channel has been signalled.
  var ErrServerStopped = errors.New("server stopped")
  // Server manages all peer connections.
  //
  // The unexported fields are filled in by Start and the setup helpers it
  // calls. Fields that appear only as commented-out declarations are currently
  // disabled.
  type Server struct {
  	// Config fields may not be modified while the server is running.
  	Config

  	// Hooks for testing. These are useful because we can inhibit
  	// the whole protocol stack.
  	//newTransport func(net.Conn, *ecdsa.PublicKey) transport
  	//newPeerHook func(*Peer)
  	//listenFunc func(network, addr string) (net.Listener, error)

  	lock sync.Mutex // protects running

  	// running records whether the server has been started.
  	running bool

  	// listener is the TCP listener opened by setupListening and accepted on
  	// by listenLoop.
  	listener net.Listener

  	// ourHandshake is the protocol handshake built in setupLocalNode and
  	// passed to doProtoHandshake for every connection.
  	ourHandshake *protoHandshake

  	// loopWG counts listenLoop and the NAT port-mapping tasks.
  	loopWG sync.WaitGroup // loop, listenLoop

  	//peerFeed event.Feed
  	//log log.Logger
  	//nodedb *enode.DB

  	// localnode is the local node record created in setupLocalNode.
  	localnode *enode.LocalNode

  	// ntab is the discovery v4 instance returned by discover.ListenV4.
  	ntab *discover.UDPv4

  	//DiscV5 *discover.UDPv5

  	// discmix collects the dial-candidate sources and is handed to the dial
  	// scheduler.
  	discmix *enode.FairMix

  	// dialsched is created by setupDialScheduler.
  	dialsched *dialScheduler

  	// Channels into the run loop.

  	// quit is created in Start. checkpoint gives up with ErrServerStopped
  	// once it becomes readable, and the NAT mapping tasks receive it too.
  	quit chan struct{}

  	//addtrusted chan *enode.Node
  	//removetrusted chan *enode.Node
  	//peerOp chan peerOpFunc
  	//peerOpDone chan struct{}
  	//delpeer chan peerDrop

  	// checkpointPostHandshake receives a connection after its encryption
  	// handshake has completed (see setupConn).
  	checkpointPostHandshake chan *conn

  	// checkpointAddPeer receives a connection after its protocol handshake
  	// has completed (see setupConn).
  	checkpointAddPeer chan *conn

  	// State of run loop and listenLoop.
  	//inboundHistory expHeap
  }
  55. func (server *Server) Start() (err error) {
  56. server.quit = make(chan struct{})
  57. server.checkpointPostHandshake = make(chan *conn)
  58. server.checkpointAddPeer = make(chan *conn)
  59. if err := server.setupLocalNode(); err != nil {
  60. return err
  61. }
  62. if err := server.setupListening(); err != nil {
  63. return err
  64. }
  65. if err := server.setupDiscovery(); err != nil {
  66. return err
  67. }
  68. server.setupDialScheduler()
  69. return nil
  70. }
  71. // 配置节点发现逻辑
  72. func (server *Server) setupDiscovery() (err error) {
  73. server.discmix = enode.NewFairMix(discmixTimeout)
  74. // 添加特定协议的发现源。
  75. added := make(map[string]bool)
  76. for _, proto := range server.Protocols {
  77. if proto.DialCandidates != nil && !added[proto.Name] {
  78. server.discmix.AddSource(proto.DialCandidates)
  79. added[proto.Name] = true
  80. }
  81. }
  82. addr, err := net.ResolveUDPAddr("udp", server.ListenAddr)
  83. if err != nil {
  84. return err
  85. }
  86. conn, err := net.ListenUDP("udp", addr)
  87. if err != nil {
  88. return err
  89. }
  90. realAddr := conn.LocalAddr().(*net.UDPAddr)
  91. fmt.Printf("UDP listener up, addr: %v.", realAddr)
  92. if server.NAT != nil {
  93. if !realAddr.IP.IsLoopback() {
  94. server.loopWG.Add(1)
  95. gopool.Submit(func() {
  96. nat.Map(server.NAT, server.quit, "udp", realAddr.Port, realAddr.Port, "ethereum discovery")
  97. server.loopWG.Done()
  98. })
  99. }
  100. }
  101. server.localnode.SetFallbackUDP(realAddr.Port)
  102. // 设置V4的发现协议
  103. var unhandled chan discover.ReadPacket
  104. //var sconn *sharedUDPConn
  105. //if server.DiscoveryV5 {
  106. // unhandled = make(chan discover.ReadPacket, 100)
  107. // sconn = &sharedUDPConn{conn, unhandled}
  108. //}
  109. cfg := discover.Config{
  110. PrivateKey: server.PrivateKey,
  111. //NetRestrict: server.NetRestrict,
  112. //Bootnodes: server.BootstrapNodes,
  113. Unhandled: unhandled,
  114. //Log: server.log,
  115. }
  116. ntab, err := discover.ListenV4(conn, server.localnode, cfg)
  117. if err != nil {
  118. return err
  119. }
  120. server.ntab = ntab
  121. server.discmix.AddSource(ntab.RandomNodes())
  122. return nil
  123. }
  124. // 设置拨号调度器
  125. func (server *Server) setupDialScheduler() {
  126. config := dialConfig{
  127. self: server.localnode.ID(),
  128. maxDialPeers: server.maxDialedConns(),
  129. maxActiveDials: server.MaxPendingPeers,
  130. log: server.Logger,
  131. netRestrict: server.NetRestrict,
  132. dialer: server.Dialer,
  133. clock: server.clock,
  134. }
  135. if server.ntab != nil {
  136. config.resolver = server.ntab
  137. }
  138. if config.dialer == nil {
  139. config.dialer = tcpDialer{&net.Dialer{Timeout: defaultDialTimeout}}
  140. }
  141. server.dialsched = newDialScheduler(config, server.discmix, server.SetupConn)
  142. //for _, n := range srv.StaticNodes {
  143. // srv.dialsched.addStatic(n)
  144. //}
  145. }
  146. func (server *Server) maxDialedConns() (limit int) {
  147. if server.NoDial || server.MaxPeers == 0 {
  148. return 0
  149. }
  150. if server.DialRatio == 0 {
  151. limit = server.MaxPeers / defaultDialRatio
  152. } else {
  153. limit = server.MaxPeers / server.DialRatio
  154. }
  155. if limit == 0 {
  156. limit = 1
  157. }
  158. return limit
  159. }
  160. // 配置本地节点
  161. func (server *Server) setupLocalNode() (err error) {
  162. // 创建握手所需对象
  163. publicKey := crypto.FromECDSAPub(&server.PrivateKey.PublicKey)
  164. server.ourHandshake = &protoHandshake{
  165. Version: baseProtocolVersion,
  166. Name: server.Name,
  167. ID: publicKey[1:],
  168. }
  169. // 创建本地节点
  170. server.localnode = enode.NewLocalNode(server.PrivateKey)
  171. server.localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
  172. // 配置本地静态IP
  173. ip, _ := server.NAT.ExternalIP()
  174. server.localnode.SetStaticIP(ip)
  175. return nil
  176. }
  177. // 监听器
  178. func (server *Server) setupListening() (err error) {
  179. listener, err := net.Listen("tcp", server.ListenAddr)
  180. if err != nil {
  181. return err
  182. }
  183. server.listener = listener
  184. server.ListenAddr = listener.Addr().String()
  185. if tcp, ok := listener.Addr().(*net.TCPAddr); ok {
  186. server.localnode.Set(enr.TCP(tcp.Port))
  187. if !tcp.IP.IsLoopback() && server.NAT != nil {
  188. server.loopWG.Add(1)
  189. gopool.Submit(func() {
  190. nat.Map(server.NAT, server.quit, "tcp", tcp.Port, tcp.Port, "ethereum p2p")
  191. server.loopWG.Done()
  192. })
  193. }
  194. }
  195. server.loopWG.Add(1)
  196. go server.listenLoop()
  197. return nil
  198. }
  199. func (server *Server) listenLoop() {
  200. fmt.Printf("TCP Listener up, addr: %v.", server.listener.Addr())
  201. tokens := defaultMaxPendingPeers
  202. slots := make(chan struct{}, tokens)
  203. for i := 0; i < tokens; i++ {
  204. slots <- struct{}{}
  205. }
  206. defer server.loopWG.Done()
  207. defer func() {
  208. for i := 0; i < cap(slots); i++ {
  209. <-slots
  210. }
  211. }()
  212. for {
  213. <-slots
  214. var (
  215. fd net.Conn
  216. err error
  217. lastLogTime time.Time
  218. )
  219. // accept处理
  220. for {
  221. fd, err = server.listener.Accept()
  222. if netutil.IsTemporaryError(err) {
  223. if time.Since(lastLogTime) > 1*time.Second {
  224. fmt.Errorf("temporary read error, err: %v", err)
  225. lastLogTime = time.Now()
  226. }
  227. time.Sleep(time.Millisecond * 200)
  228. continue
  229. } else if err != nil {
  230. fmt.Errorf("read error, err: %v", err)
  231. slots <- struct{}{}
  232. return
  233. }
  234. break
  235. }
  236. // accept成功的处理
  237. remoteIP := netutil.AddrIP(fd.RemoteAddr())
  238. // TODO 检查此IP是是否能加入本地节点的链接
  239. //if err := server.checkInboundConn(remoteIP); err != nil {
  240. // srv.log.Debug("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err)
  241. // fd.Close()
  242. // slots <- struct{}{}
  243. // continue
  244. //}
  245. if remoteIP != nil {
  246. var addr *net.TCPAddr
  247. if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
  248. addr = tcp
  249. }
  250. fd = newMeteredConn(fd, true, addr)
  251. fmt.Printf("Accepted connection, addr: %v.", fd.RemoteAddr())
  252. }
  253. gopool.Submit(func() {
  254. server.SetupConn(fd, inboundConn, nil)
  255. slots <- struct{}{}
  256. })
  257. }
  258. }
  259. func (server *Server) newRLPX(conn net.Conn, dialDest *ecdsa.PublicKey) transport {
  260. return &rlpxTransport{conn: rlpx.NewConn(conn, dialDest)}
  261. }
  262. func (server *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
  263. c := &conn{fd: fd, flags: flags, cont: make(chan error)}
  264. if dialDest == nil {
  265. c.transport = server.newRLPX(fd, nil)
  266. } else {
  267. c.transport = server.newRLPX(fd, dialDest.Pubkey())
  268. }
  269. err := server.setupConn(c, dialDest)
  270. if err != nil {
  271. c.close(err)
  272. }
  273. return err
  274. }
  275. func (server *Server) setupConn(c *conn, dialDest *enode.Node) error {
  276. remotePubkey, err := c.doEncHandshake(server.PrivateKey)
  277. if err != nil {
  278. return err
  279. }
  280. // 将connection转换成node
  281. c.node = enode.NodeFromConn(remotePubkey, c.fd)
  282. fmt.Printf("id: %v, addr: %v, conn: %v", c.node.ID(), c.fd.RemoteAddr(), c.flags)
  283. // 检查是否需要握手
  284. err = server.checkpoint(c, server.checkpointPostHandshake)
  285. if err != nil {
  286. return err
  287. }
  288. // 进行握手
  289. phs, err := c.doProtoHandshake(server.ourHandshake)
  290. if err != nil {
  291. return err
  292. }
  293. c.caps, c.name = phs.Caps, phs.Name
  294. // 将此链接放入addPeer的检查点
  295. err = server.checkpoint(c, server.checkpointAddPeer)
  296. if err != nil {
  297. return err
  298. }
  299. return nil
  300. }
  301. func (server *Server) checkpoint(c *conn, stage chan<- *conn) error {
  302. select {
  303. case stage <- c:
  304. case <-server.quit:
  305. return ErrServerStopped
  306. }
  307. return <-c.cont
  308. }
  // sharedUDPConn implements a shared connection. Write sends messages to the underlying connection while read returns
  // messages that were found unprocessable and sent to the unhandled channel by the primary listener.
  type sharedUDPConn struct {
  	// The embedded connection provides every method that is not overridden
  	// on sharedUDPConn.
  	*net.UDPConn
  	// unhandled is the channel ReadFromUDP receives packets from.
  	unhandled chan discover.ReadPacket
  }
  315. // ReadFromUDP implements discover.UDPConn
  316. func (s *sharedUDPConn) ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error) {
  317. packet, ok := <-s.unhandled
  318. if !ok {
  319. return 0, nil, errors.New("connection was closed")
  320. }
  321. l := len(packet.Data)
  322. if l > len(b) {
  323. l = len(b)
  324. }
  325. copy(b[:l], packet.Data[:l])
  326. return l, packet.Addr, nil
  327. }
  // Close implements discover.UDPConn.
  //
  // It does nothing and always returns nil; in particular it does not close
  // the embedded *net.UDPConn.
  func (s *sharedUDPConn) Close() error {
  	return nil
  }