فهرست منبع

监听loop逻辑梳理完毕,等待实现。

skyfffire 2 سال پیش
والد
کامیت
085258a0d7
1فایلهای تغییر یافته به همراه76 افزوده شده و 0 حذف شده
  1. 76 0
      p2p/server.go

+ 76 - 0
p2p/server.go

@@ -6,9 +6,12 @@ import (
 	"blockchain-go/p2p/enr"
 	"blockchain-go/p2p/nat"
 	"errors"
+	"fmt"
 	"github.com/ethereum/go-ethereum/crypto"
+	"github.com/ethereum/go-ethereum/p2p/netutil"
 	"net"
 	"sync"
+	"time"
 )
 
 var (
@@ -57,6 +60,8 @@ type Server struct {
 }
 
 func (server *Server) Start() (err error) {
+	server.quit = make(chan struct{})
+
 	if err := server.setupLocalNode(); err != nil {
 		return err
 	}
@@ -113,9 +118,80 @@ func (server *Server) setupListening() (err error) {
 		}
 	}
 
+	server.loopWG.Add(1)
+	go server.listenLoop()
+
 	return nil
 }
 
+func (server *Server) listenLoop() {
+	fmt.Printf("TCP Listener up, addr: %v.", server.listener.Addr())
+
+	tokens := defaultMaxPendingPeers
+	slots := make(chan struct{}, tokens)
+	for i := 0; i < tokens; i++ {
+		slots <- struct{}{}
+	}
+
+	defer server.loopWG.Done()
+	defer func() {
+		for i := 0; i < cap(slots); i++ {
+			<-slots
+		}
+	}()
+
+	for {
+		<-slots
+
+		var (
+			fd          net.Conn
+			err         error
+			lastLogTime time.Time
+		)
+
+		// accept处理
+		for {
+			fd, err = server.listener.Accept()
+			if netutil.IsTemporaryError(err) {
+				if time.Since(lastLogTime) > 1*time.Second {
+					fmt.Errorf("temporary read error, err: %v", err)
+
+					lastLogTime = time.Now()
+				}
+				time.Sleep(time.Millisecond * 200)
+				continue
+			} else if err != nil {
+				fmt.Errorf("read error, err: %v", err)
+				slots <- struct{}{}
+				return
+			}
+			break
+		}
+
+		// accept成功的处理
+		remoteIP := netutil.AddrIP(fd.RemoteAddr())
+		// 检查此IP是是否能加入本地节点的链接
+		//if err := server.checkInboundConn(remoteIP); err != nil {
+		//	srv.log.Debug("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err)
+		//	fd.Close()
+		//	slots <- struct{}{}
+		//	continue
+		//}
+		if remoteIP != nil {
+			//var addr *net.TCPAddr
+			//if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
+			//	addr = tcp
+			//}
+			//fd = newMeteredConn(fd, true, addr)
+			//srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
+		}
+		gopool.Submit(func() {
+			//server.setupConn(fd, inboundConn, nil)
+			slots <- struct{}{}
+		})
+	}
+}
+
 // 配置节点发现逻辑
 func (server *Server) setupDiscovery() (err error) {
 	return nil