server.go 9.5 KB


  1. package p2p
  2. import (
  3. "blockchain-go/common/gopool"
  4. "blockchain-go/p2p/discover"
  5. "blockchain-go/p2p/enode"
  6. "blockchain-go/p2p/enr"
  7. "blockchain-go/p2p/nat"
  8. "blockchain-go/p2p/rlpx"
  9. "blockchain-go/params"
  10. "crypto/ecdsa"
  11. "errors"
  12. "fmt"
  13. "github.com/ethereum/go-ethereum/crypto"
  14. "github.com/ethereum/go-ethereum/p2p/netutil"
  15. "net"
  16. "sync"
  17. "time"
  18. )
  19. var (
  20. ErrServerStopped = errors.New("server stopped")
  21. )
  22. // Server manages all peer connections.
  23. type Server struct {
  24. // Config fields may not be modified while the server is running.
  25. Config
  26. // Hooks for testing. These are useful because we can inhibit
  27. // the whole protocol stack.
  28. //newTransport func(net.Conn, *ecdsa.PublicKey) transport
  29. //newPeerHook func(*Peer)
  30. //listenFunc func(network, addr string) (net.Listener, error)
  31. lock sync.Mutex // protects running
  32. running bool
  33. listener net.Listener
  34. ourHandshake *protoHandshake
  35. loopWG sync.WaitGroup // loop, listenLoop
  36. //peerFeed event.Feed
  37. //log log.Logger
  38. //nodedb *enode.DB
  39. localnode *enode.LocalNode
  40. ntab *discover.UDPv4
  41. //DiscV5 *discover.UDPv5
  42. discmix *enode.FairMix
  43. dialsched *dialScheduler
  44. // Channels into the run loop.
  45. quit chan struct{}
  46. //addtrusted chan *enode.Node
  47. //removetrusted chan *enode.Node
  48. //peerOp chan peerOpFunc
  49. //peerOpDone chan struct{}
  50. //delpeer chan peerDrop
  51. checkpointPostHandshake chan *conn
  52. checkpointAddPeer chan *conn
  53. // State of run loop and listenLoop.
  54. //inboundHistory expHeap
  55. }
  56. func (server *Server) Start() (err error) {
  57. server.quit = make(chan struct{})
  58. server.checkpointPostHandshake = make(chan *conn)
  59. server.checkpointAddPeer = make(chan *conn)
  60. if err := server.setupBootstrapNodes(); err != nil {
  61. return err
  62. }
  63. if err := server.setupLocalNode(); err != nil {
  64. return err
  65. }
  66. if err := server.setupListening(); err != nil {
  67. return err
  68. }
  69. if err := server.setupDiscovery(); err != nil {
  70. return err
  71. }
  72. server.setupDialScheduler()
  73. return nil
  74. }
  75. // 配置节点发现逻辑
  76. func (server *Server) setupDiscovery() (err error) {
  77. server.discmix = enode.NewFairMix(discmixTimeout)
  78. // 添加特定协议的发现源。
  79. added := make(map[string]bool)
  80. for _, proto := range server.Protocols {
  81. if proto.DialCandidates != nil && !added[proto.Name] {
  82. server.discmix.AddSource(proto.DialCandidates)
  83. added[proto.Name] = true
  84. }
  85. }
  86. addr, err := net.ResolveUDPAddr("udp", server.ListenAddr)
  87. if err != nil {
  88. return err
  89. }
  90. conn, err := net.ListenUDP("udp", addr)
  91. if err != nil {
  92. return err
  93. }
  94. realAddr := conn.LocalAddr().(*net.UDPAddr)
  95. fmt.Printf("UDP listener up, addr: %v.", realAddr)
  96. if server.NAT != nil {
  97. if !realAddr.IP.IsLoopback() {
  98. server.loopWG.Add(1)
  99. gopool.Submit(func() {
  100. nat.Map(server.NAT, server.quit, "udp", realAddr.Port, realAddr.Port, "ethereum discovery")
  101. server.loopWG.Done()
  102. })
  103. }
  104. }
  105. server.localnode.SetFallbackUDP(realAddr.Port)
  106. // 设置V4的发现协议
  107. var unhandled chan discover.ReadPacket
  108. //var sconn *sharedUDPConn
  109. //if server.DiscoveryV5 {
  110. // unhandled = make(chan discover.ReadPacket, 100)
  111. // sconn = &sharedUDPConn{conn, unhandled}
  112. //}
  113. cfg := discover.Config{
  114. PrivateKey: server.PrivateKey,
  115. NetRestrict: server.NetRestrict,
  116. Bootnodes: server.BootstrapNodes,
  117. Unhandled: unhandled,
  118. //Log: server.log,
  119. }
  120. ntab, err := discover.ListenV4(conn, server.localnode, cfg)
  121. if err != nil {
  122. return err
  123. }
  124. server.ntab = ntab
  125. server.discmix.AddSource(ntab.RandomNodes())
  126. return nil
  127. }
  128. func (server *Server) setupBootstrapNodes() (err error) {
  129. urls := params.MainnetBootNodes
  130. server.BootstrapNodes = make([]*enode.Node, 0, len(urls))
  131. for _, url := range urls {
  132. if url != "" {
  133. node, err := enode.Parse(enode.ValidSchemes, url)
  134. if err != nil {
  135. return err
  136. }
  137. server.BootstrapNodes = append(server.BootstrapNodes, node)
  138. }
  139. }
  140. return nil
  141. }
  142. // 设置拨号调度器
  143. func (server *Server) setupDialScheduler() {
  144. config := dialConfig{
  145. self: server.localnode.ID(),
  146. maxDialPeers: server.maxDialedConns(),
  147. maxActiveDials: server.MaxPendingPeers,
  148. log: server.Logger,
  149. netRestrict: server.NetRestrict,
  150. dialer: server.Dialer,
  151. clock: server.clock,
  152. }
  153. if server.ntab != nil {
  154. config.resolver = server.ntab
  155. }
  156. if config.dialer == nil {
  157. config.dialer = tcpDialer{&net.Dialer{Timeout: defaultDialTimeout}}
  158. }
  159. server.dialsched = newDialScheduler(config, server.discmix, server.SetupConn)
  160. //for _, n := range srv.StaticNodes {
  161. // srv.dialsched.addStatic(n)
  162. //}
  163. }
  164. func (server *Server) maxDialedConns() (limit int) {
  165. if server.NoDial || server.MaxPeers == 0 {
  166. return 0
  167. }
  168. if server.DialRatio == 0 {
  169. limit = server.MaxPeers / defaultDialRatio
  170. } else {
  171. limit = server.MaxPeers / server.DialRatio
  172. }
  173. if limit == 0 {
  174. limit = 1
  175. }
  176. return limit
  177. }
  178. // 配置本地节点
  179. func (server *Server) setupLocalNode() (err error) {
  180. // 创建握手所需对象
  181. publicKey := crypto.FromECDSAPub(&server.PrivateKey.PublicKey)
  182. server.ourHandshake = &protoHandshake{
  183. Version: baseProtocolVersion,
  184. Name: server.Name,
  185. ID: publicKey[1:],
  186. }
  187. // 创建本地节点
  188. server.localnode = enode.NewLocalNode(server.PrivateKey)
  189. server.localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
  190. // 配置本地静态IP
  191. ip, _ := server.NAT.ExternalIP()
  192. server.localnode.SetStaticIP(ip)
  193. return nil
  194. }
  195. // 监听器
  196. func (server *Server) setupListening() (err error) {
  197. listener, err := net.Listen("tcp", server.ListenAddr)
  198. if err != nil {
  199. return err
  200. }
  201. server.listener = listener
  202. server.ListenAddr = listener.Addr().String()
  203. if tcp, ok := listener.Addr().(*net.TCPAddr); ok {
  204. server.localnode.Set(enr.TCP(tcp.Port))
  205. if !tcp.IP.IsLoopback() && server.NAT != nil {
  206. server.loopWG.Add(1)
  207. gopool.Submit(func() {
  208. nat.Map(server.NAT, server.quit, "tcp", tcp.Port, tcp.Port, "ethereum p2p")
  209. server.loopWG.Done()
  210. })
  211. }
  212. }
  213. server.loopWG.Add(1)
  214. go server.listenLoop()
  215. return nil
  216. }
  217. func (server *Server) listenLoop() {
  218. fmt.Printf("TCP Listener up, addr: %v.", server.listener.Addr())
  219. tokens := defaultMaxPendingPeers
  220. slots := make(chan struct{}, tokens)
  221. for i := 0; i < tokens; i++ {
  222. slots <- struct{}{}
  223. }
  224. defer server.loopWG.Done()
  225. defer func() {
  226. for i := 0; i < cap(slots); i++ {
  227. <-slots
  228. }
  229. }()
  230. for {
  231. <-slots
  232. var (
  233. fd net.Conn
  234. err error
  235. lastLogTime time.Time
  236. )
  237. // accept处理
  238. for {
  239. fd, err = server.listener.Accept()
  240. if netutil.IsTemporaryError(err) {
  241. if time.Since(lastLogTime) > 1*time.Second {
  242. fmt.Errorf("temporary read error, err: %v", err)
  243. lastLogTime = time.Now()
  244. }
  245. time.Sleep(time.Millisecond * 200)
  246. continue
  247. } else if err != nil {
  248. fmt.Errorf("read error, err: %v", err)
  249. slots <- struct{}{}
  250. return
  251. }
  252. break
  253. }
  254. // accept成功的处理
  255. remoteIP := netutil.AddrIP(fd.RemoteAddr())
  256. // TODO 检查此IP是是否能加入本地节点的链接
  257. //if err := server.checkInboundConn(remoteIP); err != nil {
  258. // srv.log.Debug("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err)
  259. // fd.Close()
  260. // slots <- struct{}{}
  261. // continue
  262. //}
  263. if remoteIP != nil {
  264. var addr *net.TCPAddr
  265. if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
  266. addr = tcp
  267. }
  268. fd = newMeteredConn(fd, true, addr)
  269. fmt.Printf("Accepted connection, addr: %v.", fd.RemoteAddr())
  270. }
  271. gopool.Submit(func() {
  272. server.SetupConn(fd, inboundConn, nil)
  273. slots <- struct{}{}
  274. })
  275. }
  276. }
  277. func (server *Server) newRLPX(conn net.Conn, dialDest *ecdsa.PublicKey) transport {
  278. return &rlpxTransport{conn: rlpx.NewConn(conn, dialDest)}
  279. }
  280. func (server *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
  281. c := &conn{fd: fd, flags: flags, cont: make(chan error)}
  282. if dialDest == nil {
  283. c.transport = server.newRLPX(fd, nil)
  284. } else {
  285. c.transport = server.newRLPX(fd, dialDest.Pubkey())
  286. }
  287. err := server.setupConn(c, dialDest)
  288. if err != nil {
  289. c.close(err)
  290. }
  291. return err
  292. }
  293. func (server *Server) setupConn(c *conn, dialDest *enode.Node) error {
  294. remotePubkey, err := c.doEncHandshake(server.PrivateKey)
  295. if err != nil {
  296. return err
  297. }
  298. // 将connection转换成node
  299. c.node = enode.NodeFromConn(remotePubkey, c.fd)
  300. fmt.Printf("id: %v, addr: %v, conn: %v", c.node.ID(), c.fd.RemoteAddr(), c.flags)
  301. // 检查是否需要握手
  302. err = server.checkpoint(c, server.checkpointPostHandshake)
  303. if err != nil {
  304. return err
  305. }
  306. // 进行握手
  307. phs, err := c.doProtoHandshake(server.ourHandshake)
  308. if err != nil {
  309. return err
  310. }
  311. c.caps, c.name = phs.Caps, phs.Name
  312. // 将此链接放入addPeer的检查点
  313. err = server.checkpoint(c, server.checkpointAddPeer)
  314. if err != nil {
  315. return err
  316. }
  317. return nil
  318. }
  319. func (server *Server) checkpoint(c *conn, stage chan<- *conn) error {
  320. select {
  321. case stage <- c:
  322. case <-server.quit:
  323. return ErrServerStopped
  324. }
  325. return <-c.cont
  326. }
  327. // sharedUDPConn implements a shared connection. Write sends messages to the underlying connection while read returns
  328. // messages that were found unprocessable and sent to the unhandled channel by the primary listener.
  329. type sharedUDPConn struct {
  330. *net.UDPConn
  331. unhandled chan discover.ReadPacket
  332. }
  333. // ReadFromUDP implements discover.UDPConn
  334. func (s *sharedUDPConn) ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error) {
  335. packet, ok := <-s.unhandled
  336. if !ok {
  337. return 0, nil, errors.New("connection was closed")
  338. }
  339. l := len(packet.Data)
  340. if l > len(b) {
  341. l = len(b)
  342. }
  343. copy(b[:l], packet.Data[:l])
  344. return l, packet.Addr, nil
  345. }
  346. // Close implements discover.UDPConn
  347. func (s *sharedUDPConn) Close() error {
  348. return nil
  349. }