package p2p import ( "blockchain-go/common/gopool" "blockchain-go/p2p/discover" "blockchain-go/p2p/enode" "blockchain-go/p2p/enr" "blockchain-go/p2p/nat" "blockchain-go/p2p/rlpx" "blockchain-go/params" "crypto/ecdsa" "errors" "fmt" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p/netutil" "net" "os" "os/signal" "sort" "sync" "syscall" "time" ) var ( ErrServerStopped = errors.New("server stopped") ) // Server manages all peer connections. type Server struct { Config //newTransport func(net.Conn, *ecdsa.PublicKey) transport //newPeerHook func(*Peer) //listenFunc func(network, addr string) (net.Listener, error) lock sync.Mutex // protects running running bool listener net.Listener ourHandshake *protoHandshake loopWG sync.WaitGroup // loop, listenLoop //peerFeed event.Feed //log log.Logger //nodedb *enode.DB localnode *enode.LocalNode ntab *discover.UDPv4 //DiscV5 *discover.UDPv5 discmix *enode.FairMix dialsched *dialScheduler quit chan struct{} //addtrusted chan *enode.Node //removetrusted chan *enode.Node //peerOp chan peerOpFunc //peerOpDone chan struct{} //delpeer chan peerDrop checkpointPostHandshake chan *conn checkpointAddPeer chan *conn sigs chan os.Signal //inboundHistory expHeap } func (server *Server) Start() (err error) { server.sigs = make(chan os.Signal, 1) signal.Notify(server.sigs, syscall.SIGINT, syscall.SIGTERM) server.quit = make(chan struct{}) server.checkpointPostHandshake = make(chan *conn) server.checkpointAddPeer = make(chan *conn) // 配置远端引导节点 if err := server.setupBootstrapNodes(); err != nil { return err } // 配置本地协议 if err := server.setupCaps(); err != nil { return nil } // 配置本地节点 if err := server.setupLocalNode(); err != nil { return err } // 配置节点连接监听器 if err := server.setupListening(); err != nil { return err } // 配置节点发现器 if err := server.setupDiscovery(); err != nil { return err } // 配置拨号调度 server.setupDialScheduler() // server核心运行 go server.StopListener() server.run() return nil } func (server *Server) run() { server.loopWG.Add(1) fmt.Printf("Started P2P networking, self: %v.\n", server.localnode.Node().URLv4()) var ( peers = make(map[enode.ID]*Peer) inboundCount = 0 ) running: for { select { case <-server.quit: fmt.Printf("exit signal by user.\n") break running case c := <-server.checkpointPostHandshake: //p := server.launchPeer(c) //fmt.Printf("Check peer in local: %v\n", p.rw.node.IP()) c.cont <- server.postHandshakeChecks(peers, inboundCount, c) case c := <-server.checkpointAddPeer: p := server.launchPeer(c) peers[c.node.ID()] = p server.dialsched.peerAdded(c) if p.Inbound() { inboundCount++ } fmt.Printf("Adding p2p peer: %v, peer count: %v.\n", p.rw.node.IP(), len(peers)) } } } func (server *Server) postHandshakeChecks(peers map[enode.ID]*Peer, inboundCount int, c *conn) error { switch { case !c.is(trustedConn) && len(peers) >= server.MaxPeers: return DiscTooManyPeers // TODO 暂时不做入站出站限制 //case !c.is(trustedConn) && c.is(inboundConn) && inboundCount >= server.maxInboundConns(): // return DiscTooManyPeers case peers[c.node.ID()] != nil: return DiscAlreadyConnected case c.node.ID() == server.localnode.ID(): return DiscSelf default: return nil } } func (server *Server) launchPeer(c *conn) *Peer { p := newPeer(c, server.Protocols) gopool.Submit(func() { server.runPeer(p) }) return p } func (server *Server) runPeer(p *Peer) { // TODO 实现peer的细节 } func (server *Server) StopListener() { <-server.sigs close(server.quit) server.loopWG.Done() server.discmix.Close() server.dialsched.stop() } // 本地协议 func (server *Server) setupCaps() (err error) { // 创建握手器 publicKey := crypto.FromECDSAPub(&server.PrivateKey.PublicKey) server.ourHandshake = &protoHandshake{ Version: baseProtocolVersion, Name: server.Name, ID: publicKey[1:], } // 配置握手器 for _, capability := range OurCaps { server.ourHandshake.Caps = append(server.ourHandshake.Caps, capability) } sort.Sort(capsByNameAndVersion(server.ourHandshake.Caps)) return nil } // 配置节点发现逻辑 func (server *Server) setupDiscovery() (err error) { server.discmix = enode.NewFairMix(discmixTimeout) // TODO 添加特定协议的发现源。 //dnsclient := dnsdisc.NewClient(dnsdisc.Config{}) //dialCandidates, err := dnsclient.NewIterator() //server.discmix.AddSource(dialCandidates) addr, err := net.ResolveUDPAddr("udp", server.ListenAddr) if err != nil { return err } conn, err := net.ListenUDP("udp", addr) if err != nil { return err } realAddr := conn.LocalAddr().(*net.UDPAddr) fmt.Printf("UDP listener up, addr: %v.\n", realAddr) if server.NAT != nil { if !realAddr.IP.IsLoopback() { server.loopWG.Add(1) gopool.Submit(func() { nat.Map(server.NAT, server.quit, "udp", realAddr.Port, realAddr.Port, "blockchain-p2p discovery") server.loopWG.Done() }) } } server.localnode.SetFallbackUDP(realAddr.Port) // 设置V4的发现协议 var unhandled chan discover.ReadPacket //var sconn *sharedUDPConn //if server.DiscoveryV5 { // unhandled = make(chan discover.ReadPacket, 100) // sconn = &sharedUDPConn{conn, unhandled} //} cfg := discover.Config{ PrivateKey: server.PrivateKey, NetRestrict: server.NetRestrict, Bootnodes: server.BootstrapNodes, Unhandled: unhandled, //Log: server.log, } ntab, err := discover.ListenV4(conn, server.localnode, cfg) if err != nil { return err } server.ntab = ntab server.discmix.AddSource(ntab.RandomNodes()) return nil } // 设置引导节点,更快发现指定网络 func (server *Server) setupBootstrapNodes() (err error) { urls := params.MainnetBootNodes server.BootstrapNodes = make([]*enode.Node, 0, len(urls)) for _, url := range urls { if url != "" { node, err := enode.Parse(enode.ValidSchemes, url) if err != nil { return err } server.BootstrapNodes = append(server.BootstrapNodes, node) } } return nil } // 设置拨号调度器 func (server *Server) setupDialScheduler() { server.MaxPeers = params.MaxPeers server.MaxPendingPeers = params.MaxPendingPeers config := dialConfig{ self: server.localnode.ID(), maxDialPeers: server.maxDialedConns(), maxActiveDials: server.MaxPendingPeers, log: server.Logger, netRestrict: server.NetRestrict, dialer: server.Dialer, clock: server.clock, } if server.ntab != nil { config.resolver = server.ntab } if config.dialer == nil { config.dialer = tcpDialer{&net.Dialer{Timeout: defaultDialTimeout}} } server.dialsched = newDialScheduler(config, server.discmix, server.SetupConn) } func (server *Server) maxDialedConns() (limit int) { if server.NoDial || server.MaxPeers == 0 { return 0 } if server.DialRatio == 0 { limit = server.MaxPeers / defaultDialRatio } else { limit = server.MaxPeers / server.DialRatio } if limit == 0 { limit = 1 } return limit } // 配置本地节点 func (server *Server) setupLocalNode() (err error) { // 创建本地节点 server.localnode = enode.NewLocalNode(server.PrivateKey) server.localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) // 配置本地静态IP ip, _ := server.NAT.ExternalIP() server.localnode.SetStaticIP(ip) return nil } // 设置新节点连接监听器 func (server *Server) setupListening() (err error) { listener, err := net.Listen("tcp", params.ListenerPort) if err != nil { return err } server.listener = listener server.ListenAddr = listener.Addr().String() if tcp, ok := listener.Addr().(*net.TCPAddr); ok { server.localnode.Set(enr.TCP(tcp.Port)) if !tcp.IP.IsLoopback() && server.NAT != nil { server.loopWG.Add(1) gopool.Submit(func() { nat.Map(server.NAT, server.quit, "tcp", tcp.Port, tcp.Port, "blockchain-p2p") server.loopWG.Done() }) } } server.loopWG.Add(1) go server.listenLoop() return nil } func (server *Server) listenLoop() { fmt.Printf("TCP Listener up, addr: %v.\n", server.listener.Addr()) tokens := defaultMaxPendingPeers slots := make(chan struct{}, tokens) for i := 0; i < tokens; i++ { slots <- struct{}{} } defer server.loopWG.Done() defer func() { for i := 0; i < cap(slots); i++ { <-slots } }() for { <-slots var ( fd net.Conn err error lastLogTime time.Time ) // accept处理 for { fd, err = server.listener.Accept() if netutil.IsTemporaryError(err) { if time.Since(lastLogTime) > 1*time.Second { fmt.Errorf("temporary read error, err: %v", err) lastLogTime = time.Now() } time.Sleep(time.Millisecond * 200) continue } else if err != nil { fmt.Errorf("read error, err: %v", err) slots <- struct{}{} return } break } // accept成功的处理 remoteIP := netutil.AddrIP(fd.RemoteAddr()) // TODO 检查此IP是是否能加入本地节点的链接 //if err := server.checkInboundConn(remoteIP); err != nil { // srv.log.Debug("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err) // fd.Close() // slots <- struct{}{} // continue //} if remoteIP != nil { var addr *net.TCPAddr if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok { addr = tcp } fd = newMeteredConn(fd, true, addr) fmt.Printf("Accepted connection, addr: %v.", fd.RemoteAddr()) } gopool.Submit(func() { server.SetupConn(fd, inboundConn, nil) slots <- struct{}{} }) } } func (server *Server) newRLPX(conn net.Conn, dialDest *ecdsa.PublicKey) transport { return &rlpxTransport{conn: rlpx.NewConn(conn, dialDest)} } func (server *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error { //fmt.Printf("Setup conn %v.\n", fd.RemoteAddr()) c := &conn{fd: fd, flags: flags, cont: make(chan error)} if dialDest == nil { c.transport = server.newRLPX(fd, nil) } else { c.transport = server.newRLPX(fd, dialDest.Pubkey()) } err := server.setupConn(c, dialDest) if err != nil { c.close(err) } return err } func (server *Server) setupConn(c *conn, dialDest *enode.Node) error { remotePubkey, err := c.doEncHandshake(server.PrivateKey) if err != nil { return err } // 将connection转换成node c.node = enode.NodeFromConn(remotePubkey, c.fd) //fmt.Printf("Parse node: id: %v, addr: %v\n", c.node.ID(), c.fd.RemoteAddr()) // 检查是否需要握手 err = server.checkpoint(c, server.checkpointPostHandshake) if err != nil { return err } // 进行握手 phs, err := c.doProtoHandshake(server.ourHandshake) if err != nil { return err } c.caps, c.name = phs.Caps, phs.Name //fmt.Printf("Handshake ok, id: %v, addr: %v.\n", c.node.ID(), c.fd.RemoteAddr()) // 握手成功后将此链接放入addPeer的检查点 err = server.checkpoint(c, server.checkpointAddPeer) if err != nil { return err } return nil } func (server *Server) checkpoint(c *conn, stage chan<- *conn) error { select { case stage <- c: case <-server.quit: return ErrServerStopped } return <-c.cont }