server.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447
  1. package p2p
  2. import (
  3. "blockchain-go/common/gopool"
  4. "blockchain-go/p2p/discover"
  5. "blockchain-go/p2p/dnsdisc"
  6. "blockchain-go/p2p/enode"
  7. "blockchain-go/p2p/enr"
  8. "blockchain-go/p2p/nat"
  9. "blockchain-go/p2p/rlpx"
  10. "blockchain-go/params"
  11. "crypto/ecdsa"
  12. "errors"
  13. "fmt"
  14. "github.com/ethereum/go-ethereum/crypto"
  15. "github.com/ethereum/go-ethereum/p2p/netutil"
  16. "net"
  17. "os"
  18. "os/signal"
  19. "sort"
  20. "sync"
  21. "syscall"
  22. "time"
  23. )
  24. var (
  25. ErrServerStopped = errors.New("server stopped")
  26. )
  27. // Server manages all peer connections.
  28. type Server struct {
  29. Config
  30. //newTransport func(net.Conn, *ecdsa.PublicKey) transport
  31. //newPeerHook func(*Peer)
  32. //listenFunc func(network, addr string) (net.Listener, error)
  33. lock sync.Mutex // protects running
  34. running bool
  35. listener net.Listener
  36. ourHandshake *protoHandshake
  37. loopWG sync.WaitGroup // loop, listenLoop
  38. //peerFeed event.Feed
  39. //log log.Logger
  40. //nodedb *enode.DB
  41. localnode *enode.LocalNode
  42. ntab *discover.UDPv4
  43. //DiscV5 *discover.UDPv5
  44. discmix *enode.FairMix
  45. dialsched *dialScheduler
  46. quit chan struct{}
  47. //addtrusted chan *enode.Node
  48. //removetrusted chan *enode.Node
  49. //peerOp chan peerOpFunc
  50. //peerOpDone chan struct{}
  51. //delpeer chan peerDrop
  52. checkpointPostHandshake chan *conn
  53. checkpointAddPeer chan *conn
  54. sigs chan os.Signal
  55. //inboundHistory expHeap
  56. }
  57. func (server *Server) Start() (err error) {
  58. server.sigs = make(chan os.Signal, 1)
  59. signal.Notify(server.sigs, syscall.SIGINT, syscall.SIGTERM)
  60. server.quit = make(chan struct{})
  61. server.checkpointPostHandshake = make(chan *conn)
  62. server.checkpointAddPeer = make(chan *conn)
  63. // 配置远端引导节点
  64. if err := server.setupBootstrapNodes(); err != nil {
  65. return err
  66. }
  67. // 配置本地协议
  68. if err := server.setupCaps(); err != nil {
  69. return nil
  70. }
  71. // 配置本地节点
  72. if err := server.setupLocalNode(); err != nil {
  73. return err
  74. }
  75. // 配置节点连接监听器
  76. if err := server.setupListening(); err != nil {
  77. return err
  78. }
  79. // 配置节点发现器
  80. if err := server.setupDiscovery(); err != nil {
  81. return err
  82. }
  83. // 配置拨号调度
  84. server.setupDialScheduler()
  85. // server核心运行
  86. go server.StopListener()
  87. server.run()
  88. return nil
  89. }
  90. func (server *Server) run() {
  91. server.loopWG.Add(1)
  92. fmt.Printf("Started P2P networking, self: %v.\n", server.localnode.Node().URLv4())
  93. //var (
  94. // peers = make(map[enode.ID]*Peer)
  95. // inboundCount = 0
  96. //)
  97. running:
  98. for {
  99. select {
  100. case <-server.quit:
  101. fmt.Printf("exit signal by user.\n")
  102. break running
  103. case c := <-server.checkpointPostHandshake:
  104. fmt.Printf("checkpointPostHandshake: %v\n", c.name)
  105. case c := <-server.checkpointAddPeer:
  106. fmt.Printf("checkpointAddPeer: %v.\n", c.name)
  107. }
  108. }
  109. }
  110. func (server *Server) StopListener() {
  111. <-server.sigs
  112. close(server.quit)
  113. server.loopWG.Done()
  114. server.discmix.Close()
  115. server.dialsched.stop()
  116. }
  117. // 本地协议
  118. func (server *Server) setupCaps() (err error) {
  119. // 创建握手器
  120. publicKey := crypto.FromECDSAPub(&server.PrivateKey.PublicKey)
  121. server.ourHandshake = &protoHandshake{
  122. Version: baseProtocolVersion,
  123. Name: server.Name,
  124. ID: publicKey[1:],
  125. }
  126. // 配置握手器
  127. for _, capability := range OurCaps {
  128. server.ourHandshake.Caps = append(server.ourHandshake.Caps, capability)
  129. }
  130. sort.Sort(capsByNameAndVersion(server.ourHandshake.Caps))
  131. return nil
  132. }
  133. // 配置节点发现逻辑
  134. func (server *Server) setupDiscovery() (err error) {
  135. server.discmix = enode.NewFairMix(discmixTimeout)
  136. // 添加特定协议的发现源。
  137. dnsclient := dnsdisc.NewClient(dnsdisc.Config{})
  138. dialCandidates, err := dnsclient.NewIterator()
  139. server.discmix.AddSource(dialCandidates)
  140. addr, err := net.ResolveUDPAddr("udp", server.ListenAddr)
  141. if err != nil {
  142. return err
  143. }
  144. conn, err := net.ListenUDP("udp", addr)
  145. if err != nil {
  146. return err
  147. }
  148. realAddr := conn.LocalAddr().(*net.UDPAddr)
  149. fmt.Printf("UDP listener up, addr: %v.\n", realAddr)
  150. if server.NAT != nil {
  151. if !realAddr.IP.IsLoopback() {
  152. server.loopWG.Add(1)
  153. gopool.Submit(func() {
  154. nat.Map(server.NAT, server.quit, "udp", realAddr.Port, realAddr.Port, "blockchain-p2p discovery")
  155. server.loopWG.Done()
  156. })
  157. }
  158. }
  159. server.localnode.SetFallbackUDP(realAddr.Port)
  160. // 设置V4的发现协议
  161. var unhandled chan discover.ReadPacket
  162. //var sconn *sharedUDPConn
  163. //if server.DiscoveryV5 {
  164. // unhandled = make(chan discover.ReadPacket, 100)
  165. // sconn = &sharedUDPConn{conn, unhandled}
  166. //}
  167. cfg := discover.Config{
  168. PrivateKey: server.PrivateKey,
  169. NetRestrict: server.NetRestrict,
  170. Bootnodes: server.BootstrapNodes,
  171. Unhandled: unhandled,
  172. //Log: server.log,
  173. }
  174. ntab, err := discover.ListenV4(conn, server.localnode, cfg)
  175. if err != nil {
  176. return err
  177. }
  178. server.ntab = ntab
  179. server.discmix.AddSource(ntab.RandomNodes())
  180. return nil
  181. }
  182. // 设置引导节点,更快发现指定网络
  183. func (server *Server) setupBootstrapNodes() (err error) {
  184. urls := params.MainnetBootNodes
  185. server.BootstrapNodes = make([]*enode.Node, 0, len(urls))
  186. for _, url := range urls {
  187. if url != "" {
  188. node, err := enode.Parse(enode.ValidSchemes, url)
  189. if err != nil {
  190. return err
  191. }
  192. server.BootstrapNodes = append(server.BootstrapNodes, node)
  193. }
  194. }
  195. return nil
  196. }
  197. // 设置拨号调度器
  198. func (server *Server) setupDialScheduler() {
  199. server.MaxPeers = params.MaxPeers
  200. server.MaxPendingPeers = params.MaxPendingPeers
  201. config := dialConfig{
  202. self: server.localnode.ID(),
  203. maxDialPeers: server.maxDialedConns(),
  204. maxActiveDials: server.MaxPendingPeers,
  205. log: server.Logger,
  206. netRestrict: server.NetRestrict,
  207. dialer: server.Dialer,
  208. clock: server.clock,
  209. }
  210. if server.ntab != nil {
  211. config.resolver = server.ntab
  212. }
  213. if config.dialer == nil {
  214. config.dialer = tcpDialer{&net.Dialer{Timeout: defaultDialTimeout}}
  215. }
  216. server.dialsched = newDialScheduler(config, server.discmix, server.SetupConn)
  217. }
  218. func (server *Server) maxDialedConns() (limit int) {
  219. if server.NoDial || server.MaxPeers == 0 {
  220. return 0
  221. }
  222. if server.DialRatio == 0 {
  223. limit = server.MaxPeers / defaultDialRatio
  224. } else {
  225. limit = server.MaxPeers / server.DialRatio
  226. }
  227. if limit == 0 {
  228. limit = 1
  229. }
  230. return limit
  231. }
  232. // 配置本地节点
  233. func (server *Server) setupLocalNode() (err error) {
  234. // 创建本地节点
  235. server.localnode = enode.NewLocalNode(server.PrivateKey)
  236. server.localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
  237. // 配置本地静态IP
  238. ip, _ := server.NAT.ExternalIP()
  239. server.localnode.SetStaticIP(ip)
  240. return nil
  241. }
  242. // 设置新节点连接监听器
  243. func (server *Server) setupListening() (err error) {
  244. listener, err := net.Listen("tcp", params.ListenerPort)
  245. if err != nil {
  246. return err
  247. }
  248. server.listener = listener
  249. server.ListenAddr = listener.Addr().String()
  250. if tcp, ok := listener.Addr().(*net.TCPAddr); ok {
  251. server.localnode.Set(enr.TCP(tcp.Port))
  252. if !tcp.IP.IsLoopback() && server.NAT != nil {
  253. server.loopWG.Add(1)
  254. gopool.Submit(func() {
  255. nat.Map(server.NAT, server.quit, "tcp", tcp.Port, tcp.Port, "blockchain-p2p")
  256. server.loopWG.Done()
  257. })
  258. }
  259. }
  260. server.loopWG.Add(1)
  261. go server.listenLoop()
  262. return nil
  263. }
  264. func (server *Server) listenLoop() {
  265. fmt.Printf("TCP Listener up, addr: %v.\n", server.listener.Addr())
  266. tokens := defaultMaxPendingPeers
  267. slots := make(chan struct{}, tokens)
  268. for i := 0; i < tokens; i++ {
  269. slots <- struct{}{}
  270. }
  271. defer server.loopWG.Done()
  272. defer func() {
  273. for i := 0; i < cap(slots); i++ {
  274. <-slots
  275. }
  276. }()
  277. for {
  278. <-slots
  279. var (
  280. fd net.Conn
  281. err error
  282. lastLogTime time.Time
  283. )
  284. // accept处理
  285. for {
  286. fd, err = server.listener.Accept()
  287. if netutil.IsTemporaryError(err) {
  288. if time.Since(lastLogTime) > 1*time.Second {
  289. fmt.Errorf("temporary read error, err: %v", err)
  290. lastLogTime = time.Now()
  291. }
  292. time.Sleep(time.Millisecond * 200)
  293. continue
  294. } else if err != nil {
  295. fmt.Errorf("read error, err: %v", err)
  296. slots <- struct{}{}
  297. return
  298. }
  299. break
  300. }
  301. // accept成功的处理
  302. remoteIP := netutil.AddrIP(fd.RemoteAddr())
  303. // TODO 检查此IP是是否能加入本地节点的链接
  304. //if err := server.checkInboundConn(remoteIP); err != nil {
  305. // srv.log.Debug("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err)
  306. // fd.Close()
  307. // slots <- struct{}{}
  308. // continue
  309. //}
  310. if remoteIP != nil {
  311. var addr *net.TCPAddr
  312. if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
  313. addr = tcp
  314. }
  315. fd = newMeteredConn(fd, true, addr)
  316. fmt.Printf("Accepted connection, addr: %v.", fd.RemoteAddr())
  317. }
  318. gopool.Submit(func() {
  319. server.SetupConn(fd, inboundConn, nil)
  320. slots <- struct{}{}
  321. })
  322. }
  323. }
  324. func (server *Server) newRLPX(conn net.Conn, dialDest *ecdsa.PublicKey) transport {
  325. return &rlpxTransport{conn: rlpx.NewConn(conn, dialDest)}
  326. }
  327. func (server *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
  328. fmt.Printf("setup conn %v.", fd.RemoteAddr())
  329. c := &conn{fd: fd, flags: flags, cont: make(chan error)}
  330. if dialDest == nil {
  331. c.transport = server.newRLPX(fd, nil)
  332. } else {
  333. c.transport = server.newRLPX(fd, dialDest.Pubkey())
  334. }
  335. err := server.setupConn(c, dialDest)
  336. if err != nil {
  337. c.close(err)
  338. }
  339. return err
  340. }
  341. func (server *Server) setupConn(c *conn, dialDest *enode.Node) error {
  342. remotePubkey, err := c.doEncHandshake(server.PrivateKey)
  343. if err != nil {
  344. return err
  345. }
  346. // 将connection转换成node
  347. c.node = enode.NodeFromConn(remotePubkey, c.fd)
  348. fmt.Printf("id: %v, addr: %v, conn: %v", c.node.ID(), c.fd.RemoteAddr(), c.flags)
  349. // 检查是否需要握手
  350. err = server.checkpoint(c, server.checkpointPostHandshake)
  351. if err != nil {
  352. return err
  353. }
  354. // 进行握手
  355. phs, err := c.doProtoHandshake(server.ourHandshake)
  356. if err != nil {
  357. return err
  358. }
  359. c.caps, c.name = phs.Caps, phs.Name
  360. // 握手成功后将此链接放入addPeer的检查点
  361. err = server.checkpoint(c, server.checkpointAddPeer)
  362. if err != nil {
  363. return err
  364. }
  365. return nil
  366. }
  367. func (server *Server) checkpoint(c *conn, stage chan<- *conn) error {
  368. select {
  369. case stage <- c:
  370. case <-server.quit:
  371. return ErrServerStopped
  372. }
  373. return <-c.cont
  374. }