package p2p

import (
	"crypto/ecdsa"
	"errors"
	"fmt"
	"net"
	"os"
	"os/signal"
	"sort"
	"sync"
	"syscall"
	"time"

	"blockchain-go/common/gopool"
	"blockchain-go/p2p/discover"
	"blockchain-go/p2p/enode"
	"blockchain-go/p2p/enr"
	"blockchain-go/p2p/nat"
	"blockchain-go/p2p/rlpx"
	"blockchain-go/params"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/p2p/netutil"
)

var (
	// ErrServerStopped is returned by connection setup when the server is
	// shutting down.
	ErrServerStopped = errors.New("server stopped")
)

// Server manages all peer connections.
type Server struct {
	Config

	//newTransport func(net.Conn, *ecdsa.PublicKey) transport
	//newPeerHook func(*Peer)
	//listenFunc func(network, addr string) (net.Listener, error)

	lock    sync.Mutex // protects running
	running bool

	listener     net.Listener
	ourHandshake *protoHandshake
	loopWG       sync.WaitGroup // loop, listenLoop
	//peerFeed event.Feed
	//log log.Logger

	//nodedb *enode.DB
	localnode *enode.LocalNode
	ntab      *discover.UDPv4
	//DiscV5 *discover.UDPv5
	discmix   *enode.FairMix
	dialsched *dialScheduler

	// Channels into the run loop.
	quit chan struct{}
	//addtrusted chan *enode.Node
	//removetrusted chan *enode.Node
	//peerOp chan peerOpFunc
	//peerOpDone chan struct{}
	//delpeer chan peerDrop
	checkpointPostHandshake chan *conn
	checkpointAddPeer       chan *conn
	sigs                    chan os.Signal

	//inboundHistory expHeap
}

// Start configures every subsystem (bootstrap nodes, capabilities, local node,
// TCP listener, discovery, dial scheduler) and then runs the main loop in the
// calling goroutine. It blocks until the server is stopped by SIGINT/SIGTERM
// and returns the first setup error, if any.
func (server *Server) Start() (err error) {
	server.sigs = make(chan os.Signal, 1)
	signal.Notify(server.sigs, syscall.SIGINT, syscall.SIGTERM)

	server.quit = make(chan struct{})
	server.checkpointPostHandshake = make(chan *conn)
	server.checkpointAddPeer = make(chan *conn)

	// Configure the remote bootstrap nodes.
	if err := server.setupBootstrapNodes(); err != nil {
		return err
	}
	// Configure the local protocol handshake.
	if err := server.setupCaps(); err != nil {
		// Propagate the failure; returning nil here would report success.
		return err
	}
	// Configure the local node.
	if err := server.setupLocalNode(); err != nil {
		return err
	}
	// Configure the inbound connection listener.
	if err := server.setupListening(); err != nil {
		return err
	}
	// Configure node discovery.
	if err := server.setupDiscovery(); err != nil {
		return err
	}
	// Configure the dial scheduler.
	server.setupDialScheduler()

	// Run the server core; StopListener turns an OS signal into shutdown.
	go server.StopListener()
	server.run()
	return nil
}

// run is the main server loop. It answers connection checkpoints until the
// quit channel is closed.
func (server *Server) run() {
	server.loopWG.Add(1)
	defer server.loopWG.Done()

	fmt.Printf("Started P2P networking, self: %v.\n", server.localnode.Node().URLv4())
	//var (
	//	peers = make(map[enode.ID]*Peer)
	//	inboundCount = 0
	//)

running:
	for {
		select {
		case <-server.quit:
			fmt.Printf("exit signal by user.\n")
			break running
		case c := <-server.checkpointPostHandshake:
			fmt.Printf("checkpointPostHandshake: %v\n", c.name)
			// checkpoint() blocks on c.cont until it gets a verdict; without
			// this reply every connection setup would hang forever.
			// TODO: run real post-handshake checks once peers are tracked.
			c.cont <- nil
		case c := <-server.checkpointAddPeer:
			fmt.Printf("checkpointAddPeer: %v.\n", c.name)
			// TODO: launch the peer here once peer handling is implemented.
			c.cont <- nil
		}
	}
}

