||
- package p2p
- import (
- "blockchain-go/common/gopool"
- "blockchain-go/p2p/discover"
- "blockchain-go/p2p/enode"
- "blockchain-go/p2p/enr"
- "blockchain-go/p2p/nat"
- "blockchain-go/p2p/rlpx"
- "blockchain-go/params"
- "crypto/ecdsa"
- "errors"
- "fmt"
- "github.com/ethereum/go-ethereum/crypto"
- "github.com/ethereum/go-ethereum/p2p/netutil"
- "net"
- "sync"
- "time"
- )
// ErrServerStopped is the error reported when the server's quit channel
// fires before a pending operation could be handed over.
var ErrServerStopped = errors.New("server stopped")
- // Server manages all peer connections.
- type Server struct {
- // Config fields may not be modified while the server is running.
- Config
- // Hooks for testing. These are useful because we can inhibit
- // the whole protocol stack.
- //newTransport func(net.Conn, *ecdsa.PublicKey) transport
- //newPeerHook func(*Peer)
- //listenFunc func(network, addr string) (net.Listener, error)
- lock sync.Mutex // protects running
- running bool
- listener net.Listener
- ourHandshake *protoHandshake
- loopWG sync.WaitGroup // loop, listenLoop
- //peerFeed event.Feed
- //log log.Logger
- //nodedb *enode.DB
- localnode *enode.LocalNode
- ntab *discover.UDPv4
- //DiscV5 *discover.UDPv5
- discmix *enode.FairMix
- dialsched *dialScheduler
- // Channels into the run loop.
- quit chan struct{}
- //addtrusted chan *enode.Node
- //removetrusted chan *enode.Node
- //peerOp chan peerOpFunc
- //peerOpDone chan struct{}
- //delpeer chan peerDrop
- checkpointPostHandshake chan *conn
- checkpointAddPeer chan *conn
- // State of run loop and listenLoop.
- //inboundHistory expHeap
- }
- func (server *Server) Start() (err error) {
- server.quit = make(chan struct{})
- server.checkpointPostHandshake = make(chan *conn)
- server.checkpointAddPeer = make(chan *conn)
- if err := server.setupBootstrapNodes(); err != nil {
- return err
- }
- if err := server.setupLocalNode(); err != nil {
- return err
- }
- if err := server.setupListening(); err != nil {
- return err
- }
- if err := server.setupDiscovery(); err != nil {
- return err
- }
- server.setupDialScheduler()
- return nil
- }
- // 配置节点发现逻辑
- func (server *Server) setupDiscovery() (err error) {
- server.discmix = enode.NewFairMix(discmixTimeout)
- // 添加特定协议的发现源。
- added := make(map[string]bool)
- for _, proto := range server.Protocols {
- if proto.DialCandidates != nil && !added[proto.Name] {
- server.discmix.AddSource(proto.DialCandidates)
- added[proto.Name] = true
- }
- }
- addr, err := net.ResolveUDPAddr("udp", server.ListenAddr)
- if err != nil {
- return err
- }
- conn, err := net.ListenUDP("udp", addr)
- if err != nil {
- return err
- }
- realAddr := conn.LocalAddr().(*net.UDPAddr)
- fmt.Printf("UDP listener up, addr: %v.", realAddr)
- if server.NAT != nil {
- if !realAddr.IP.IsLoopback() {
- server.loopWG.Add(1)
- gopool.Submit(func() {
- nat.Map(server.NAT, server.quit, "udp", realAddr.Port, realAddr.Port, "ethereum discovery")
- server.loopWG.Done()
- })
- }
- }
- server.localnode.SetFallbackUDP(realAddr.Port)
- // 设置V4的发现协议
- var unhandled chan discover.ReadPacket
- //var sconn *sharedUDPConn
- //if server.DiscoveryV5 {
- // unhandled = make(chan discover.ReadPacket, 100)
- // sconn = &sharedUDPConn{conn, unhandled}
- //}
- cfg := discover.Config{
- PrivateKey: server.PrivateKey,
- NetRestrict: server.NetRestrict,
- Bootnodes: server.BootstrapNodes,
- Unhandled: unhandled,
- //Log: server.log,
- }
- ntab, err := discover.ListenV4(conn, server.localnode, cfg)
- if err != nil {
- return err
- }
- server.ntab = ntab
- server.discmix.AddSource(ntab.RandomNodes())
- return nil
- }
- func (server *Server) setupBootstrapNodes() (err error) {
- urls := params.MainnetBootNodes
- server.BootstrapNodes = make([]*enode.Node, 0, len(urls))
- for _, url := range urls {
- if url != "" {
- node, err := enode.Parse(enode.ValidSchemes, url)
- if err != nil {
- return err
- }
- server.BootstrapNodes = append(server.BootstrapNodes, node)
- }
- }
- return nil
- }
- // 设置拨号调度器
- func (server *Server) setupDialScheduler() {
- config := dialConfig{
- self: server.localnode.ID(),
- maxDialPeers: server.maxDialedConns(),
- maxActiveDials: server.MaxPendingPeers,
- log: server.Logger,
- netRestrict: server.NetRestrict,
- dialer: server.Dialer,
- clock: server.clock,
- }
- if server.ntab != nil {
- config.resolver = server.ntab
- }
- if config.dialer == nil {
- config.dialer = tcpDialer{&net.Dialer{Timeout: defaultDialTimeout}}
- }
- server.dialsched = newDialScheduler(config, server.discmix, server.SetupConn)
- //for _, n := range srv.StaticNodes {
- // srv.dialsched.addStatic(n)
- //}
- }
- func (server *Server) maxDialedConns() (limit int) {
- if server.NoDial || server.MaxPeers == 0 {
- return 0
- }
- if server.DialRatio == 0 {
- limit = server.MaxPeers / defaultDialRatio
- } else {
- limit = server.MaxPeers / server.DialRatio
- }
- if limit == 0 {
- limit = 1
- }
- return limit
- }
- // 配置本地节点
- func (server *Server) setupLocalNode() (err error) {
- // 创建握手所需对象
- publicKey := crypto.FromECDSAPub(&server.PrivateKey.PublicKey)
- server.ourHandshake = &protoHandshake{
- Version: baseProtocolVersion,
- Name: server.Name,
- ID: publicKey[1:],
- }
- // 创建本地节点
- server.localnode = enode.NewLocalNode(server.PrivateKey)
- server.localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
- // 配置本地静态IP
- ip, _ := server.NAT.ExternalIP()
- server.localnode.SetStaticIP(ip)
- return nil
- }
- // 监听器
- func (server *Server) setupListening() (err error) {
- listener, err := net.Listen("tcp", server.ListenAddr)
- if err != nil {
- return err
- }
- server.listener = listener
- server.ListenAddr = listener.Addr().String()
- if tcp, ok := listener.Addr().(*net.TCPAddr); ok {
- server.localnode.Set(enr.TCP(tcp.Port))
- if !tcp.IP.IsLoopback() && server.NAT != nil {
- server.loopWG.Add(1)
- gopool.Submit(func() {
- nat.Map(server.NAT, server.quit, "tcp", tcp.Port, tcp.Port, "ethereum p2p")
- server.loopWG.Done()
- })
- }
- }
- server.loopWG.Add(1)
- go server.listenLoop()
- return nil
- }
- func (server *Server) listenLoop() {
- fmt.Printf("TCP Listener up, addr: %v.", server.listener.Addr())
- tokens := defaultMaxPendingPeers
- slots := make(chan struct{}, tokens)
- for i := 0; i < tokens; i++ {
- slots <- struct{}{}
- }
- defer server.loopWG.Done()
- defer func() {
- for i := 0; i < cap(slots); i++ {
- <-slots
- }
- }()
- for {
- <-slots
- var (
- fd net.Conn
- err error
- lastLogTime time.Time
- )
- // accept处理
- for {
- fd, err = server.listener.Accept()
- if netutil.IsTemporaryError(err) {
- if time.Since(lastLogTime) > 1*time.Second {
- fmt.Errorf("temporary read error, err: %v", err)
- lastLogTime = time.Now()
- }
- time.Sleep(time.Millisecond * 200)
- continue
- } else if err != nil {
- fmt.Errorf("read error, err: %v", err)
- slots <- struct{}{}
- return
- }
- break
- }
- // accept成功的处理
- remoteIP := netutil.AddrIP(fd.RemoteAddr())
- // TODO 检查此IP是是否能加入本地节点的链接
- //if err := server.checkInboundConn(remoteIP); err != nil {
- // srv.log.Debug("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err)
- // fd.Close()
- // slots <- struct{}{}
- // continue
- //}
- if remoteIP != nil {
- var addr *net.TCPAddr
- if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
- addr = tcp
- }
- fd = newMeteredConn(fd, true, addr)
- fmt.Printf("Accepted connection, addr: %v.", fd.RemoteAddr())
- }
- gopool.Submit(func() {
- server.SetupConn(fd, inboundConn, nil)
- slots <- struct{}{}
- })
- }
- }
- func (server *Server) newRLPX(conn net.Conn, dialDest *ecdsa.PublicKey) transport {
- return &rlpxTransport{conn: rlpx.NewConn(conn, dialDest)}
- }
- func (server *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
- c := &conn{fd: fd, flags: flags, cont: make(chan error)}
- if dialDest == nil {
- c.transport = server.newRLPX(fd, nil)
- } else {
- c.transport = server.newRLPX(fd, dialDest.Pubkey())
- }
- err := server.setupConn(c, dialDest)
- if err != nil {
- c.close(err)
- }
- return err
- }
- func (server *Server) setupConn(c *conn, dialDest *enode.Node) error {
- remotePubkey, err := c.doEncHandshake(server.PrivateKey)
- if err != nil {
- return err
- }
- // 将connection转换成node
- c.node = enode.NodeFromConn(remotePubkey, c.fd)
- fmt.Printf("id: %v, addr: %v, conn: %v", c.node.ID(), c.fd.RemoteAddr(), c.flags)
- // 检查是否需要握手
- err = server.checkpoint(c, server.checkpointPostHandshake)
- if err != nil {
- return err
- }
- // 进行握手
- phs, err := c.doProtoHandshake(server.ourHandshake)
- if err != nil {
- return err
- }
- c.caps, c.name = phs.Caps, phs.Name
- // 将此链接放入addPeer的检查点
- err = server.checkpoint(c, server.checkpointAddPeer)
- if err != nil {
- return err
- }
- return nil
- }
- func (server *Server) checkpoint(c *conn, stage chan<- *conn) error {
- select {
- case stage <- c:
- case <-server.quit:
- return ErrServerStopped
- }
- return <-c.cont
- }
- // sharedUDPConn implements a shared connection. Write sends messages to the underlying connection while read returns
- // messages that were found unprocessable and sent to the unhandled channel by the primary listener.
- type sharedUDPConn struct {
- *net.UDPConn
- unhandled chan discover.ReadPacket
- }
- // ReadFromUDP implements discover.UDPConn
- func (s *sharedUDPConn) ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error) {
- packet, ok := <-s.unhandled
- if !ok {
- return 0, nil, errors.New("connection was closed")
- }
- l := len(packet.Data)
- if l > len(b) {
- l = len(b)
- }
- copy(b[:l], packet.Data[:l])
- return l, packet.Addr, nil
- }
- // Close implements discover.UDPConn
- func (s *sharedUDPConn) Close() error {
- return nil
- }
|