server.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. package p2p
  2. import (
  3. "blockchain-go/common/gopool"
  4. "blockchain-go/p2p/discover"
  5. "blockchain-go/p2p/enode"
  6. "blockchain-go/p2p/enr"
  7. "blockchain-go/p2p/nat"
  8. "blockchain-go/p2p/rlpx"
  9. "blockchain-go/params"
  10. "crypto/ecdsa"
  11. "errors"
  12. "fmt"
  13. "github.com/ethereum/go-ethereum/crypto"
  14. "github.com/ethereum/go-ethereum/p2p/netutil"
  15. "net"
  16. "sort"
  17. "sync"
  18. "time"
  19. )
  // ErrServerStopped is reported to callers whose operation was interrupted
  // because the server's quit channel fired.
  var ErrServerStopped = errors.New("server stopped")
  23. // Server manages all peer connections.
  24. type Server struct {
  25. Config
  26. //newTransport func(net.Conn, *ecdsa.PublicKey) transport
  27. //newPeerHook func(*Peer)
  28. //listenFunc func(network, addr string) (net.Listener, error)
  29. lock sync.Mutex // protects running
  30. running bool
  31. listener net.Listener
  32. ourHandshake *protoHandshake
  33. loopWG sync.WaitGroup // loop, listenLoop
  34. //peerFeed event.Feed
  35. //log log.Logger
  36. //nodedb *enode.DB
  37. localnode *enode.LocalNode
  38. ntab *discover.UDPv4
  39. //DiscV5 *discover.UDPv5
  40. discmix *enode.FairMix
  41. dialsched *dialScheduler
  42. quit chan struct{}
  43. //addtrusted chan *enode.Node
  44. //removetrusted chan *enode.Node
  45. //peerOp chan peerOpFunc
  46. //peerOpDone chan struct{}
  47. //delpeer chan peerDrop
  48. checkpointPostHandshake chan *conn
  49. checkpointAddPeer chan *conn
  50. //inboundHistory expHeap
  51. }
  52. func (server *Server) Start() (err error) {
  53. server.quit = make(chan struct{})
  54. server.checkpointPostHandshake = make(chan *conn)
  55. server.checkpointAddPeer = make(chan *conn)
  56. // 配置远端引导节点
  57. if err := server.setupBootstrapNodes(); err != nil {
  58. return err
  59. }
  60. // 配置本地协议
  61. if err := server.setupCaps(); err != nil {
  62. return nil
  63. }
  64. // 配置本地节点
  65. if err := server.setupLocalNode(); err != nil {
  66. return err
  67. }
  68. // 配置节点连接监听器
  69. if err := server.setupListening(); err != nil {
  70. return err
  71. }
  72. // 配置节点发现器
  73. if err := server.setupDiscovery(); err != nil {
  74. return err
  75. }
  76. // 配置拨号调度
  77. server.setupDialScheduler()
  78. // server核心允许
  79. server.run()
  80. return nil
  81. }
  82. func (server *Server) run() {
  83. server.loopWG.Add(1)
  84. fmt.Printf("Started P2P networking, self: %v.\n", server.localnode.Node().URLv4())
  85. defer server.loopWG.Done()
  86. defer server.discmix.Close()
  87. defer server.dialsched.stop()
  88. //var (
  89. // peers = make(map[enode.ID]*Peer)
  90. // inboundCount = 0
  91. //)
  92. running:
  93. for {
  94. select {
  95. case <-server.quit:
  96. break running
  97. }
  98. }
  99. }
  100. // 本地协议
  101. func (server *Server) setupCaps() (err error) {
  102. // 创建握手器
  103. publicKey := crypto.FromECDSAPub(&server.PrivateKey.PublicKey)
  104. server.ourHandshake = &protoHandshake{
  105. Version: baseProtocolVersion,
  106. Name: server.Name,
  107. ID: publicKey[1:],
  108. }
  109. // 配置握手器
  110. for _, capability := range OurCaps {
  111. server.ourHandshake.Caps = append(server.ourHandshake.Caps, capability)
  112. }
  113. sort.Sort(capsByNameAndVersion(server.ourHandshake.Caps))
  114. return nil
  115. }
  116. // 配置节点发现逻辑
  117. func (server *Server) setupDiscovery() (err error) {
  118. server.discmix = enode.NewFairMix(discmixTimeout)
  119. // 添加特定协议的发现源。
  120. added := make(map[string]bool)
  121. for _, proto := range server.Protocols {
  122. if proto.DialCandidates != nil && !added[proto.Name] {
  123. server.discmix.AddSource(proto.DialCandidates)
  124. added[proto.Name] = true
  125. }
  126. }
  127. addr, err := net.ResolveUDPAddr("udp", server.ListenAddr)
  128. if err != nil {
  129. return err
  130. }
  131. conn, err := net.ListenUDP("udp", addr)
  132. if err != nil {
  133. return err
  134. }
  135. realAddr := conn.LocalAddr().(*net.UDPAddr)
  136. fmt.Printf("UDP listener up, addr: %v.\n", realAddr)
  137. if server.NAT != nil {
  138. if !realAddr.IP.IsLoopback() {
  139. server.loopWG.Add(1)
  140. gopool.Submit(func() {
  141. nat.Map(server.NAT, server.quit, "udp", realAddr.Port, realAddr.Port, "blockchain-p2p discovery")
  142. server.loopWG.Done()
  143. })
  144. }
  145. }
  146. server.localnode.SetFallbackUDP(realAddr.Port)
  147. // 设置V4的发现协议
  148. var unhandled chan discover.ReadPacket
  149. //var sconn *sharedUDPConn
  150. //if server.DiscoveryV5 {
  151. // unhandled = make(chan discover.ReadPacket, 100)
  152. // sconn = &sharedUDPConn{conn, unhandled}
  153. //}
  154. cfg := discover.Config{
  155. PrivateKey: server.PrivateKey,
  156. NetRestrict: server.NetRestrict,
  157. Bootnodes: server.BootstrapNodes,
  158. Unhandled: unhandled,
  159. //Log: server.log,
  160. }
  161. ntab, err := discover.ListenV4(conn, server.localnode, cfg)
  162. if err != nil {
  163. return err
  164. }
  165. server.ntab = ntab
  166. server.discmix.AddSource(ntab.RandomNodes())
  167. return nil
  168. }
  169. // 设置引导节点,更快发现指定网络
  170. func (server *Server) setupBootstrapNodes() (err error) {
  171. urls := params.MainnetBootNodes
  172. server.BootstrapNodes = make([]*enode.Node, 0, len(urls))
  173. for _, url := range urls {
  174. if url != "" {
  175. node, err := enode.Parse(enode.ValidSchemes, url)
  176. if err != nil {
  177. return err
  178. }
  179. server.BootstrapNodes = append(server.BootstrapNodes, node)
  180. }
  181. }
  182. return nil
  183. }
  184. // 设置拨号调度器
  185. func (server *Server) setupDialScheduler() {
  186. config := dialConfig{
  187. self: server.localnode.ID(),
  188. maxDialPeers: server.maxDialedConns(),
  189. maxActiveDials: server.MaxPendingPeers,
  190. log: server.Logger,
  191. netRestrict: server.NetRestrict,
  192. dialer: server.Dialer,
  193. clock: server.clock,
  194. }
  195. if server.ntab != nil {
  196. config.resolver = server.ntab
  197. }
  198. if config.dialer == nil {
  199. config.dialer = tcpDialer{&net.Dialer{Timeout: defaultDialTimeout}}
  200. }
  201. server.dialsched = newDialScheduler(config, server.discmix, server.SetupConn)
  202. }
  203. func (server *Server) maxDialedConns() (limit int) {
  204. if server.NoDial || server.MaxPeers == 0 {
  205. return 0
  206. }
  207. if server.DialRatio == 0 {
  208. limit = server.MaxPeers / defaultDialRatio
  209. } else {
  210. limit = server.MaxPeers / server.DialRatio
  211. }
  212. if limit == 0 {
  213. limit = 1
  214. }
  215. return limit
  216. }
  217. // 配置本地节点
  218. func (server *Server) setupLocalNode() (err error) {
  219. // 创建本地节点
  220. server.localnode = enode.NewLocalNode(server.PrivateKey)
  221. server.localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
  222. // 配置本地静态IP
  223. ip, _ := server.NAT.ExternalIP()
  224. server.localnode.SetStaticIP(ip)
  225. return nil
  226. }
  227. // 设置新节点连接监听器
  228. func (server *Server) setupListening() (err error) {
  229. listener, err := net.Listen("tcp", server.ListenAddr)
  230. if err != nil {
  231. return err
  232. }
  233. server.listener = listener
  234. server.ListenAddr = listener.Addr().String()
  235. if tcp, ok := listener.Addr().(*net.TCPAddr); ok {
  236. server.localnode.Set(enr.TCP(tcp.Port))
  237. if !tcp.IP.IsLoopback() && server.NAT != nil {
  238. server.loopWG.Add(1)
  239. gopool.Submit(func() {
  240. nat.Map(server.NAT, server.quit, "tcp", tcp.Port, tcp.Port, "ethereum p2p")
  241. server.loopWG.Done()
  242. })
  243. }
  244. }
  245. server.loopWG.Add(1)
  246. go server.listenLoop()
  247. return nil
  248. }
  249. func (server *Server) listenLoop() {
  250. fmt.Printf("TCP Listener up, addr: %v.\n", server.listener.Addr())
  251. tokens := defaultMaxPendingPeers
  252. slots := make(chan struct{}, tokens)
  253. for i := 0; i < tokens; i++ {
  254. slots <- struct{}{}
  255. }
  256. defer server.loopWG.Done()
  257. defer func() {
  258. for i := 0; i < cap(slots); i++ {
  259. <-slots
  260. }
  261. }()
  262. for {
  263. <-slots
  264. var (
  265. fd net.Conn
  266. err error
  267. lastLogTime time.Time
  268. )
  269. // accept处理
  270. for {
  271. fd, err = server.listener.Accept()
  272. if netutil.IsTemporaryError(err) {
  273. if time.Since(lastLogTime) > 1*time.Second {
  274. fmt.Errorf("temporary read error, err: %v", err)
  275. lastLogTime = time.Now()
  276. }
  277. time.Sleep(time.Millisecond * 200)
  278. continue
  279. } else if err != nil {
  280. fmt.Errorf("read error, err: %v", err)
  281. slots <- struct{}{}
  282. return
  283. }
  284. break
  285. }
  286. // accept成功的处理
  287. remoteIP := netutil.AddrIP(fd.RemoteAddr())
  288. // TODO 检查此IP是是否能加入本地节点的链接
  289. //if err := server.checkInboundConn(remoteIP); err != nil {
  290. // srv.log.Debug("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err)
  291. // fd.Close()
  292. // slots <- struct{}{}
  293. // continue
  294. //}
  295. if remoteIP != nil {
  296. var addr *net.TCPAddr
  297. if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
  298. addr = tcp
  299. }
  300. fd = newMeteredConn(fd, true, addr)
  301. fmt.Printf("Accepted connection, addr: %v.", fd.RemoteAddr())
  302. }
  303. gopool.Submit(func() {
  304. server.SetupConn(fd, inboundConn, nil)
  305. slots <- struct{}{}
  306. })
  307. }
  308. }
  309. func (server *Server) newRLPX(conn net.Conn, dialDest *ecdsa.PublicKey) transport {
  310. return &rlpxTransport{conn: rlpx.NewConn(conn, dialDest)}
  311. }
  312. func (server *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
  313. c := &conn{fd: fd, flags: flags, cont: make(chan error)}
  314. if dialDest == nil {
  315. c.transport = server.newRLPX(fd, nil)
  316. } else {
  317. c.transport = server.newRLPX(fd, dialDest.Pubkey())
  318. }
  319. err := server.setupConn(c, dialDest)
  320. if err != nil {
  321. c.close(err)
  322. }
  323. return err
  324. }
  325. func (server *Server) setupConn(c *conn, dialDest *enode.Node) error {
  326. remotePubkey, err := c.doEncHandshake(server.PrivateKey)
  327. if err != nil {
  328. return err
  329. }
  330. // 将connection转换成node
  331. c.node = enode.NodeFromConn(remotePubkey, c.fd)
  332. fmt.Printf("id: %v, addr: %v, conn: %v", c.node.ID(), c.fd.RemoteAddr(), c.flags)
  333. // 检查是否需要握手
  334. err = server.checkpoint(c, server.checkpointPostHandshake)
  335. if err != nil {
  336. return err
  337. }
  338. // 进行握手
  339. phs, err := c.doProtoHandshake(server.ourHandshake)
  340. if err != nil {
  341. return err
  342. }
  343. c.caps, c.name = phs.Caps, phs.Name
  344. // 握手成功后将此链接放入addPeer的检查点
  345. err = server.checkpoint(c, server.checkpointAddPeer)
  346. if err != nil {
  347. return err
  348. }
  349. return nil
  350. }
  351. func (server *Server) checkpoint(c *conn, stage chan<- *conn) error {
  352. select {
  353. case stage <- c:
  354. case <-server.quit:
  355. return ErrServerStopped
  356. }
  357. return <-c.cont
  358. }