Procházet zdrojové kódy

p2p: improve peer selection logic

This commit introduces a new (temporary) peer selection
strategy based on random lookups.

While we're here, also implement the TODOs in dialLoop.
Felix Lange před 10 roky
rodič
revize
22d1f0faf1
1 změnil soubory, kde provedl 61 přidání a 56 odebrání
  1. 61 56
      p2p/server.go

+ 61 - 56
p2p/server.go

@@ -3,6 +3,7 @@ package p2p
 import (
 	"bytes"
 	"crypto/ecdsa"
+	"crypto/rand"
 	"errors"
 	"fmt"
 	"net"
@@ -276,46 +277,58 @@ func (srv *Server) listenLoop() {
 }
 
 func (srv *Server) dialLoop() {
+	var (
+		dialed      = make(chan *discover.Node)
+		dialing     = make(map[discover.NodeID]bool)
+		findresults = make(chan []*discover.Node)
+		refresh     = time.NewTimer(0)
+	)
 	defer srv.loopWG.Done()
-	refresh := time.NewTicker(refreshPeersInterval)
 	defer refresh.Stop()
 
-	srv.ntab.Bootstrap(srv.BootstrapNodes)
-	go srv.findPeers()
-
-	dialed := make(chan *discover.Node)
-	dialing := make(map[discover.NodeID]bool)
-
-	// TODO: limit number of active dials
-	// TODO: ensure only one findPeers goroutine is running
-	// TODO: pause findPeers when we're at capacity
+	// TODO: maybe limit number of active dials
+	dial := func(dest *discover.Node) {
+		srv.lock.Lock()
+		ok, _ := srv.checkPeer(dest.ID)
+		srv.lock.Unlock()
+		// Don't dial nodes that would fail the checks in addPeer.
+		// This is important because the connection handshake is a lot
+		// of work and we'd rather avoid doing that work for peers
+		// that can't be added.
+		if !ok || dialing[dest.ID] {
+			return
+		}
+		dialing[dest.ID] = true
+		srv.peerWG.Add(1)
+		go func() {
+			srv.dialNode(dest)
+			dialed <- dest
+		}()
+	}
 
+	srv.ntab.Bootstrap(srv.BootstrapNodes)
 	for {
 		select {
 		case <-refresh.C:
-
-			go srv.findPeers()
-
-		case dest := <-srv.peerConnect:
-			// avoid dialing nodes that are already connected.
-			// there is another check for this in addPeer,
-			// which runs after the handshake.
 			srv.lock.Lock()
-			_, isconnected := srv.peers[dest.ID]
+			needpeers := len(srv.peers) < srv.MaxPeers
 			srv.lock.Unlock()
-			if isconnected || dialing[dest.ID] || dest.ID == srv.Self().ID {
-				continue
+			if needpeers {
+				go func() {
+					var target discover.NodeID
+					rand.Read(target[:])
+					findresults <- srv.ntab.Lookup(target)
+				}()
+				refresh.Stop()
 			}
 
-			dialing[dest.ID] = true
-			srv.peerWG.Add(1)
-			go func() {
-				srv.dialNode(dest)
-				// at this point, the peer has been added
-				// or discarded. either way, we're not dialing it anymore.
-				dialed <- dest
-			}()
-
+		case dest := <-srv.peerConnect:
+			dial(dest)
+		case dests := <-findresults:
+			for _, dest := range dests {
+				dial(dest)
+			}
+			refresh.Reset(refreshPeersInterval)
 		case dest := <-dialed:
 			delete(dialing, dest.ID)
 
@@ -341,24 +354,6 @@ func (srv *Server) Self() *discover.Node {
 	return srv.ntab.Self()
 }
 
-func (srv *Server) findPeers() {
-	far := srv.Self().ID
-	for i := range far {
-		far[i] = ^far[i]
-	}
-	closeToSelf := srv.ntab.Lookup(srv.Self().ID)
-	farFromSelf := srv.ntab.Lookup(far)
-
-	for i := 0; i < len(closeToSelf) || i < len(farFromSelf); i++ {
-		if i < len(closeToSelf) {
-			srv.peerConnect <- closeToSelf[i]
-		}
-		if i < len(farFromSelf) {
-			srv.peerConnect <- farFromSelf[i]
-		}
-	}
-}
-
 func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
 	// TODO: handle/store session token
 	fd.SetDeadline(time.Now().Add(handshakeTimeout))
@@ -368,7 +363,6 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
 		glog.V(logger.Debug).Infof("Handshake with %v failed: %v", fd.RemoteAddr(), err)
 		return
 	}
-
 	conn.MsgReadWriter = &netWrapper{
 		wrapped: conn.MsgReadWriter,
 		conn:    fd, rtimeout: frameReadTimeout, wtimeout: frameWriteTimeout,
@@ -379,24 +373,27 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
 		p.politeDisconnect(reason)
 		return
 	}
+	// The handshakes are done and it passed all checks.
+	// Spawn the Peer loops.
+	go srv.runPeer(p)
+}
 
+func (srv *Server) runPeer(p *Peer) {
 	glog.V(logger.Debug).Infof("Added %v\n", p)
 	srvjslog.LogJson(&logger.P2PConnected{
-		RemoteId:            fmt.Sprintf("%x", conn.ID[:]),
-		RemoteAddress:       fd.RemoteAddr().String(),
-		RemoteVersionString: conn.Name,
+		RemoteId:            p.ID().String(),
+		RemoteAddress:       p.RemoteAddr().String(),
+		RemoteVersionString: p.Name(),
 		NumConnections:      srv.PeerCount(),
 	})
-
 	if srv.newPeerHook != nil {
 		srv.newPeerHook(p)
 	}
 	discreason := p.run()
 	srv.removePeer(p)
-
 	glog.V(logger.Debug).Infof("Removed %v (%v)\n", p, discreason)
 	srvjslog.LogJson(&logger.P2PDisconnected{
-		RemoteId:       fmt.Sprintf("%x", conn.ID[:]),
+		RemoteId:       p.ID().String(),
 		NumConnections: srv.PeerCount(),
 	})
 }
@@ -404,6 +401,14 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
 func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) {
 	srv.lock.Lock()
 	defer srv.lock.Unlock()
+	if ok, reason := srv.checkPeer(id); !ok {
+		return false, reason
+	}
+	srv.peers[id] = p
+	return true, 0
+}
+
+func (srv *Server) checkPeer(id discover.NodeID) (bool, DiscReason) {
 	switch {
 	case !srv.running:
 		return false, DiscQuitting
@@ -413,9 +418,9 @@ func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) {
 		return false, DiscAlreadyConnected
 	case id == srv.Self().ID:
 		return false, DiscSelf
+	default:
+		return true, 0
 	}
-	srv.peers[id] = p
-	return true, 0
 }
 
 func (srv *Server) removePeer(p *Peer) {