소스 검색

准备实现conn->peer的部分,需要实现eth协议

skyfffire 2 년 전
부모
커밋
623f4e614a
2개의 변경된 파일70개의 추가작업 그리고 15개의 파일을 삭제
  1. 57 8
      p2p/peer.go
  2. 13 7
      p2p/server.go

+ 57 - 8
p2p/peer.go

@@ -1,8 +1,10 @@
 package p2p
 
 import (
+	"blockchain-go/common/mclock"
 	"blockchain-go/p2p/enode"
 	"errors"
+	"sort"
 	"time"
 )
 
@@ -75,16 +77,63 @@ type PeerEvent struct {
 
 // Peer represents a connected remote node.
 type Peer struct {
-	//rw      *conn
-	//running map[string]*protoRW
-	//log     log.Logger
-	//created mclock.AbsTime
+	rw      *conn
+	running map[string]*protoRW
+	created mclock.AbsTime
 
-	//wg       sync.WaitGroup
-	//protoErr chan error
-	//closed   chan struct{}
-	//disc     chan DiscReason
+	protoErr chan error
+	closed   chan struct{}
+	disc     chan DiscReason
 
+	//wg       sync.WaitGroup
+	//log     log.Logger
 	// events receives message send / receive events if set
 	//events *event.Feed
 }
+
+func newPeer(conn *conn, protocols []Protocol) *Peer {
+	protoMap := matchProtocols(protocols, conn.caps, conn)
+	p := &Peer{
+		rw:       conn,
+		running:  protoMap,
+		created:  mclock.Now(),
+		protoErr: make(chan error, len(protoMap)+1),
+		closed:   make(chan struct{}),
+	}
+
+	return p
+}
+
+type protoRW struct {
+	Protocol
+	in     chan Msg        // receives read messages
+	closed <-chan struct{} // receives when peer is shutting down
+	wstart <-chan struct{} // receives when write may start
+	werr   chan<- error    // for write results
+	offset uint64
+	w      MsgWriter
+}
+
+func matchProtocols(protocols []Protocol, caps []Cap, rw MsgReadWriter) map[string]*protoRW {
+	sort.Sort(capsByNameAndVersion(caps))
+	offset := baseProtocolLength
+	result := make(map[string]*protoRW)
+
+outer:
+	for _, capability := range caps {
+		for _, proto := range protocols {
+			if proto.Name == capability.Name && proto.Version == capability.Version {
+				if old := result[capability.Name]; old != nil {
+					offset -= old.Length
+				}
+
+				result[capability.Name] = &protoRW{Protocol: proto, offset: offset, in: make(chan Msg), w: rw}
+				offset += proto.Length
+
+				continue outer
+			}
+		}
+	}
+
+	return result
+}

+ 13 - 7
p2p/server.go

@@ -3,7 +3,6 @@ package p2p
 import (
 	"blockchain-go/common/gopool"
 	"blockchain-go/p2p/discover"
-	"blockchain-go/p2p/dnsdisc"
 	"blockchain-go/p2p/enode"
 	"blockchain-go/p2p/enr"
 	"blockchain-go/p2p/nat"
@@ -131,6 +130,12 @@ running:
 	}
 }
 
+func (server *Server) launchPeer(c *conn) *Peer {
+	p := newPeer(c, server.Protocols)
+	// TODO 首先完成eth协议,封装在在eth/protocols/eth/handler.go
+	return p
+}
+
 func (server *Server) StopListener() {
 	<-server.sigs
 
@@ -163,10 +168,10 @@ func (server *Server) setupCaps() (err error) {
 func (server *Server) setupDiscovery() (err error) {
 	server.discmix = enode.NewFairMix(discmixTimeout)
 
-	// 添加特定协议的发现源。
-	dnsclient := dnsdisc.NewClient(dnsdisc.Config{})
-	dialCandidates, err := dnsclient.NewIterator()
-	server.discmix.AddSource(dialCandidates)
+	// TODO 添加特定协议的发现源。
+	//dnsclient := dnsdisc.NewClient(dnsdisc.Config{})
+	//dialCandidates, err := dnsclient.NewIterator()
+	//server.discmix.AddSource(dialCandidates)
 
 	addr, err := net.ResolveUDPAddr("udp", server.ListenAddr)
 	if err != nil {
@@ -389,7 +394,7 @@ func (server *Server) newRLPX(conn net.Conn, dialDest *ecdsa.PublicKey) transpor
 }
 
 func (server *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
-	fmt.Printf("setup conn %v.", fd.RemoteAddr())
+	//fmt.Printf("Setup conn %v.\n", fd.RemoteAddr())
 
 	c := &conn{fd: fd, flags: flags, cont: make(chan error)}
 	if dialDest == nil {
@@ -413,7 +418,7 @@ func (server *Server) setupConn(c *conn, dialDest *enode.Node) error {
 
 	// 将connection转换成node
 	c.node = enode.NodeFromConn(remotePubkey, c.fd)
-	fmt.Printf("id: %v, addr: %v, conn: %v", c.node.ID(), c.fd.RemoteAddr(), c.flags)
+	//fmt.Printf("Parse node: id: %v, addr: %v\n", c.node.ID(), c.fd.RemoteAddr())
 
 	// 检查是否需要握手
 	err = server.checkpoint(c, server.checkpointPostHandshake)
@@ -427,6 +432,7 @@ func (server *Server) setupConn(c *conn, dialDest *enode.Node) error {
 		return err
 	}
 	c.caps, c.name = phs.Caps, phs.Name
+	//fmt.Printf("Handshake ok, id: %v, addr: %v.\n", c.node.ID(), c.fd.RemoteAddr())
 
 	// 握手成功后将此链接放入addPeer的检查点
 	err = server.checkpoint(c, server.checkpointAddPeer)