Bladeren bron

实现了discover挂载。

skyfffire 2 jaren geleden
bovenliggende
commit
a4ea5af639
6 gewijzigde bestanden met toevoegingen van 176 en 93 verwijderingen
  1. 1 1
      p2p/config.go
  2. 4 3
      p2p/discover/lookup.go
  3. 18 18
      p2p/discover/table.go
  4. 49 49
      p2p/discover/v4_udp.go
  5. 13 13
      p2p/discover/v5_udp.go
  6. 91 9
      p2p/server.go

+ 1 - 1
p2p/config.go

@@ -94,7 +94,7 @@ type Config struct {
 	// Protocols should contain the protocols supported
 	// by the server. Matching protocols are launched for
 	// each peer.
-	//Protocols []Protocol `toml:"-"`
+	Protocols []Protocol `toml:"-"`
 
 	// If ListenAddr is set to a non-nil address, the server
 	// will listen for incoming connections.

+ 4 - 3
p2p/discover/lookup.go

@@ -142,7 +142,8 @@ func (it *lookup) slowdown() {
 }
 
 func (it *lookup) query(n *node, reply chan<- []*node) {
-	fails := it.tab.db.FindFails(n.ID(), n.IP())
+	//fails := it.tab.db.FindFails(n.ID(), n.IP())
+	fails := 0
 	r, err := it.queryfunc(n)
 	if err == errClosed {
 		// Avoid recording failures on shutdown.
@@ -150,7 +151,7 @@ func (it *lookup) query(n *node, reply chan<- []*node) {
 		return
 	} else if len(r) == 0 {
 		fails++
-		it.tab.db.UpdateFindFails(n.ID(), n.IP(), fails)
+		//it.tab.db.UpdateFindFails(n.ID(), n.IP(), fails)
 		// Remove the node from the local table if it fails to return anything useful too
 		// many times, but only if there are enough other nodes in the bucket.
 		dropped := false
@@ -161,7 +162,7 @@ func (it *lookup) query(n *node, reply chan<- []*node) {
 		it.tab.log.Trace("FINDNODE failed", "id", n.ID(), "failcount", fails, "dropped", dropped, "err", err)
 	} else if fails > 0 {
 		// Reset failure counter because it counts _consecutive_ failures.
-		it.tab.db.UpdateFindFails(n.ID(), n.IP(), 0)
+		//it.tab.db.UpdateFindFails(n.ID(), n.IP(), 0)
 	}
 
 	// Grab as many nodes as possible. Some of them might not be alive anymore, but we'll

+ 18 - 18
p2p/discover/table.go

@@ -72,8 +72,8 @@ type Table struct {
 	rand    *mrand.Rand       // source of randomness, periodically reseeded
 	ips     netutil.DistinctNetSet
 
-	log        log.Logger
-	db         *enode.DB // database of known nodes
+	log log.Logger
+	//db         *enode.DB // database of known nodes
 	net        transport
 	refreshReq chan chan struct{}
 	initDone   chan struct{}
@@ -100,10 +100,10 @@ type bucket struct {
 	ips          netutil.DistinctNetSet
 }
 
-func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger) (*Table, error) {
+func newTable(t transport, bootnodes []*enode.Node, log log.Logger) (*Table, error) {
 	tab := &Table{
-		net:        t,
-		db:         db,
+		net: t,
+		//db:         db,
 		refreshReq: make(chan chan struct{}),
 		initDone:   make(chan struct{}),
 		closeReq:   make(chan struct{}),
@@ -121,7 +121,7 @@ func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger
 		}
 	}
 	tab.seedRand()
-	tab.loadSeedNodes()
+	//tab.loadSeedNodes()
 
 	return tab, nil
 }
@@ -296,7 +296,7 @@ func (tab *Table) doRefresh(done chan struct{}) {
 	// Load nodes from the database and insert
 	// them. This should yield a few previously seen nodes that are
 	// (hopefully) still alive.
-	tab.loadSeedNodes()
+	//tab.loadSeedNodes()
 
 	// Run self lookup to discover new neighbor nodes.
 	tab.net.lookupSelf()
@@ -312,16 +312,16 @@ func (tab *Table) doRefresh(done chan struct{}) {
 	}
 }
 
-func (tab *Table) loadSeedNodes() {
-	seeds := wrapNodes(tab.db.QuerySeeds(seedCount, seedMaxAge))
-	seeds = append(seeds, tab.nursery...)
-	for i := range seeds {
-		seed := seeds[i]
-		age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) }}
-		tab.log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)
-		tab.addSeenNode(seed)
-	}
-}
+//func (tab *Table) loadSeedNodes() {
+//	seeds := wrapNodes(tab.db.QuerySeeds(seedCount, seedMaxAge))
+//	seeds = append(seeds, tab.nursery...)
+//	for i := range seeds {
+//		seed := seeds[i]
+//		age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) }}
+//		tab.log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)
+//		tab.addSeenNode(seed)
+//	}
+//}
 
 // doRevalidate checks that the last node in a random bucket is still live and replaces or
 // deletes the node if it isn't.
