server.go 11 KB


  1. package p2p
  2. import (
  3. "blockchain-go/common/gopool"
  4. "blockchain-go/p2p/discover"
  5. "blockchain-go/p2p/enode"
  6. "blockchain-go/p2p/enr"
  7. "blockchain-go/p2p/nat"
  8. "blockchain-go/p2p/rlpx"
  9. "blockchain-go/params"
  10. "crypto/ecdsa"
  11. "errors"
  12. "fmt"
  13. "github.com/ethereum/go-ethereum/crypto"
  14. "github.com/ethereum/go-ethereum/p2p/netutil"
  15. "net"
  16. "os"
  17. "os/signal"
  18. "sort"
  19. "sync"
  20. "syscall"
  21. "time"
  22. )
  23. var (
  24. ErrServerStopped = errors.New("server stopped")
  25. )
  26. // Server manages all peer connections.
  27. type Server struct {
  28. Config
  29. //newTransport func(net.Conn, *ecdsa.PublicKey) transport
  30. //newPeerHook func(*Peer)
  31. //listenFunc func(network, addr string) (net.Listener, error)
  32. lock sync.Mutex // protects running
  33. running bool
  34. listener net.Listener
  35. ourHandshake *protoHandshake
  36. loopWG sync.WaitGroup // loop, listenLoop
  37. //peerFeed event.Feed
  38. //log log.Logger
  39. //nodedb *enode.DB
  40. localnode *enode.LocalNode
  41. ntab *discover.UDPv4
  42. //DiscV5 *discover.UDPv5
  43. discmix *enode.FairMix
  44. dialsched *dialScheduler
  45. quit chan struct{}
  46. //addtrusted chan *enode.Node
  47. //removetrusted chan *enode.Node
  48. //peerOp chan peerOpFunc
  49. //peerOpDone chan struct{}
  50. //delpeer chan peerDrop
  51. checkpointPostHandshake chan *conn
  52. checkpointAddPeer chan *conn
  53. sigs chan os.Signal
  54. //inboundHistory expHeap
  55. }
  56. func (server *Server) Start() (err error) {
  57. server.sigs = make(chan os.Signal, 1)
  58. signal.Notify(server.sigs, syscall.SIGINT, syscall.SIGTERM)
  59. server.quit = make(chan struct{})
  60. server.checkpointPostHandshake = make(chan *conn)
  61. server.checkpointAddPeer = make(chan *conn)
  62. // 配置远端引导节点
  63. if err := server.setupBootstrapNodes(); err != nil {
  64. return err
  65. }
  66. // 配置本地协议
  67. if err := server.setupCaps(); err != nil {
  68. return nil
  69. }
  70. // 配置本地节点
  71. if err := server.setupLocalNode(); err != nil {
  72. return err
  73. }
  74. // 配置节点连接监听器
  75. if err := server.setupListening(); err != nil {
  76. return err
  77. }
  78. // 配置节点发现器
  79. if err := server.setupDiscovery(); err != nil {
  80. return err
  81. }
  82. // 配置拨号调度
  83. server.setupDialScheduler()
  84. // server核心运行
  85. go server.StopListener()
  86. server.run()
  87. return nil
  88. }
  89. func (server *Server) run() {
  90. server.loopWG.Add(1)
  91. fmt.Printf("Started P2P networking, self: %v.\n", server.localnode.Node().URLv4())
  92. var (
  93. peers = make(map[enode.ID]*Peer)
  94. inboundCount = 0
  95. )
  96. running:
  97. for {
  98. select {
  99. case <-server.quit:
  100. fmt.Printf("exit signal by user.\n")
  101. break running
  102. case c := <-server.checkpointPostHandshake:
  103. //p := server.launchPeer(c)
  104. //fmt.Printf("Check peer in local: %v\n", p.rw.node.IP())
  105. c.cont <- server.postHandshakeChecks(peers, inboundCount, c)
  106. case c := <-server.checkpointAddPeer:
  107. fmt.Printf("checkpointAddPeer: %v.\n", c.name)
  108. }
  109. }
  110. }
  111. func (server *Server) postHandshakeChecks(peers map[enode.ID]*Peer, inboundCount int, c *conn) error {
  112. switch {
  113. case !c.is(trustedConn) && len(peers) >= server.MaxPeers:
  114. return DiscTooManyPeers
  115. //case !c.is(trustedConn) && c.is(inboundConn) && inboundCount >= server.maxInboundConns():
  116. // return DiscTooManyPeers
  117. case peers[c.node.ID()] != nil:
  118. return DiscAlreadyConnected
  119. case c.node.ID() == server.localnode.ID():
  120. return DiscSelf
  121. default:
  122. return nil
  123. }
  124. }
  125. func (server *Server) launchPeer(c *conn) *Peer {
  126. p := newPeer(c, server.Protocols)
  127. // ❌ 首先完成eth协议,封装在在eth/protocols/eth/handler.go
  128. // TODO 上一步先不做。干脆在实现peer功能过程中再来看,究竟是什么过程需要protocols?
  129. return p
  130. }
  131. func (server *Server) StopListener() {
  132. <-server.sigs
  133. close(server.quit)
  134. server.loopWG.Done()
  135. server.discmix.Close()
  136. server.dialsched.stop()
  137. }
  138. // 本地协议
  139. func (server *Server) setupCaps() (err error) {
  140. // 创建握手器
  141. publicKey := crypto.FromECDSAPub(&server.PrivateKey.PublicKey)
  142. server.ourHandshake = &protoHandshake{
  143. Version: baseProtocolVersion,
  144. Name: server.Name,
  145. ID: publicKey[1:],
  146. }
  147. // 配置握手器
  148. for _, capability := range OurCaps {
  149. server.ourHandshake.Caps = append(server.ourHandshake.Caps, capability)
  150. }
  151. sort.Sort(capsByNameAndVersion(server.ourHandshake.Caps))
  152. return nil
  153. }
  154. // 配置节点发现逻辑
  155. func (server *Server) setupDiscovery() (err error) {
  156. server.discmix = enode.NewFairMix(discmixTimeout)
  157. // TODO 添加特定协议的发现源。
  158. //dnsclient := dnsdisc.NewClient(dnsdisc.Config{})
  159. //dialCandidates, err := dnsclient.NewIterator()
  160. //server.discmix.AddSource(dialCandidates)
  161. addr, err := net.ResolveUDPAddr("udp", server.ListenAddr)
  162. if err != nil {
  163. return err
  164. }
  165. conn, err := net.ListenUDP("udp", addr)
  166. if err != nil {
  167. return err
  168. }
  169. realAddr := conn.LocalAddr().(*net.UDPAddr)
  170. fmt.Printf("UDP listener up, addr: %v.\n", realAddr)
  171. if server.NAT != nil {
  172. if !realAddr.IP.IsLoopback() {
  173. server.loopWG.Add(1)
  174. gopool.Submit(func() {
  175. nat.Map(server.NAT, server.quit, "udp", realAddr.Port, realAddr.Port, "blockchain-p2p discovery")
  176. server.loopWG.Done()
  177. })
  178. }
  179. }
  180. server.localnode.SetFallbackUDP(realAddr.Port)
  181. // 设置V4的发现协议
  182. var unhandled chan discover.ReadPacket
  183. //var sconn *sharedUDPConn
  184. //if server.DiscoveryV5 {
  185. // unhandled = make(chan discover.ReadPacket, 100)
  186. // sconn = &sharedUDPConn{conn, unhandled}
  187. //}
  188. cfg := discover.Config{
  189. PrivateKey: server.PrivateKey,
  190. NetRestrict: server.NetRestrict,
  191. Bootnodes: server.BootstrapNodes,
  192. Unhandled: unhandled,
  193. //Log: server.log,
  194. }
  195. ntab, err := discover.ListenV4(conn, server.localnode, cfg)
  196. if err != nil {
  197. return err
  198. }
  199. server.ntab = ntab
  200. server.discmix.AddSource(ntab.RandomNodes())
  201. return nil
  202. }
  203. // 设置引导节点,更快发现指定网络
  204. func (server *Server) setupBootstrapNodes() (err error) {
  205. urls := params.MainnetBootNodes
  206. server.BootstrapNodes = make([]*enode.Node, 0, len(urls))
  207. for _, url := range urls {
  208. if url != "" {
  209. node, err := enode.Parse(enode.ValidSchemes, url)
  210. if err != nil {
  211. return err
  212. }
  213. server.BootstrapNodes = append(server.BootstrapNodes, node)
  214. }
  215. }
  216. return nil
  217. }
  218. // 设置拨号调度器
  219. func (server *Server) setupDialScheduler() {
  220. server.MaxPeers = params.MaxPeers
  221. server.MaxPendingPeers = params.MaxPendingPeers
  222. config := dialConfig{
  223. self: server.localnode.ID(),
  224. maxDialPeers: server.maxDialedConns(),
  225. maxActiveDials: server.MaxPendingPeers,
  226. log: server.Logger,
  227. netRestrict: server.NetRestrict,
  228. dialer: server.Dialer,
  229. clock: server.clock,
  230. }
  231. if server.ntab != nil {
  232. config.resolver = server.ntab
  233. }
  234. if config.dialer == nil {
  235. config.dialer = tcpDialer{&net.Dialer{Timeout: defaultDialTimeout}}
  236. }
  237. server.dialsched = newDialScheduler(config, server.discmix, server.SetupConn)
  238. }
  239. func (server *Server) maxDialedConns() (limit int) {
  240. if server.NoDial || server.MaxPeers == 0 {
  241. return 0
  242. }
  243. if server.DialRatio == 0 {
  244. limit = server.MaxPeers / defaultDialRatio
  245. } else {
  246. limit = server.MaxPeers / server.DialRatio
  247. }
  248. if limit == 0 {
  249. limit = 1
  250. }
  251. return limit
  252. }
  253. // 配置本地节点
  254. func (server *Server) setupLocalNode() (err error) {
  255. // 创建本地节点
  256. server.localnode = enode.NewLocalNode(server.PrivateKey)
  257. server.localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
  258. // 配置本地静态IP
  259. ip, _ := server.NAT.ExternalIP()
  260. server.localnode.SetStaticIP(ip)
  261. return nil
  262. }
  263. // 设置新节点连接监听器
  264. func (server *Server) setupListening() (err error) {
  265. listener, err := net.Listen("tcp", params.ListenerPort)
  266. if err != nil {
  267. return err
  268. }
  269. server.listener = listener
  270. server.ListenAddr = listener.Addr().String()
  271. if tcp, ok := listener.Addr().(*net.TCPAddr); ok {
  272. server.localnode.Set(enr.TCP(tcp.Port))
  273. if !tcp.IP.IsLoopback() && server.NAT != nil {
  274. server.loopWG.Add(1)
  275. gopool.Submit(func() {
  276. nat.Map(server.NAT, server.quit, "tcp", tcp.Port, tcp.Port, "blockchain-p2p")
  277. server.loopWG.Done()
  278. })
  279. }
  280. }
  281. server.loopWG.Add(1)
  282. go server.listenLoop()
  283. return nil
  284. }
  285. func (server *Server) listenLoop() {
  286. fmt.Printf("TCP Listener up, addr: %v.\n", server.listener.Addr())
  287. tokens := defaultMaxPendingPeers
  288. slots := make(chan struct{}, tokens)
  289. for i := 0; i < tokens; i++ {
  290. slots <- struct{}{}
  291. }
  292. defer server.loopWG.Done()
  293. defer func() {
  294. for i := 0; i < cap(slots); i++ {
  295. <-slots
  296. }
  297. }()
  298. for {
  299. <-slots
  300. var (
  301. fd net.Conn
  302. err error
  303. lastLogTime time.Time
  304. )
  305. // accept处理
  306. for {
  307. fd, err = server.listener.Accept()
  308. if netutil.IsTemporaryError(err) {
  309. if time.Since(lastLogTime) > 1*time.Second {
  310. fmt.Errorf("temporary read error, err: %v", err)
  311. lastLogTime = time.Now()
  312. }
  313. time.Sleep(time.Millisecond * 200)
  314. continue
  315. } else if err != nil {
  316. fmt.Errorf("read error, err: %v", err)
  317. slots <- struct{}{}
  318. return
  319. }
  320. break
  321. }
  322. // accept成功的处理
  323. remoteIP := netutil.AddrIP(fd.RemoteAddr())
  324. // TODO 检查此IP是是否能加入本地节点的链接
  325. //if err := server.checkInboundConn(remoteIP); err != nil {
  326. // srv.log.Debug("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err)
  327. // fd.Close()
  328. // slots <- struct{}{}
  329. // continue
  330. //}
  331. if remoteIP != nil {
  332. var addr *net.TCPAddr
  333. if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
  334. addr = tcp
  335. }
  336. fd = newMeteredConn(fd, true, addr)
  337. fmt.Printf("Accepted connection, addr: %v.", fd.RemoteAddr())
  338. }
  339. gopool.Submit(func() {
  340. server.SetupConn(fd, inboundConn, nil)
  341. slots <- struct{}{}
  342. })
  343. }
  344. }
  345. func (server *Server) newRLPX(conn net.Conn, dialDest *ecdsa.PublicKey) transport {
  346. return &rlpxTransport{conn: rlpx.NewConn(conn, dialDest)}
  347. }
  348. func (server *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
  349. //fmt.Printf("Setup conn %v.\n", fd.RemoteAddr())
  350. c := &conn{fd: fd, flags: flags, cont: make(chan error)}
  351. if dialDest == nil {
  352. c.transport = server.newRLPX(fd, nil)
  353. } else {
  354. c.transport = server.newRLPX(fd, dialDest.Pubkey())
  355. }
  356. err := server.setupConn(c, dialDest)
  357. if err != nil {
  358. c.close(err)
  359. }
  360. return err
  361. }
  362. func (server *Server) setupConn(c *conn, dialDest *enode.Node) error {
  363. remotePubkey, err := c.doEncHandshake(server.PrivateKey)
  364. if err != nil {
  365. return err
  366. }
  367. // 将connection转换成node
  368. c.node = enode.NodeFromConn(remotePubkey, c.fd)
  369. //fmt.Printf("Parse node: id: %v, addr: %v\n", c.node.ID(), c.fd.RemoteAddr())
  370. // 检查是否需要握手
  371. err = server.checkpoint(c, server.checkpointPostHandshake)
  372. if err != nil {
  373. return err
  374. }
  375. // 进行握手
  376. phs, err := c.doProtoHandshake(server.ourHandshake)
  377. if err != nil {
  378. return err
  379. }
  380. c.caps, c.name = phs.Caps, phs.Name
  381. fmt.Printf("Handshake ok, id: %v, addr: %v.\n", c.node.ID(), c.fd.RemoteAddr())
  382. // 握手成功后将此链接放入addPeer的检查点
  383. err = server.checkpoint(c, server.checkpointAddPeer)
  384. if err != nil {
  385. return err
  386. }
  387. return nil
  388. }
  389. func (server *Server) checkpoint(c *conn, stage chan<- *conn) error {
  390. select {
  391. case stage <- c:
  392. case <-server.quit:
  393. return ErrServerStopped
  394. }
  395. return <-c.cont
  396. }