// StopListener waits for SIGINT/SIGTERM and then shuts the server down: it
// closes the quit channel, closes the TCP listener so listenLoop unblocks from
// Accept, and stops the discovery mixer and the dial scheduler.
func (server *Server) StopListener() {
	<-server.sigs
	close(server.quit)
	if server.listener != nil {
		if err := server.listener.Close(); err != nil {
			fmt.Printf("close listener error, err: %v\n", err)
		}
	}
	server.discmix.Close()
	server.dialsched.stop()
}

// setupCaps builds the local protocol handshake from the server's key, name
// and the capabilities listed in OurCaps, sorted by name and version.
func (server *Server) setupCaps() (err error) {
	// Create the handshake; the ID is the public key without its first byte.
	publicKey := crypto.FromECDSAPub(&server.PrivateKey.PublicKey)
	server.ourHandshake = &protoHandshake{
		Version: baseProtocolVersion,
		Name:    server.Name,
		ID:      publicKey[1:],
	}
	// Advertise the local capabilities.
	for _, capability := range OurCaps {
		server.ourHandshake.Caps = append(server.ourHandshake.Caps, capability)
	}
	sort.Sort(capsByNameAndVersion(server.ourHandshake.Caps))
	return nil
}

// setupDiscovery opens the UDP socket on ListenAddr, starts the v4 discovery
// protocol on it and registers all discovery sources with the node mixer.
func (server *Server) setupDiscovery() (err error) {
	server.discmix = enode.NewFairMix(discmixTimeout)

	// Add protocol-specific discovery sources, once per protocol name.
	added := make(map[string]bool)
	for _, proto := range server.Protocols {
		if proto.DialCandidates != nil && !added[proto.Name] {
			server.discmix.AddSource(proto.DialCandidates)
			added[proto.Name] = true
		}
	}

	addr, err := net.ResolveUDPAddr("udp", server.ListenAddr)
	if err != nil {
		return err
	}
	conn, err := net.ListenUDP("udp", addr)
	if err != nil {
		return err
	}
	realAddr := conn.LocalAddr().(*net.UDPAddr)
	fmt.Printf("UDP listener up, addr: %v.\n", realAddr)
	if server.NAT != nil && !realAddr.IP.IsLoopback() {
		server.loopWG.Add(1)
		gopool.Submit(func() {
			nat.Map(server.NAT, server.quit, "udp", realAddr.Port, realAddr.Port, "blockchain-p2p discovery")
			server.loopWG.Done()
		})
	}
	server.localnode.SetFallbackUDP(realAddr.Port)

	// Set up the v4 discovery protocol.
	var unhandled chan discover.ReadPacket
	//var sconn *sharedUDPConn
	//if server.DiscoveryV5 {
	//	unhandled = make(chan discover.ReadPacket, 100)
	//	sconn = &sharedUDPConn{conn, unhandled}
	//}
	cfg := discover.Config{
		PrivateKey:  server.PrivateKey,
		NetRestrict: server.NetRestrict,
		Bootnodes:   server.BootstrapNodes,
		Unhandled:   unhandled,
		//Log: server.log,
	}
	ntab, err := discover.ListenV4(conn, server.localnode, cfg)
	if err != nil {
		// Do not leak the UDP socket when discovery fails to start.
		conn.Close()
		return err
	}
	server.ntab = ntab
	server.discmix.AddSource(ntab.RandomNodes())
	return nil
}

// setupBootstrapNodes parses params.MainnetBootNodes into BootstrapNodes so
// the target network can be discovered faster. Empty URLs are skipped; the
// first unparsable URL aborts with its error.
func (server *Server) setupBootstrapNodes() (err error) {
	urls := params.MainnetBootNodes
	server.BootstrapNodes = make([]*enode.Node, 0, len(urls))
	for _, url := range urls {
		if url == "" {
			continue
		}
		node, err := enode.Parse(enode.ValidSchemes, url)
		if err != nil {
			return err
		}
		server.BootstrapNodes = append(server.BootstrapNodes, node)
	}
	return nil
}

