| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- package p2p
- import (
- "blockchain-go/common/gopool"
- "blockchain-go/p2p/enode"
- "blockchain-go/p2p/enr"
- "blockchain-go/p2p/nat"
- "blockchain-go/p2p/rlpx"
- "crypto/ecdsa"
- "errors"
- "fmt"
- "github.com/ethereum/go-ethereum/crypto"
- "github.com/ethereum/go-ethereum/p2p/netutil"
- "net"
- "sync"
- "time"
- )
// ErrServerStopped is returned by checkpoint when the server's quit channel
// fires before a connection could be handed over to the requested stage.
var ErrServerStopped = errors.New("server stopped")
- // Server manages all peer connections.
- type Server struct {
- // Config fields may not be modified while the server is running.
- Config
- // Hooks for testing. These are useful because we can inhibit
- // the whole protocol stack.
- //newTransport func(net.Conn, *ecdsa.PublicKey) transport
- //newPeerHook func(*Peer)
- //listenFunc func(network, addr string) (net.Listener, error)
- lock sync.Mutex // protects running
- running bool
- listener net.Listener
- ourHandshake *protoHandshake
- loopWG sync.WaitGroup // loop, listenLoop
- //peerFeed event.Feed
- //log log.Logger
- //nodedb *enode.DB
- localnode *enode.LocalNode
- //ntab *discover.UDPv4
- //DiscV5 *discover.UDPv5
- //discmix *enode.FairMix
- //dialsched *dialScheduler
- // Channels into the run loop.
- quit chan struct{}
- //addtrusted chan *enode.Node
- //removetrusted chan *enode.Node
- //peerOp chan peerOpFunc
- //peerOpDone chan struct{}
- //delpeer chan peerDrop
- checkpointPostHandshake chan *conn
- checkpointAddPeer chan *conn
- // State of run loop and listenLoop.
- //inboundHistory expHeap
- }
- func (server *Server) Start() (err error) {
- server.quit = make(chan struct{})
- server.checkpointPostHandshake = make(chan *conn)
- server.checkpointAddPeer = make(chan *conn)
- if err := server.setupLocalNode(); err != nil {
- return err
- }
- if err := server.setupListening(); err != nil {
- return err
- }
- return nil
- }
- // 配置本地节点
- func (server *Server) setupLocalNode() (err error) {
- // 创建握手所需对象
- publicKey := crypto.FromECDSAPub(&server.PrivateKey.PublicKey)
- server.ourHandshake = &protoHandshake{
- Version: baseProtocolVersion,
- Name: server.Name,
- ID: publicKey[1:],
- }
- // 创建本地节点
- server.localnode = enode.NewLocalNode(server.PrivateKey)
- server.localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
- // 配置本地静态IP
- ip, _ := server.NAT.ExternalIP()
- server.localnode.SetStaticIP(ip)
- return nil
- }
- // 监听器
- func (server *Server) setupListening() (err error) {
- listener, err := net.Listen("tcp", server.ListenAddr)
- if err != nil {
- return err
- }
- server.listener = listener
- server.ListenAddr = listener.Addr().String()
- if tcp, ok := listener.Addr().(*net.TCPAddr); ok {
- server.localnode.Set(enr.TCP(tcp.Port))
- if !tcp.IP.IsLoopback() && server.NAT != nil {
- server.loopWG.Add(1)
- gopool.Submit(func() {
- nat.Map(server.NAT, server.quit, "tcp", tcp.Port, tcp.Port, "ethereum p2p")
- server.loopWG.Done()
- })
- }
- }
- server.loopWG.Add(1)
- go server.listenLoop()
- return nil
- }
- func (server *Server) listenLoop() {
- fmt.Printf("TCP Listener up, addr: %v.", server.listener.Addr())
- tokens := defaultMaxPendingPeers
- slots := make(chan struct{}, tokens)
- for i := 0; i < tokens; i++ {
- slots <- struct{}{}
- }
- defer server.loopWG.Done()
- defer func() {
- for i := 0; i < cap(slots); i++ {
- <-slots
- }
- }()
- for {
- <-slots
- var (
- fd net.Conn
- err error
- lastLogTime time.Time
- )
- // accept处理
- for {
- fd, err = server.listener.Accept()
- if netutil.IsTemporaryError(err) {
- if time.Since(lastLogTime) > 1*time.Second {
- fmt.Errorf("temporary read error, err: %v", err)
- lastLogTime = time.Now()
- }
- time.Sleep(time.Millisecond * 200)
- continue
- } else if err != nil {
- fmt.Errorf("read error, err: %v", err)
- slots <- struct{}{}
- return
- }
- break
- }
- // accept成功的处理
- remoteIP := netutil.AddrIP(fd.RemoteAddr())
- // TODO 检查此IP是是否能加入本地节点的链接
- //if err := server.checkInboundConn(remoteIP); err != nil {
- // srv.log.Debug("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err)
- // fd.Close()
- // slots <- struct{}{}
- // continue
- //}
- if remoteIP != nil {
- var addr *net.TCPAddr
- if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
- addr = tcp
- }
- fd = newMeteredConn(fd, true, addr)
- fmt.Printf("Accepted connection, addr: %v.", fd.RemoteAddr())
- }
- gopool.Submit(func() {
- server.SetupConn(fd, inboundConn, nil)
- slots <- struct{}{}
- })
- }
- }
- func (server *Server) newRLPX(conn net.Conn, dialDest *ecdsa.PublicKey) transport {
- return &rlpxTransport{conn: rlpx.NewConn(conn, dialDest)}
- }
- func (server *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
- c := &conn{fd: fd, flags: flags, cont: make(chan error)}
- if dialDest == nil {
- c.transport = server.newRLPX(fd, nil)
- } else {
- c.transport = server.newRLPX(fd, dialDest.Pubkey())
- }
- err := server.setupConn(c, dialDest)
- if err != nil {
- c.close(err)
- }
- return err
- }
- func (server *Server) setupConn(c *conn, dialDest *enode.Node) error {
- remotePubkey, err := c.doEncHandshake(server.PrivateKey)
- if err != nil {
- return err
- }
- // 将connection转换成node
- c.node = enode.NodeFromConn(remotePubkey, c.fd)
- fmt.Printf("id: %v, addr: %v, conn: %v", c.node.ID(), c.fd.RemoteAddr(), c.flags)
- // 检查是否需要握手
- err = server.checkpoint(c, server.checkpointPostHandshake)
- if err != nil {
- return err
- }
- // 进行握手
- phs, err := c.doProtoHandshake(server.ourHandshake)
- if err != nil {
- return err
- }
- c.caps, c.name = phs.Caps, phs.Name
- // 将此链接放入addPeer的检查点
- err = server.checkpoint(c, server.checkpointAddPeer)
- if err != nil {
- return err
- }
- return nil
- }
- func (server *Server) checkpoint(c *conn, stage chan<- *conn) error {
- select {
- case stage <- c:
- case <-server.quit:
- return ErrServerStopped
- }
- return <-c.cont
- }
- // 配置节点发现逻辑
- func (server *Server) setupDiscovery() (err error) {
- return nil
- }
- // 设置拨号调度器
- func (server *Server) setupDialScheduler() (err error) {
- return nil
- }
|