server.go 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447
  1. package p2p
  2. import (
  3. "blockchain-go/common/gopool"
  4. "blockchain-go/p2p/discover"
  5. "blockchain-go/p2p/enode"
  6. "blockchain-go/p2p/enr"
  7. "blockchain-go/p2p/nat"
  8. "blockchain-go/p2p/rlpx"
  9. "blockchain-go/params"
  10. "crypto/ecdsa"
  11. "errors"
  12. "fmt"
  13. "github.com/ethereum/go-ethereum/crypto"
  14. "github.com/ethereum/go-ethereum/p2p/netutil"
  15. "net"
  16. "os"
  17. "os/signal"
  18. "sort"
  19. "sync"
  20. "syscall"
  21. "time"
  22. )
  23. var (
  24. ErrServerStopped = errors.New("server stopped")
  25. )
  26. // Server manages all peer connections.
  27. type Server struct {
  28. Config
  29. //newTransport func(net.Conn, *ecdsa.PublicKey) transport
  30. //newPeerHook func(*Peer)
  31. //listenFunc func(network, addr string) (net.Listener, error)
  32. lock sync.Mutex // protects running
  33. running bool
  34. listener net.Listener
  35. ourHandshake *protoHandshake
  36. loopWG sync.WaitGroup // loop, listenLoop
  37. //peerFeed event.Feed
  38. //log log.Logger
  39. //nodedb *enode.DB
  40. localnode *enode.LocalNode
  41. ntab *discover.UDPv4
  42. //DiscV5 *discover.UDPv5
  43. discmix *enode.FairMix
  44. dialsched *dialScheduler
  45. quit chan struct{}
  46. //addtrusted chan *enode.Node
  47. //removetrusted chan *enode.Node
  48. //peerOp chan peerOpFunc
  49. //peerOpDone chan struct{}
  50. //delpeer chan peerDrop
  51. checkpointPostHandshake chan *conn
  52. checkpointAddPeer chan *conn
  53. sigs chan os.Signal
  54. //inboundHistory expHeap
  55. }
  56. func (server *Server) Start() (err error) {
  57. server.sigs = make(chan os.Signal, 1)
  58. signal.Notify(server.sigs, syscall.SIGINT, syscall.SIGTERM)
  59. server.quit = make(chan struct{})
  60. server.checkpointPostHandshake = make(chan *conn)
  61. server.checkpointAddPeer = make(chan *conn)
  62. // 配置远端引导节点
  63. if err := server.setupBootstrapNodes(); err != nil {
  64. return err
  65. }
  66. // 配置本地协议
  67. if err := server.setupCaps(); err != nil {
  68. return nil
  69. }
  70. // 配置本地节点
  71. if err := server.setupLocalNode(); err != nil {
  72. return err
  73. }
  74. // 配置节点连接监听器
  75. if err := server.setupListening(); err != nil {
  76. return err
  77. }
  78. // 配置节点发现器
  79. if err := server.setupDiscovery(); err != nil {
  80. return err
  81. }
  82. // 配置拨号调度
  83. server.setupDialScheduler()
  84. // server核心运行
  85. go server.StopListener()
  86. server.run()
  87. return nil
  88. }
  89. func (server *Server) run() {
  90. server.loopWG.Add(1)
  91. fmt.Printf("Started P2P networking, self: %v.\n", server.localnode.Node().URLv4())
  92. //var (
  93. // peers = make(map[enode.ID]*Peer)
  94. // inboundCount = 0
  95. //)
  96. running:
  97. for {
  98. select {
  99. case <-server.quit:
  100. fmt.Printf("exit signal by user.\n")
  101. break running
  102. case c := <-server.checkpointPostHandshake:
  103. fmt.Printf("checkpointPostHandshake: %v\n", c.name)
  104. case c := <-server.checkpointAddPeer:
  105. fmt.Printf("checkpointAddPeer: %v.\n", c.name)
  106. }
  107. }
  108. }
  109. func (server *Server) StopListener() {
  110. <-server.sigs
  111. close(server.quit)
  112. server.loopWG.Done()
  113. server.discmix.Close()
  114. server.dialsched.stop()
  115. }
  116. // 本地协议
  117. func (server *Server) setupCaps() (err error) {
  118. // 创建握手器
  119. publicKey := crypto.FromECDSAPub(&server.PrivateKey.PublicKey)
  120. server.ourHandshake = &protoHandshake{
  121. Version: baseProtocolVersion,
  122. Name: server.Name,
  123. ID: publicKey[1:],
  124. }
  125. // 配置握手器
  126. for _, capability := range OurCaps {
  127. server.ourHandshake.Caps = append(server.ourHandshake.Caps, capability)
  128. }
  129. sort.Sort(capsByNameAndVersion(server.ourHandshake.Caps))
  130. return nil
  131. }
  132. // 配置节点发现逻辑
  133. func (server *Server) setupDiscovery() (err error) {
  134. server.discmix = enode.NewFairMix(discmixTimeout)
  135. // 添加特定协议的发现源。
  136. added := make(map[string]bool)
  137. for _, proto := range server.Protocols {
  138. if proto.DialCandidates != nil && !added[proto.Name] {
  139. server.discmix.AddSource(proto.DialCandidates)
  140. added[proto.Name] = true
  141. }
  142. }
  143. addr, err := net.ResolveUDPAddr("udp", server.ListenAddr)
  144. if err != nil {
  145. return err
  146. }
  147. conn, err := net.ListenUDP("udp", addr)
  148. if err != nil {
  149. return err
  150. }
  151. realAddr := conn.LocalAddr().(*net.UDPAddr)
  152. fmt.Printf("UDP listener up, addr: %v.\n", realAddr)
  153. if server.NAT != nil {
  154. if !realAddr.IP.IsLoopback() {
  155. server.loopWG.Add(1)
  156. gopool.Submit(func() {
  157. nat.Map(server.NAT, server.quit, "udp", realAddr.Port, realAddr.Port, "blockchain-p2p discovery")
  158. server.loopWG.Done()
  159. })
  160. }
  161. }
  162. server.localnode.SetFallbackUDP(realAddr.Port)
  163. // 设置V4的发现协议
  164. var unhandled chan discover.ReadPacket
  165. //var sconn *sharedUDPConn
  166. //if server.DiscoveryV5 {
  167. // unhandled = make(chan discover.ReadPacket, 100)
  168. // sconn = &sharedUDPConn{conn, unhandled}
  169. //}
  170. cfg := discover.Config{
  171. PrivateKey: server.PrivateKey,
  172. NetRestrict: server.NetRestrict,
  173. Bootnodes: server.BootstrapNodes,
  174. Unhandled: unhandled,
  175. //Log: server.log,
  176. }
  177. ntab, err := discover.ListenV4(conn, server.localnode, cfg)
  178. if err != nil {
  179. return err
  180. }
  181. server.ntab = ntab
  182. server.discmix.AddSource(ntab.RandomNodes())
  183. return nil
  184. }
  185. // 设置引导节点,更快发现指定网络
  186. func (server *Server) setupBootstrapNodes() (err error) {
  187. urls := params.MainnetBootNodes
  188. server.BootstrapNodes = make([]*enode.Node, 0, len(urls))
  189. for _, url := range urls {
  190. if url != "" {
  191. node, err := enode.Parse(enode.ValidSchemes, url)
  192. if err != nil {
  193. return err
  194. }
  195. server.BootstrapNodes = append(server.BootstrapNodes, node)
  196. }
  197. }
  198. return nil
  199. }
  200. // 设置拨号调度器
  201. func (server *Server) setupDialScheduler() {
  202. config := dialConfig{
  203. self: server.localnode.ID(),
  204. maxDialPeers: server.maxDialedConns(),
  205. maxActiveDials: server.MaxPendingPeers,
  206. log: server.Logger,
  207. netRestrict: server.NetRestrict,
  208. dialer: server.Dialer,
  209. clock: server.clock,
  210. }
  211. if server.ntab != nil {
  212. config.resolver = server.ntab
  213. }
  214. if config.dialer == nil {
  215. config.dialer = tcpDialer{&net.Dialer{Timeout: defaultDialTimeout}}
  216. }
  217. server.dialsched = newDialScheduler(config, server.discmix, server.SetupConn)
  218. }
  219. func (server *Server) maxDialedConns() (limit int) {
  220. if server.NoDial || server.MaxPeers == 0 {
  221. return 0
  222. }
  223. if server.DialRatio == 0 {
  224. limit = server.MaxPeers / defaultDialRatio
  225. } else {
  226. limit = server.MaxPeers / server.DialRatio
  227. }
  228. if limit == 0 {
  229. limit = 1
  230. }
  231. return limit
  232. }
  233. // 配置本地节点
  234. func (server *Server) setupLocalNode() (err error) {
  235. // 创建本地节点
  236. server.localnode = enode.NewLocalNode(server.PrivateKey)
  237. server.localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
  238. // 配置本地静态IP
  239. ip, _ := server.NAT.ExternalIP()
  240. server.localnode.SetStaticIP(ip)
  241. return nil
  242. }
  243. // 设置新节点连接监听器
  244. func (server *Server) setupListening() (err error) {
  245. listener, err := net.Listen("tcp", server.ListenAddr)
  246. if err != nil {
  247. return err
  248. }
  249. server.listener = listener
  250. server.ListenAddr = listener.Addr().String()
  251. if tcp, ok := listener.Addr().(*net.TCPAddr); ok {
  252. server.localnode.Set(enr.TCP(tcp.Port))
  253. if !tcp.IP.IsLoopback() && server.NAT != nil {
  254. server.loopWG.Add(1)
  255. gopool.Submit(func() {
  256. nat.Map(server.NAT, server.quit, "tcp", tcp.Port, tcp.Port, "blockchain-p2p")
  257. server.loopWG.Done()
  258. })
  259. }
  260. }
  261. server.loopWG.Add(1)
  262. go server.listenLoop()
  263. return nil
  264. }
  265. func (server *Server) listenLoop() {
  266. fmt.Printf("TCP Listener up, addr: %v.\n", server.listener.Addr())
  267. tokens := defaultMaxPendingPeers
  268. slots := make(chan struct{}, tokens)
  269. for i := 0; i < tokens; i++ {
  270. slots <- struct{}{}
  271. }
  272. defer server.loopWG.Done()
  273. defer func() {
  274. for i := 0; i < cap(slots); i++ {
  275. <-slots
  276. }
  277. }()
  278. for {
  279. <-slots
  280. var (
  281. fd net.Conn
  282. err error
  283. lastLogTime time.Time
  284. )
  285. // accept处理
  286. for {
  287. fd, err = server.listener.Accept()
  288. if netutil.IsTemporaryError(err) {
  289. if time.Since(lastLogTime) > 1*time.Second {
  290. fmt.Errorf("temporary read error, err: %v", err)
  291. lastLogTime = time.Now()
  292. }
  293. time.Sleep(time.Millisecond * 200)
  294. continue
  295. } else if err != nil {
  296. fmt.Errorf("read error, err: %v", err)
  297. slots <- struct{}{}
  298. return
  299. }
  300. break
  301. }
  302. // accept成功的处理
  303. remoteIP := netutil.AddrIP(fd.RemoteAddr())
  304. // TODO 检查此IP是是否能加入本地节点的链接
  305. //if err := server.checkInboundConn(remoteIP); err != nil {
  306. // srv.log.Debug("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err)
  307. // fd.Close()
  308. // slots <- struct{}{}
  309. // continue
  310. //}
  311. if remoteIP != nil {
  312. var addr *net.TCPAddr
  313. if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
  314. addr = tcp
  315. }
  316. fd = newMeteredConn(fd, true, addr)
  317. fmt.Printf("Accepted connection, addr: %v.", fd.RemoteAddr())
  318. }
  319. gopool.Submit(func() {
  320. server.SetupConn(fd, inboundConn, nil)
  321. slots <- struct{}{}
  322. })
  323. }
  324. }
  325. func (server *Server) newRLPX(conn net.Conn, dialDest *ecdsa.PublicKey) transport {
  326. return &rlpxTransport{conn: rlpx.NewConn(conn, dialDest)}
  327. }
  328. func (server *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
  329. fmt.Printf("setup conn %v.", fd.RemoteAddr())
  330. c := &conn{fd: fd, flags: flags, cont: make(chan error)}
  331. if dialDest == nil {
  332. c.transport = server.newRLPX(fd, nil)
  333. } else {
  334. c.transport = server.newRLPX(fd, dialDest.Pubkey())
  335. }
  336. err := server.setupConn(c, dialDest)
  337. if err != nil {
  338. c.close(err)
  339. }
  340. return err
  341. }
  342. func (server *Server) setupConn(c *conn, dialDest *enode.Node) error {
  343. remotePubkey, err := c.doEncHandshake(server.PrivateKey)
  344. if err != nil {
  345. return err
  346. }
  347. // 将connection转换成node
  348. c.node = enode.NodeFromConn(remotePubkey, c.fd)
  349. fmt.Printf("id: %v, addr: %v, conn: %v", c.node.ID(), c.fd.RemoteAddr(), c.flags)
  350. // 检查是否需要握手
  351. err = server.checkpoint(c, server.checkpointPostHandshake)
  352. if err != nil {
  353. return err
  354. }
  355. // 进行握手
  356. phs, err := c.doProtoHandshake(server.ourHandshake)
  357. if err != nil {
  358. return err
  359. }
  360. c.caps, c.name = phs.Caps, phs.Name
  361. // 握手成功后将此链接放入addPeer的检查点
  362. err = server.checkpoint(c, server.checkpointAddPeer)
  363. if err != nil {
  364. return err
  365. }
  366. return nil
  367. }
  368. func (server *Server) checkpoint(c *conn, stage chan<- *conn) error {
  369. select {
  370. case stage <- c:
  371. case <-server.quit:
  372. return ErrServerStopped
  373. }
  374. return <-c.cont
  375. }