// setupDialScheduler applies the peer limits from params and creates the dial
// scheduler, falling back to a TCP dialer when none is configured.
func (server *Server) setupDialScheduler() {
	server.MaxPeers = params.MaxPeers
	server.MaxPendingPeers = params.MaxPendingPeers

	config := dialConfig{
		self:           server.localnode.ID(),
		maxDialPeers:   server.maxDialedConns(),
		maxActiveDials: server.MaxPendingPeers,
		log:            server.Logger,
		netRestrict:    server.NetRestrict,
		dialer:         server.Dialer,
		clock:          server.clock,
	}
	if server.ntab != nil {
		config.resolver = server.ntab
	}
	if config.dialer == nil {
		config.dialer = tcpDialer{&net.Dialer{Timeout: defaultDialTimeout}}
	}
	server.dialsched = newDialScheduler(config, server.discmix, server.SetupConn)
}

// maxDialedConns returns the maximum number of outbound (dialed) peers:
// 0 when dialing is disabled or MaxPeers is 0, otherwise MaxPeers divided by
// the dial ratio (defaultDialRatio when DialRatio is 0), but at least 1.
func (server *Server) maxDialedConns() (limit int) {
	if server.NoDial || server.MaxPeers == 0 {
		return 0
	}
	if server.DialRatio == 0 {
		limit = server.MaxPeers / defaultDialRatio
	} else {
		limit = server.MaxPeers / server.DialRatio
	}
	if limit == 0 {
		limit = 1
	}
	return limit
}

// setupLocalNode creates the local node record with a loopback fallback IP
// and, when a NAT interface is configured, pins the static IP to the NAT's
// external address. Failure to obtain the external IP is not fatal.
func (server *Server) setupLocalNode() (err error) {
	// Create the local node.
	server.localnode = enode.NewLocalNode(server.PrivateKey)
	server.localnode.SetFallbackIP(net.IP{127, 0, 0, 1})

	// NAT is optional (the other setup steps guard on it too); calling a
	// method on a nil interface would panic.
	if server.NAT == nil {
		return nil
	}
	// Configure the static IP from the NAT's external address.
	ip, ipErr := server.NAT.ExternalIP()
	if ipErr != nil {
		fmt.Printf("external IP lookup failed, err: %v\n", ipErr)
		return nil
	}
	if ip != nil {
		server.localnode.SetStaticIP(ip)
	}
	return nil
}

// setupListening opens the TCP listener on params.ListenerPort, records the
// bound address and port, starts NAT port mapping for non-loopback addresses
// and launches the accept loop.
func (server *Server) setupListening() (err error) {
	listener, err := net.Listen("tcp", params.ListenerPort)
	if err != nil {
		return err
	}
	server.listener = listener
	server.ListenAddr = listener.Addr().String()

	if tcp, ok := listener.Addr().(*net.TCPAddr); ok {
		server.localnode.Set(enr.TCP(tcp.Port))
		if !tcp.IP.IsLoopback() && server.NAT != nil {
			server.loopWG.Add(1)
			gopool.Submit(func() {
				nat.Map(server.NAT, server.quit, "tcp", tcp.Port, tcp.Port, "blockchain-p2p")
				server.loopWG.Done()
			})
		}
	}

	server.loopWG.Add(1)
	go server.listenLoop()
	return nil
}

