server.go 12 KB


  1. package p2p
  2. import (
  3. "blockchain-go/common/gopool"
  4. "blockchain-go/event"
  5. "blockchain-go/p2p/discover"
  6. "blockchain-go/p2p/enode"
  7. "blockchain-go/p2p/enr"
  8. "blockchain-go/p2p/nat"
  9. "blockchain-go/p2p/rlpx"
  10. "blockchain-go/params"
  11. "crypto/ecdsa"
  12. "errors"
  13. "fmt"
  14. "github.com/ethereum/go-ethereum/crypto"
  15. "github.com/ethereum/go-ethereum/p2p/netutil"
  16. "net"
  17. "os"
  18. "os/signal"
  19. "sort"
  20. "sync"
  21. "syscall"
  22. "time"
  23. )
  // ErrServerStopped is returned by checkpoint when the server's quit channel
  // is closed before a connection could be handed over to the main loop.
  var ErrServerStopped = errors.New("server stopped")
  27. // Server manages all peer connections.
  28. type Server struct {
  29. Config
  30. //newTransport func(net.Conn, *ecdsa.PublicKey) transport
  31. //newPeerHook func(*Peer)
  32. //listenFunc func(network, addr string) (net.Listener, error)
  33. lock sync.Mutex // protects running
  34. running bool
  35. listener net.Listener
  36. ourHandshake *protoHandshake
  37. loopWG sync.WaitGroup // loop, listenLoop
  38. peerFeed event.Feed
  39. //log log.Logger
  40. //nodedb *enode.DB
  41. localnode *enode.LocalNode
  42. ntab *discover.UDPv4
  43. //DiscV5 *discover.UDPv5
  44. discmix *enode.FairMix
  45. dialsched *dialScheduler
  46. quit chan struct{}
  47. //addtrusted chan *enode.Node
  48. //removetrusted chan *enode.Node
  49. //peerOp chan peerOpFunc
  50. //peerOpDone chan struct{}
  51. delpeer chan peerDrop
  52. checkpointPostHandshake chan *conn
  53. checkpointAddPeer chan *conn
  54. sigs chan os.Signal
  55. //inboundHistory expHeap
  56. }
  57. func (server *Server) Start() (err error) {
  58. server.sigs = make(chan os.Signal, 1)
  59. signal.Notify(server.sigs, syscall.SIGINT, syscall.SIGTERM)
  60. server.quit = make(chan struct{})
  61. server.checkpointPostHandshake = make(chan *conn)
  62. server.checkpointAddPeer = make(chan *conn)
  63. // 配置远端引导节点
  64. if err := server.setupBootstrapNodes(); err != nil {
  65. return err
  66. }
  67. // 配置本地协议
  68. if err := server.setupCaps(); err != nil {
  69. return nil
  70. }
  71. // 配置本地节点
  72. if err := server.setupLocalNode(); err != nil {
  73. return err
  74. }
  75. // 配置节点连接监听器
  76. if err := server.setupListening(); err != nil {
  77. return err
  78. }
  79. // 配置节点发现器
  80. if err := server.setupDiscovery(); err != nil {
  81. return err
  82. }
  83. // 配置拨号调度
  84. server.setupDialScheduler()
  85. // server核心运行
  86. go server.StopListener()
  87. server.run()
  88. return nil
  89. }
  90. func (server *Server) run() {
  91. server.loopWG.Add(1)
  92. fmt.Printf("Started P2P networking, self: %v.\n", server.localnode.Node().URLv4())
  93. var (
  94. peers = make(map[enode.ID]*Peer)
  95. inboundCount = 0
  96. )
  97. running:
  98. for {
  99. select {
  100. case <-server.quit:
  101. fmt.Printf("exit signal by user.\n")
  102. break running
  103. case c := <-server.checkpointPostHandshake:
  104. //p := server.launchPeer(c)
  105. //fmt.Printf("Check peer in local: %v\n", p.rw.node.IP())
  106. c.cont <- server.postHandshakeChecks(peers, inboundCount, c)
  107. case c := <-server.checkpointAddPeer:
  108. p := server.launchPeer(c)
  109. peers[c.node.ID()] = p
  110. server.dialsched.peerAdded(c)
  111. if p.Inbound() {
  112. inboundCount++
  113. }
  114. fmt.Printf("Adding p2p peer: %v, peer count: %v.\n", p.rw.node.IP(), len(peers))
  115. }
  116. }
  117. }
  118. func (server *Server) postHandshakeChecks(peers map[enode.ID]*Peer, inboundCount int, c *conn) error {
  119. switch {
  120. case !c.is(trustedConn) && len(peers) >= server.MaxPeers:
  121. return DiscTooManyPeers
  122. // TODO 暂时不做入站出站限制
  123. //case !c.is(trustedConn) && c.is(inboundConn) && inboundCount >= server.maxInboundConns():
  124. // return DiscTooManyPeers
  125. case peers[c.node.ID()] != nil:
  126. return DiscAlreadyConnected
  127. case c.node.ID() == server.localnode.ID():
  128. return DiscSelf
  129. default:
  130. return nil
  131. }
  132. }
  133. func (server *Server) launchPeer(c *conn) *Peer {
  134. p := newPeer(c, server.Protocols)
  135. gopool.Submit(func() {
  136. server.runPeer(p)
  137. })
  138. return p
  139. }
  140. func (server *Server) runPeer(p *Peer) {
  141. server.peerFeed.Send(&PeerEvent{
  142. Type: PeerEventTypeAdd,
  143. Peer: p.ID(),
  144. RemoteAddress: p.RemoteAddr().String(),
  145. LocalAddress: p.LocalAddr().String(),
  146. })
  147. remoteRequested, err := p.run()
  148. server.delpeer <- peerDrop{p, err, remoteRequested}
  149. server.peerFeed.Send(&PeerEvent{
  150. Type: PeerEventTypeDrop,
  151. Peer: p.ID(),
  152. Error: err.Error(),
  153. RemoteAddress: p.RemoteAddr().String(),
  154. LocalAddress: p.LocalAddr().String(),
  155. })
  156. }
  157. func (server *Server) StopListener() {
  158. <-server.sigs
  159. close(server.quit)
  160. server.loopWG.Done()
  161. server.discmix.Close()
  162. server.dialsched.stop()
  163. }
  164. // 本地协议
  165. func (server *Server) setupCaps() (err error) {
  166. // 创建握手器
  167. publicKey := crypto.FromECDSAPub(&server.PrivateKey.PublicKey)
  168. server.ourHandshake = &protoHandshake{
  169. Version: baseProtocolVersion,
  170. Name: server.Name,
  171. ID: publicKey[1:],
  172. }
  173. // 配置握手器
  174. for _, capability := range OurCaps {
  175. server.ourHandshake.Caps = append(server.ourHandshake.Caps, capability)
  176. }
  177. sort.Sort(capsByNameAndVersion(server.ourHandshake.Caps))
  178. return nil
  179. }
  180. // 配置节点发现逻辑
  181. func (server *Server) setupDiscovery() (err error) {
  182. server.discmix = enode.NewFairMix(discmixTimeout)
  183. // TODO 添加特定协议的发现源。
  184. //dnsclient := dnsdisc.NewClient(dnsdisc.Config{})
  185. //dialCandidates, err := dnsclient.NewIterator()
  186. //server.discmix.AddSource(dialCandidates)
  187. addr, err := net.ResolveUDPAddr("udp", server.ListenAddr)
  188. if err != nil {
  189. return err
  190. }
  191. conn, err := net.ListenUDP("udp", addr)
  192. if err != nil {
  193. return err
  194. }
  195. realAddr := conn.LocalAddr().(*net.UDPAddr)
  196. fmt.Printf("UDP listener up, addr: %v.\n", realAddr)
  197. if server.NAT != nil {
  198. if !realAddr.IP.IsLoopback() {
  199. server.loopWG.Add(1)
  200. gopool.Submit(func() {
  201. nat.Map(server.NAT, server.quit, "udp", realAddr.Port, realAddr.Port, "blockchain-p2p discovery")
  202. server.loopWG.Done()
  203. })
  204. }
  205. }
  206. server.localnode.SetFallbackUDP(realAddr.Port)
  207. // 设置V4的发现协议
  208. var unhandled chan discover.ReadPacket
  209. //var sconn *sharedUDPConn
  210. //if server.DiscoveryV5 {
  211. // unhandled = make(chan discover.ReadPacket, 100)
  212. // sconn = &sharedUDPConn{conn, unhandled}
  213. //}
  214. cfg := discover.Config{
  215. PrivateKey: server.PrivateKey,
  216. NetRestrict: server.NetRestrict,
  217. Bootnodes: server.BootstrapNodes,
  218. Unhandled: unhandled,
  219. //Log: server.log,
  220. }
  221. ntab, err := discover.ListenV4(conn, server.localnode, cfg)
  222. if err != nil {
  223. return err
  224. }
  225. server.ntab = ntab
  226. server.discmix.AddSource(ntab.RandomNodes())
  227. return nil
  228. }
  229. // 设置引导节点,更快发现指定网络
  230. func (server *Server) setupBootstrapNodes() (err error) {
  231. urls := params.MainnetBootNodes
  232. server.BootstrapNodes = make([]*enode.Node, 0, len(urls))
  233. for _, url := range urls {
  234. if url != "" {
  235. node, err := enode.Parse(enode.ValidSchemes, url)
  236. if err != nil {
  237. return err
  238. }
  239. server.BootstrapNodes = append(server.BootstrapNodes, node)
  240. }
  241. }
  242. return nil
  243. }
  244. // 设置拨号调度器
  245. func (server *Server) setupDialScheduler() {
  246. server.MaxPeers = params.MaxPeers
  247. server.MaxPendingPeers = params.MaxPendingPeers
  248. config := dialConfig{
  249. self: server.localnode.ID(),
  250. maxDialPeers: server.maxDialedConns(),
  251. maxActiveDials: server.MaxPendingPeers,
  252. log: server.Logger,
  253. netRestrict: server.NetRestrict,
  254. dialer: server.Dialer,
  255. clock: server.clock,
  256. }
  257. if server.ntab != nil {
  258. config.resolver = server.ntab
  259. }
  260. if config.dialer == nil {
  261. config.dialer = tcpDialer{&net.Dialer{Timeout: defaultDialTimeout}}
  262. }
  263. server.dialsched = newDialScheduler(config, server.discmix, server.SetupConn)
  264. }
  265. func (server *Server) maxDialedConns() (limit int) {
  266. if server.NoDial || server.MaxPeers == 0 {
  267. return 0
  268. }
  269. if server.DialRatio == 0 {
  270. limit = server.MaxPeers / defaultDialRatio
  271. } else {
  272. limit = server.MaxPeers / server.DialRatio
  273. }
  274. if limit == 0 {
  275. limit = 1
  276. }
  277. return limit
  278. }
  279. // 配置本地节点
  280. func (server *Server) setupLocalNode() (err error) {
  281. // 创建本地节点
  282. server.localnode = enode.NewLocalNode(server.PrivateKey)
  283. server.localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
  284. // 配置本地静态IP
  285. ip, _ := server.NAT.ExternalIP()
  286. server.localnode.SetStaticIP(ip)
  287. return nil
  288. }
  289. // 设置新节点连接监听器
  290. func (server *Server) setupListening() (err error) {
  291. listener, err := net.Listen("tcp", params.ListenerPort)
  292. if err != nil {
  293. return err
  294. }
  295. server.listener = listener
  296. server.ListenAddr = listener.Addr().String()
  297. if tcp, ok := listener.Addr().(*net.TCPAddr); ok {
  298. server.localnode.Set(enr.TCP(tcp.Port))
  299. if !tcp.IP.IsLoopback() && server.NAT != nil {
  300. server.loopWG.Add(1)
  301. gopool.Submit(func() {
  302. nat.Map(server.NAT, server.quit, "tcp", tcp.Port, tcp.Port, "blockchain-p2p")
  303. server.loopWG.Done()
  304. })
  305. }
  306. }
  307. server.loopWG.Add(1)
  308. go server.listenLoop()
  309. return nil
  310. }
  311. func (server *Server) listenLoop() {
  312. fmt.Printf("TCP Listener up, addr: %v.\n", server.listener.Addr())
  313. tokens := defaultMaxPendingPeers
  314. slots := make(chan struct{}, tokens)
  315. for i := 0; i < tokens; i++ {
  316. slots <- struct{}{}
  317. }
  318. defer server.loopWG.Done()
  319. defer func() {
  320. for i := 0; i < cap(slots); i++ {
  321. <-slots
  322. }
  323. }()
  324. for {
  325. <-slots
  326. var (
  327. fd net.Conn
  328. err error
  329. lastLogTime time.Time
  330. )
  331. // accept处理
  332. for {
  333. fd, err = server.listener.Accept()
  334. if netutil.IsTemporaryError(err) {
  335. if time.Since(lastLogTime) > 1*time.Second {
  336. fmt.Errorf("temporary read error, err: %v", err)
  337. lastLogTime = time.Now()
  338. }
  339. time.Sleep(time.Millisecond * 200)
  340. continue
  341. } else if err != nil {
  342. fmt.Errorf("read error, err: %v", err)
  343. slots <- struct{}{}
  344. return
  345. }
  346. break
  347. }
  348. // accept成功的处理
  349. remoteIP := netutil.AddrIP(fd.RemoteAddr())
  350. // TODO 检查此IP是是否能加入本地节点的链接
  351. //if err := server.checkInboundConn(remoteIP); err != nil {
  352. // srv.log.Debug("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err)
  353. // fd.Close()
  354. // slots <- struct{}{}
  355. // continue
  356. //}
  357. if remoteIP != nil {
  358. var addr *net.TCPAddr
  359. if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
  360. addr = tcp
  361. }
  362. fd = newMeteredConn(fd, true, addr)
  363. fmt.Printf("Accepted connection, addr: %v.", fd.RemoteAddr())
  364. }
  365. gopool.Submit(func() {
  366. server.SetupConn(fd, inboundConn, nil)
  367. slots <- struct{}{}
  368. })
  369. }
  370. }
  371. func (server *Server) newRLPX(conn net.Conn, dialDest *ecdsa.PublicKey) transport {
  372. return &rlpxTransport{conn: rlpx.NewConn(conn, dialDest)}
  373. }
  374. func (server *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
  375. //fmt.Printf("Setup conn %v.\n", fd.RemoteAddr())
  376. c := &conn{fd: fd, flags: flags, cont: make(chan error)}
  377. if dialDest == nil {
  378. c.transport = server.newRLPX(fd, nil)
  379. } else {
  380. c.transport = server.newRLPX(fd, dialDest.Pubkey())
  381. }
  382. err := server.setupConn(c, dialDest)
  383. if err != nil {
  384. c.close(err)
  385. }
  386. return err
  387. }
  388. func (server *Server) setupConn(c *conn, dialDest *enode.Node) error {
  389. remotePubkey, err := c.doEncHandshake(server.PrivateKey)
  390. if err != nil {
  391. return err
  392. }
  393. // 将connection转换成node
  394. c.node = enode.NodeFromConn(remotePubkey, c.fd)
  395. //fmt.Printf("Parse node: id: %v, addr: %v\n", c.node.ID(), c.fd.RemoteAddr())
  396. // 检查是否需要握手
  397. err = server.checkpoint(c, server.checkpointPostHandshake)
  398. if err != nil {
  399. return err
  400. }
  401. // 进行握手
  402. phs, err := c.doProtoHandshake(server.ourHandshake)
  403. if err != nil {
  404. return err
  405. }
  406. c.caps, c.name = phs.Caps, phs.Name
  407. //fmt.Printf("Handshake ok, id: %v, addr: %v.\n", c.node.ID(), c.fd.RemoteAddr())
  408. // 握手成功后将此链接放入addPeer的检查点
  409. err = server.checkpoint(c, server.checkpointAddPeer)
  410. if err != nil {
  411. return err
  412. }
  413. return nil
  414. }
  415. func (server *Server) checkpoint(c *conn, stage chan<- *conn) error {
  416. select {
  417. case stage <- c:
  418. case <-server.quit:
  419. return ErrServerStopped
  420. }
  421. return <-c.cont
  422. }