@@ -398,7 +398,7 @@ func (tab *Table) copyLiveNodes() {
 	for _, b := range &tab.buckets {
 		for _, n := range b.entries {
 			if n.livenessChecks > 0 && now.Sub(n.addedAt) >= seedMinTableTime {
-				tab.db.UpdateNode(unwrapNode(n))
+				//tab.db.UpdateNode(unwrapNode(n))
 			}
 		}
 	}

+ 49 - 49
p2p/discover/v4_udp.go

@@ -72,10 +72,10 @@ type UDPv4 struct {
 	netrestrict *netutil.Netlist
 	priv        *ecdsa.PrivateKey
 	localNode   *enode.LocalNode
-	db          *enode.DB
-	tab         *Table
-	closeOnce   sync.Once
-	wg          sync.WaitGroup
+	//db          *enode.DB
+	tab       *Table
+	closeOnce sync.Once
+	wg        sync.WaitGroup
 
 	addReplyMatcher chan *replyMatcher
 	gotreply        chan reply
@@ -132,11 +132,11 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
 	cfg = cfg.withDefaults()
 	closeCtx, cancel := context.WithCancel(context.Background())
 	t := &UDPv4{
-		conn:            c,
-		priv:            cfg.PrivateKey,
-		netrestrict:     cfg.NetRestrict,
-		localNode:       ln,
-		db:              ln.Database(),
+		conn:        c,
+		priv:        cfg.PrivateKey,
+		netrestrict: cfg.NetRestrict,
+		localNode:   ln,
+		//db:              ln.Database(),
 		gotreply:        make(chan reply),
 		addReplyMatcher: make(chan *replyMatcher),
 		closeCtx:        closeCtx,
@@ -144,7 +144,7 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
 		log:             cfg.Log,
 	}
 
-	tab, err := newTable(t, ln.Database(), cfg.Bootnodes, t.log)
+	tab, err := newTable(t, cfg.Bootnodes, t.log)
 	if err != nil {
 		return nil, err
 	}
@@ -302,7 +302,7 @@ func (t *UDPv4) newLookup(ctx context.Context, targetKey encPubkey) *lookup {
 // findnode sends a findnode request to the given node and waits until
 // the node has sent up to k neighbors.
 func (t *UDPv4) findnode(toid enode.ID, toaddr *net.UDPAddr, target v4wire.Pubkey) ([]*node, error) {
-	t.ensureBond(toid, toaddr)
+	//t.ensureBond(toid, toaddr)
 
 	// Add a matcher for 'neighbours' replies to the pending reply queue. The matcher is
 	// active until enough nodes have been received.
@@ -340,7 +340,7 @@ func (t *UDPv4) findnode(toid enode.ID, toaddr *net.UDPAddr, target v4wire.Pubke
 // RequestENR sends enrRequest to the given node and waits for a response.
 func (t *UDPv4) RequestENR(n *enode.Node) (*enode.Node, error) {
 	addr := &net.UDPAddr{IP: n.IP(), Port: n.UDP()}
-	t.ensureBond(n.ID(), addr)
+	//t.ensureBond(n.ID(), addr)
 
 	req := &v4wire.ENRRequest{
 		Expiration: uint64(time.Now().Add(expiration).Unix()),
@@ -563,22 +563,22 @@ func (t *UDPv4) handlePacket(from *net.UDPAddr, buf []byte) error {
 	return err
 }
 
-// checkBond checks if the given node has a recent enough endpoint proof.
-func (t *UDPv4) checkBond(id enode.ID, ip net.IP) bool {
-	return time.Since(t.db.LastPongReceived(id, ip)) < bondExpiration
-}
-
-// ensureBond solicits a ping from a node if we haven't seen a ping from it for a while.
-// This ensures there is a valid endpoint proof on the remote end.
-func (t *UDPv4) ensureBond(toid enode.ID, toaddr *net.UDPAddr) {
-	tooOld := time.Since(t.db.LastPingReceived(toid, toaddr.IP)) > bondExpiration
-	if tooOld || t.db.FindFails(toid, toaddr.IP) > maxFindnodeFailures {
-		rm := t.sendPing(toid, toaddr, nil)
-		<-rm.errc
-		// Wait for them to ping back and process our pong.
-		time.Sleep(respTimeout)
-	}
-}
+//// checkBond checks if the given node has a recent enough endpoint proof.
+//func (t *UDPv4) checkBond(id enode.ID, ip net.IP) bool {
+//	return time.Since(t.db.LastPongReceived(id, ip)) < bondExpiration
+//}
+//
+//// ensureBond solicits a ping from a node if we haven't seen a ping from it for a while.
+//// This ensures there is a valid endpoint proof on the remote end.
+//func (t *UDPv4) ensureBond(toid enode.ID, toaddr *net.UDPAddr) {
+//	tooOld := time.Since(t.db.LastPingReceived(toid, toaddr.IP)) > bondExpiration
+//	if tooOld || t.db.FindFails(toid, toaddr.IP) > maxFindnodeFailures {
+//		rm := t.sendPing(toid, toaddr, nil)
+//		<-rm.errc
+//		// Wait for them to ping back and process our pong.
+//		time.Sleep(respTimeout)
+//	}
+//}
 
 func (t *UDPv4) nodeFromRPC(sender *net.UDPAddr, rn v4wire.Node) (*node, error) {
 	if rn.UDP <= 1024 {
@@ -673,16 +673,16 @@ func (t *UDPv4) handlePing(h *packetHandlerV4, from *net.UDPAddr, fromID enode.I
 
 	// Ping back if our last pong on file is too far in the past.
 	n := wrapNode(enode.NewV4(h.senderKey, from.IP, int(req.From.TCP), from.Port))
-	if time.Since(t.db.LastPongReceived(n.ID(), from.IP)) > bondExpiration {
-		t.sendPing(fromID, from, func() {
-			t.tab.addVerifiedNode(n)
-		})
-	} else {
-		t.tab.addVerifiedNode(n)
-	}
+	//if time.Since(t.db.LastPongReceived(n.ID(), from.IP)) > bondExpiration {
+	//	t.sendPing(fromID, from, func() {
+	//		t.tab.addVerifiedNode(n)
+	//	})
+	//} else {
+	t.tab.addVerifiedNode(n)
+	//}
 
 	// Update node database and endpoint predictor.
-	t.db.UpdateLastPingReceived(n.ID(), from.IP, time.Now())
+	//t.db.UpdateLastPingReceived(n.ID(), from.IP, time.Now())
 	t.localNode.UDPEndpointStatement(from, &net.UDPAddr{IP: req.To.IP, Port: int(req.To.UDP)})
 }
 
@@ -698,7 +698,7 @@ func (t *UDPv4) verifyPong(h *packetHandlerV4, from *net.UDPAddr, fromID enode.I
 		return errUnsolicitedReply
 	}
 	t.localNode.UDPEndpointStatement(from, &net.UDPAddr{IP: req.To.IP, Port: int(req.To.UDP)})
-	t.db.UpdateLastPongReceived(fromID, from.IP, time.Now())
+	//t.db.UpdateLastPongReceived(fromID, from.IP, time.Now())
 	return nil
 }
 
@@ -710,15 +710,15 @@ func (t *UDPv4) verifyFindnode(h *packetHandlerV4, from *net.UDPAddr, fromID eno
 	if v4wire.Expired(req.Expiration) {
 		return errExpired
 	}
-	if !t.checkBond(fromID, from.IP) {
-		// No endpoint proof pong exists, we don't process the packet. This prevents an
-		// attack vector where the discovery protocol could be used to amplify traffic in a
-		// DDOS attack. A malicious actor would send a findnode request with the IP address
-		// and UDP port of the target as the source address. The recipient of the findnode
-		// packet would then send a neighbors packet (which is a much bigger packet than
-		// findnode) to the victim.
-		return errUnknownNode
-	}
+	//if !t.checkBond(fromID, from.IP) {
+	//	// No endpoint proof pong exists, we don't process the packet. This prevents an
+	//	// attack vector where the discovery protocol could be used to amplify traffic in a
+	//	// DDOS attack. A malicious actor would send a findnode request with the IP address
+	//	// and UDP port of the target as the source address. The recipient of the findnode
+	//	// packet would then send a neighbors packet (which is a much bigger packet than
+	//	// findnode) to the victim.
+	//	return errUnknownNode
+	//}
 	return nil
 }
 
@@ -770,9 +770,9 @@ func (t *UDPv4) verifyENRRequest(h *packetHandlerV4, from *net.UDPAddr, fromID e
 	if v4wire.Expired(req.Expiration) {
 		return errExpired
 	}
-	if !t.checkBond(fromID, from.IP) {
-		return errUnknownNode
-	}
+	//if !t.checkBond(fromID, from.IP) {
+	//	return errUnknownNode
+	//}
 	return nil
 }
 

+ 13 - 13
p2p/discover/v5_udp.go

@@ -62,12 +62,12 @@ type codecV5 interface {
 // UDPv5 is the implementation of protocol version 5.
 type UDPv5 struct {
 	// static fields
-	conn         UDPConn
-	tab          *Table
-	netrestrict  *netutil.Netlist
-	priv         *ecdsa.PrivateKey
-	localNode    *enode.LocalNode
-	db           *enode.DB
+	conn        UDPConn
+	tab         *Table
+	netrestrict *netutil.Netlist
+	priv        *ecdsa.PrivateKey
+	localNode   *enode.LocalNode
+	//db           *enode.DB
 	log          log.Logger
 	clock        mclock.Clock
 	validSchemes enr.IdentityScheme
@@ -140,9 +140,9 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
 	cfg = cfg.withDefaults()
 	t := &UDPv5{
 		// static fields
-		conn:         conn,
-		localNode:    ln,
-		db:           ln.Database(),
+		conn:      conn,
+		localNode: ln,
+		//db:           ln.Database(),
 		netrestrict:  cfg.NetRestrict,
 		priv:         cfg.PrivateKey,
 		log:          cfg.Log,
@@ -164,7 +164,7 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
 		closeCtx:       closeCtx,
 		cancelCloseCtx: cancelCloseCtx,
 	}
-	tab, err := newTable(t, t.db, cfg.Bootnodes, cfg.Log)
+	tab, err := newTable(t, cfg.Bootnodes, cfg.Log)
 	if err != nil {
 		return nil, err
 	}
@@ -687,9 +687,9 @@ func (t *UDPv5) getNode(id enode.ID) *enode.Node {
 	if n := t.tab.getNode(id); n != nil {
 		return n
 	}
-	if n := t.localNode.Database().Node(id); n != nil {
-		return n
-	}
+	//if n := t.localNode.Database().Node(id); n != nil {
+	//	return n
+	//}
 	return nil
 }
 

+ 91 - 9
p2p/server.go

@@ -2,6 +2,7 @@ package p2p
 
 import (
 	"blockchain-go/common/gopool"
+	"blockchain-go/p2p/discover"
 	"blockchain-go/p2p/enode"
 	"blockchain-go/p2p/enr"
 	"blockchain-go/p2p/nat"
@@ -42,9 +43,9 @@ type Server struct {
 
 	//nodedb    *enode.DB
 	localnode *enode.LocalNode
-	//ntab      *discover.UDPv4
+	ntab      *discover.UDPv4
 	//DiscV5    *discover.UDPv5
-	//discmix *enode.FairMix
+	discmix *enode.FairMix
 	//dialsched *dialScheduler
 
 	// Channels into the run loop.
@@ -74,15 +75,80 @@ func (server *Server) Start() (err error) {
 		return err
 	}
 
-	// TODO 准备实现discover
 	if err := server.setupDiscovery(); err != nil {
 		return err
 	}
 
+	if err := server.setupDialScheduler(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// 配置节点发现逻辑
+func (server *Server) setupDiscovery() (err error) {
+	server.discmix = enode.NewFairMix(discmixTimeout)
+
+	// 添加特定协议的发现源。
+	added := make(map[string]bool)
+	for _, proto := range server.Protocols {
+		if proto.DialCandidates != nil && !added[proto.Name] {
+			server.discmix.AddSource(proto.DialCandidates)
+			added[proto.Name] = true
+		}
+	}
+
+	addr, err := net.ResolveUDPAddr("udp", server.ListenAddr)
+	if err != nil {
+		return err
+	}
+	conn, err := net.ListenUDP("udp", addr)
+	if err != nil {
+		return err
+	}
+
+	realAddr := conn.LocalAddr().(*net.UDPAddr)
+	fmt.Printf("UDP listener up, addr: %v.", realAddr)
+
+	if server.NAT != nil {
+		if !realAddr.IP.IsLoopback() {
+			server.loopWG.Add(1)
+
+			gopool.Submit(func() {
+				nat.Map(server.NAT, server.quit, "udp", realAddr.Port, realAddr.Port, "ethereum discovery")
+				server.loopWG.Done()
+			})
+		}
+	}
+	server.localnode.SetFallbackUDP(realAddr.Port)
+
+	// 设置V4的发现协议
+	var unhandled chan discover.ReadPacket
+	//var sconn *sharedUDPConn
+	//if server.DiscoveryV5 {
+	//	unhandled = make(chan discover.ReadPacket, 100)
+	//	sconn = &sharedUDPConn{conn, unhandled}
+	//}
+	cfg := discover.Config{
+		PrivateKey: server.PrivateKey,
+		//NetRestrict: server.NetRestrict,
+		//Bootnodes:   server.BootstrapNodes,
+		Unhandled: unhandled,
+		//Log:         server.log,
+	}
+	ntab, err := discover.ListenV4(conn, server.localnode, cfg)
+	if err != nil {
+		return err
+	}
+	server.ntab = ntab
+	server.discmix.AddSource(ntab.RandomNodes())
+
 	return nil
 }
 
-func (server *Server) setupDiscover() (err error) {
+// 设置拨号调度器
+func (server *Server) setupDialScheduler() (err error) {
 	return nil
 }
 
@@ -265,12 +331,28 @@ func (server *Server) checkpoint(c *conn, stage chan<- *conn) error {
 	return <-c.cont
 }
 
-// 配置节点发现逻辑
-func (server *Server) setupDiscovery() (err error) {
-	return nil
+// sharedUDPConn implements a shared connection. Write sends messages to the underlying connection while read returns
+// messages that were found unprocessable and sent to the unhandled channel by the primary listener.
+type sharedUDPConn struct {
+	*net.UDPConn
+	unhandled chan discover.ReadPacket
 }
 
-// 设置拨号调度器
-func (server *Server) setupDialScheduler() (err error) {
+// ReadFromUDP implements discover.UDPConn
+func (s *sharedUDPConn) ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error) {
+	packet, ok := <-s.unhandled
+	if !ok {
+		return 0, nil, errors.New("connection was closed")
+	}
+	l := len(packet.Data)
+	if l > len(b) {
+		l = len(b)
+	}
+	copy(b[:l], packet.Data[:l])
+	return l, packet.Addr, nil
+}
+
+// Close implements discover.UDPConn
+func (s *sharedUDPConn) Close() error {
 	return nil
 }