// listenLoop accepts inbound TCP connections and hands each one to SetupConn
// on the goroutine pool. At most defaultMaxPendingPeers connections are being
// set up at any time; the loop ends on the first non-temporary Accept error
// (including the listener being closed on shutdown).
func (server *Server) listenLoop() {
	fmt.Printf("TCP Listener up, addr: %v.\n", server.listener.Addr())

	// slots is a token bucket limiting concurrent inbound handshakes.
	tokens := defaultMaxPendingPeers
	slots := make(chan struct{}, tokens)
	for i := 0; i < tokens; i++ {
		slots <- struct{}{}
	}

	defer server.loopWG.Done()
	// Wait for every in-flight handshake to return its slot before exiting.
	defer func() {
		for i := 0; i < cap(slots); i++ {
			<-slots
		}
	}()

	for {
		<-slots

		var (
			fd          net.Conn
			err         error
			lastLogTime time.Time
		)
		// Accept, retrying on temporary errors.
		for {
			fd, err = server.listener.Accept()
			if err == nil {
				break
			}
			if netutil.IsTemporaryError(err) {
				if time.Since(lastLogTime) > 1*time.Second {
					fmt.Printf("temporary read error, err: %v\n", err)
					lastLogTime = time.Now()
				}
				time.Sleep(time.Millisecond * 200)
				continue
			}
			// A closed listener during shutdown is expected; stay quiet.
			select {
			case <-server.quit:
			default:
				fmt.Printf("read error, err: %v\n", err)
			}
			slots <- struct{}{}
			return
		}

		// Accept succeeded.
		remoteIP := netutil.AddrIP(fd.RemoteAddr())
		// TODO: check whether this IP is allowed to connect to the local node.
		//if err := server.checkInboundConn(remoteIP); err != nil {
		//	srv.log.Debug("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err)
		//	fd.Close()
		//	slots <- struct{}{}
		//	continue
		//}
		if remoteIP != nil {
			var addr *net.TCPAddr
			if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
				addr = tcp
			}
			fd = newMeteredConn(fd, true, addr)
			fmt.Printf("Accepted connection, addr: %v.", fd.RemoteAddr())
		}
		gopool.Submit(func() {
			server.SetupConn(fd, inboundConn, nil)
			slots <- struct{}{}
		})
	}
}

// newRLPX wraps conn in an RLPx transport. dialDest is the remote public key
// for outbound connections and nil for inbound ones.
func (server *Server) newRLPX(conn net.Conn, dialDest *ecdsa.PublicKey) transport {
	return &rlpxTransport{conn: rlpx.NewConn(conn, dialDest)}
}

// SetupConn runs the handshakes on fd and passes the connection through the
// run-loop checkpoints. dialDest is nil for inbound connections. On any error
// the connection is closed and the error returned.
func (server *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
	fmt.Printf("setup conn %v.", fd.RemoteAddr())
	c := &conn{fd: fd, flags: flags, cont: make(chan error)}
	if dialDest == nil {
		c.transport = server.newRLPX(fd, nil)
	} else {
		c.transport = server.newRLPX(fd, dialDest.Pubkey())
	}

	err := server.setupConn(c, dialDest)
	if err != nil {
		c.close(err)
	}
	return err
}

// setupConn performs the encryption handshake, the post-handshake checkpoint,
// the protocol handshake and finally the add-peer checkpoint, stopping at the
// first error.
func (server *Server) setupConn(c *conn, dialDest *enode.Node) error {
	remotePubkey, err := c.doEncHandshake(server.PrivateKey)
	if err != nil {
		return err
	}
	// Derive the node from the connection.
	c.node = enode.NodeFromConn(remotePubkey, c.fd)
	fmt.Printf("id: %v, addr: %v, conn: %v", c.node.ID(), c.fd.RemoteAddr(), c.flags)

	// Ask the run loop whether the protocol handshake may proceed.
	if err := server.checkpoint(c, server.checkpointPostHandshake); err != nil {
		return err
	}

	// Run the protocol handshake.
	phs, err := c.doProtoHandshake(server.ourHandshake)
	if err != nil {
		return err
	}
	c.caps, c.name = phs.Caps, phs.Name

	// After a successful handshake, submit the connection to the add-peer
	// checkpoint.
	return server.checkpoint(c, server.checkpointAddPeer)
}

// checkpoint sends c to the run loop on stage and waits for its verdict on
// c.cont. It returns ErrServerStopped if the server quits before the run loop
// accepts the connection.
func (server *Server) checkpoint(c *conn, stage chan<- *conn) error {
	select {
	case stage <- c:
	case <-server.quit:
		return ErrServerStopped
	}
	return <-c.cont
}