| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568 |
- package p2p
- import (
- "crypto/ecdsa"
- "crypto/rand"
- "errors"
- "fmt"
- "net"
- "sync"
- "time"
- "github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/logger/glog"
- "github.com/ethereum/go-ethereum/p2p/discover"
- "github.com/ethereum/go-ethereum/p2p/nat"
- )
// Timeouts and concurrency limits used by the server.
const (
	// defaultDialTimeout is the dial timeout of the Dialer that Start
	// installs when Server.Dialer is left nil.
	defaultDialTimeout = time.Second * 15

	// refreshPeersInterval is the delay dialLoop uses when re-arming its
	// discovery refresh timer.
	refreshPeersInterval = time.Second * 30

	// staticPeerCheckInterval is the default period at which
	// staticNodesLoop looks for static nodes that are not connected.
	staticPeerCheckInterval = time.Second * 15

	// maxAcceptConns is the default cap on inbound connections that are
	// handshaking at the same time.
	maxAcceptConns = 50

	// maxDialingConns is the default cap on outbound connections that
	// are being dialed at the same time.
	maxDialingConns = 10

	// handshakeTimeout is the overall budget for the encryption
	// handshake plus the protocol handshake, in both directions.
	handshakeTimeout = time.Second * 5

	// frameReadTimeout is the longest a read of one complete message may
	// take, which effectively bounds how long a connection can sit idle.
	frameReadTimeout = time.Minute

	// frameWriteTimeout is the longest a write of one complete message
	// may take.
	frameWriteTimeout = time.Second * 5
)
- var srvjslog = logger.NewJsonLogger()
- // Server manages all peer connections.
- //
- // The fields of Server are used as configuration parameters.
- // You should set them before starting the Server. Fields may not be
- // modified while the server is running.
- type Server struct {
- // This field must be set to a valid secp256k1 private key.
- PrivateKey *ecdsa.PrivateKey
- // MaxPeers is the maximum number of peers that can be
- // connected. It must be greater than zero.
- MaxPeers int
- // MaxPendingPeers is the maximum number of peers that can be pending in the
- // handshake phase, counted separately for inbound and outbound connections.
- // Zero defaults to preset values.
- MaxPendingPeers int
- // Name sets the node name of this server.
- // Use common.MakeName to create a name that follows existing conventions.
- Name string
- // Bootstrap nodes are used to establish connectivity
- // with the rest of the network.
- BootstrapNodes []*discover.Node
- // Static nodes are used as pre-configured connections which are always
- // maintained and re-connected on disconnects.
- StaticNodes []*discover.Node
- // Trusted nodes are used as pre-configured connections which are always
- // allowed to connect, even above the peer limit.
- TrustedNodes []*discover.Node
- // NodeDatabase is the path to the database containing the previously seen
- // live nodes in the network.
- NodeDatabase string
- // Protocols should contain the protocols supported
- // by the server. Matching protocols are launched for
- // each peer.
- Protocols []Protocol
- // If ListenAddr is set to a non-nil address, the server
- // will listen for incoming connections.
- //
- // If the port is zero, the operating system will pick a port. The
- // ListenAddr field will be updated with the actual address when
- // the server is started.
- ListenAddr string
- // If set to a non-nil value, the given NAT port mapper
- // is used to make the listening port available to the
- // Internet.
- NAT nat.Interface
- // If Dialer is set to a non-nil value, the given Dialer
- // is used to dial outbound peer connections.
- Dialer *net.Dialer
- // If NoDial is true, the server will not dial any peers.
- NoDial bool
- // Hooks for testing. These are useful because we can inhibit
- // the whole protocol stack.
- setupFunc
- newPeerHook
- ourHandshake *protoHandshake
- lock sync.RWMutex // protects running, peers and the trust fields
- running bool
- peers map[discover.NodeID]*Peer
- staticNodes map[discover.NodeID]*discover.Node // Map of currently maintained static remote nodes
- staticDial chan *discover.Node // Dial request channel reserved for the static nodes
- staticCycle time.Duration // Overrides staticPeerCheckInterval, used for testing
- trustedNodes map[discover.NodeID]bool // Set of currently trusted remote nodes
- ntab *discover.Table
- listener net.Listener
- quit chan struct{}
- loopWG sync.WaitGroup // {dial,listen,nat}Loop
- peerWG sync.WaitGroup // active peer goroutines
- }
- type setupFunc func(net.Conn, *ecdsa.PrivateKey, *protoHandshake, *discover.Node, func(discover.NodeID) bool) (*conn, error)
- type newPeerHook func(*Peer)
- // Peers returns all connected peers.
- func (srv *Server) Peers() (peers []*Peer) {
- srv.lock.RLock()
- defer srv.lock.RUnlock()
- for _, peer := range srv.peers {
- if peer != nil {
- peers = append(peers, peer)
- }
- }
- return
- }
- // PeerCount returns the number of connected peers.
- func (srv *Server) PeerCount() int {
- srv.lock.RLock()
- n := len(srv.peers)
- srv.lock.RUnlock()
- return n
- }
- // AddPeer connects to the given node and maintains the connection until the
- // server is shut down. If the connection fails for any reason, the server will
- // attempt to reconnect the peer.
- func (srv *Server) AddPeer(node *discover.Node) {
- srv.lock.Lock()
- defer srv.lock.Unlock()
- srv.staticNodes[node.ID] = node
- }
- // Start starts running the server.
- // Servers can be re-used and started again after stopping.
- func (srv *Server) Start() (err error) {
- srv.lock.Lock()
- defer srv.lock.Unlock()
- if srv.running {
- return errors.New("server already running")
- }
- glog.V(logger.Info).Infoln("Starting Server")
- // static fields
- if srv.PrivateKey == nil {
- return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
- }
- if srv.MaxPeers <= 0 {
- return fmt.Errorf("Server.MaxPeers must be > 0")
- }
- srv.quit = make(chan struct{})
- srv.peers = make(map[discover.NodeID]*Peer)
- // Create the current trust maps, and the associated dialing channel
- srv.trustedNodes = make(map[discover.NodeID]bool)
- for _, node := range srv.TrustedNodes {
- srv.trustedNodes[node.ID] = true
- }
- srv.staticNodes = make(map[discover.NodeID]*discover.Node)
- for _, node := range srv.StaticNodes {
- srv.staticNodes[node.ID] = node
- }
- srv.staticDial = make(chan *discover.Node)
- if srv.setupFunc == nil {
- srv.setupFunc = setupConn
- }
- // node table
- ntab, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr, srv.NAT, srv.NodeDatabase)
- if err != nil {
- return err
- }
- srv.ntab = ntab
- // handshake
- srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: ntab.Self().ID}
- for _, p := range srv.Protocols {
- srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
- }
- // listen/dial
- if srv.ListenAddr != "" {
- if err := srv.startListening(); err != nil {
- return err
- }
- }
- if srv.Dialer == nil {
- srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout}
- }
- if !srv.NoDial {
- srv.loopWG.Add(1)
- go srv.dialLoop()
- }
- if srv.NoDial && srv.ListenAddr == "" {
- glog.V(logger.Warn).Infoln("I will be kind-of useless, neither dialing nor listening.")
- }
- // maintain the static peers
- go srv.staticNodesLoop()
- srv.running = true
- return nil
- }
- func (srv *Server) startListening() error {
- listener, err := net.Listen("tcp", srv.ListenAddr)
- if err != nil {
- return err
- }
- laddr := listener.Addr().(*net.TCPAddr)
- srv.ListenAddr = laddr.String()
- srv.listener = listener
- srv.loopWG.Add(1)
- go srv.listenLoop()
- if !laddr.IP.IsLoopback() && srv.NAT != nil {
- srv.loopWG.Add(1)
- go func() {
- nat.Map(srv.NAT, srv.quit, "tcp", laddr.Port, laddr.Port, "ethereum p2p")
- srv.loopWG.Done()
- }()
- }
- return nil
- }
- // Stop terminates the server and all active peer connections.
- // It blocks until all active connections have been closed.
- func (srv *Server) Stop() {
- srv.lock.Lock()
- if !srv.running {
- srv.lock.Unlock()
- return
- }
- srv.running = false
- srv.lock.Unlock()
- glog.V(logger.Info).Infoln("Stopping Server")
- srv.ntab.Close()
- if srv.listener != nil {
- // this unblocks listener Accept
- srv.listener.Close()
- }
- close(srv.quit)
- srv.loopWG.Wait()
- // No new peers can be added at this point because dialLoop and
- // listenLoop are down. It is safe to call peerWG.Wait because
- // peerWG.Add is not called outside of those loops.
- srv.lock.Lock()
- for _, peer := range srv.peers {
- peer.Disconnect(DiscQuitting)
- }
- srv.lock.Unlock()
- srv.peerWG.Wait()
- }
- // Self returns the local node's endpoint information.
- func (srv *Server) Self() *discover.Node {
- srv.lock.RLock()
- defer srv.lock.RUnlock()
- if !srv.running {
- return &discover.Node{IP: net.ParseIP("0.0.0.0")}
- }
- return srv.ntab.Self()
- }
- // main loop for adding connections via listening
- func (srv *Server) listenLoop() {
- defer srv.loopWG.Done()
- // This channel acts as a semaphore limiting
- // active inbound connections that are lingering pre-handshake.
- // If all slots are taken, no further connections are accepted.
- tokens := maxAcceptConns
- if srv.MaxPendingPeers > 0 {
- tokens = srv.MaxPendingPeers
- }
- slots := make(chan struct{}, tokens)
- for i := 0; i < tokens; i++ {
- slots <- struct{}{}
- }
- glog.V(logger.Info).Infoln("Listening on", srv.listener.Addr())
- for {
- <-slots
- conn, err := srv.listener.Accept()
- if err != nil {
- return
- }
- glog.V(logger.Debug).Infof("Accepted conn %v\n", conn.RemoteAddr())
- srv.peerWG.Add(1)
- go func() {
- srv.startPeer(conn, nil)
- slots <- struct{}{}
- }()
- }
- }
- // staticNodesLoop is responsible for periodically checking that static
- // connections are actually live, and requests dialing if not.
- func (srv *Server) staticNodesLoop() {
- // Create a default maintenance ticker, but override it requested
- cycle := staticPeerCheckInterval
- if srv.staticCycle != 0 {
- cycle = srv.staticCycle
- }
- tick := time.NewTicker(cycle)
- for {
- select {
- case <-srv.quit:
- return
- case <-tick.C:
- // Collect all the non-connected static nodes
- needed := []*discover.Node{}
- srv.lock.RLock()
- for id, node := range srv.staticNodes {
- if _, ok := srv.peers[id]; !ok {
- needed = append(needed, node)
- }
- }
- srv.lock.RUnlock()
- // Try to dial each of them (don't hang if server terminates)
- for _, node := range needed {
- glog.V(logger.Debug).Infof("Dialing static peer %v", node)
- select {
- case srv.staticDial <- node:
- case <-srv.quit:
- return
- }
- }
- }
- }
- }
- func (srv *Server) dialLoop() {
- var (
- dialed = make(chan *discover.Node)
- dialing = make(map[discover.NodeID]bool)
- findresults = make(chan []*discover.Node)
- refresh = time.NewTimer(0)
- )
- defer srv.loopWG.Done()
- defer refresh.Stop()
- // Limit the number of concurrent dials
- tokens := maxDialingConns
- if srv.MaxPendingPeers > 0 {
- tokens = srv.MaxPendingPeers
- }
- slots := make(chan struct{}, tokens)
- for i := 0; i < tokens; i++ {
- slots <- struct{}{}
- }
- dial := func(dest *discover.Node) {
- // Don't dial nodes that would fail the checks in addPeer.
- // This is important because the connection handshake is a lot
- // of work and we'd rather avoid doing that work for peers
- // that can't be added.
- srv.lock.RLock()
- ok, _ := srv.checkPeer(dest.ID)
- srv.lock.RUnlock()
- if !ok || dialing[dest.ID] {
- return
- }
- // Request a dial slot to prevent CPU exhaustion
- <-slots
- dialing[dest.ID] = true
- srv.peerWG.Add(1)
- go func() {
- srv.dialNode(dest)
- slots <- struct{}{}
- dialed <- dest
- }()
- }
- srv.ntab.Bootstrap(srv.BootstrapNodes)
- for {
- select {
- case <-refresh.C:
- // Grab some nodes to connect to if we're not at capacity.
- srv.lock.RLock()
- needpeers := len(srv.peers) < srv.MaxPeers/2
- srv.lock.RUnlock()
- if needpeers {
- go func() {
- var target discover.NodeID
- rand.Read(target[:])
- findresults <- srv.ntab.Lookup(target)
- }()
- } else {
- // Make sure we check again if the peer count falls
- // below MaxPeers.
- refresh.Reset(refreshPeersInterval)
- }
- case dest := <-srv.staticDial:
- dial(dest)
- case dests := <-findresults:
- for _, dest := range dests {
- dial(dest)
- }
- refresh.Reset(refreshPeersInterval)
- case dest := <-dialed:
- delete(dialing, dest.ID)
- if len(dialing) == 0 {
- // Check again immediately after dialing all current candidates.
- refresh.Reset(0)
- }
- case <-srv.quit:
- // TODO: maybe wait for active dials
- return
- }
- }
- }
- func (srv *Server) dialNode(dest *discover.Node) {
- addr := &net.TCPAddr{IP: dest.IP, Port: int(dest.TCP)}
- glog.V(logger.Debug).Infof("Dialing %v\n", dest)
- conn, err := srv.Dialer.Dial("tcp", addr.String())
- if err != nil {
- // dialLoop adds to the wait group counter when launching
- // dialNode, so we need to count it down again. startPeer also
- // does that when an error occurs.
- srv.peerWG.Done()
- glog.V(logger.Detail).Infof("dial error: %v", err)
- return
- }
- srv.startPeer(conn, dest)
- }
- func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
- // TODO: handle/store session token
- // Run setupFunc, which should create an authenticated connection
- // and run the capability exchange. Note that any early error
- // returns during that exchange need to call peerWG.Done because
- // the callers of startPeer added the peer to the wait group already.
- fd.SetDeadline(time.Now().Add(handshakeTimeout))
- conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest, srv.keepconn)
- if err != nil {
- fd.Close()
- glog.V(logger.Debug).Infof("Handshake with %v failed: %v", fd.RemoteAddr(), err)
- srv.peerWG.Done()
- return
- }
- conn.MsgReadWriter = &netWrapper{
- wrapped: conn.MsgReadWriter,
- conn: fd, rtimeout: frameReadTimeout, wtimeout: frameWriteTimeout,
- }
- p := newPeer(fd, conn, srv.Protocols)
- if ok, reason := srv.addPeer(conn, p); !ok {
- glog.V(logger.Detail).Infof("Not adding %v (%v)\n", p, reason)
- p.politeDisconnect(reason)
- srv.peerWG.Done()
- return
- }
- // The handshakes are done and it passed all checks.
- // Spawn the Peer loops.
- go srv.runPeer(p)
- }
- // preflight checks whether a connection should be kept. it runs
- // after the encryption handshake, as soon as the remote identity is
- // known.
- func (srv *Server) keepconn(id discover.NodeID) bool {
- srv.lock.RLock()
- defer srv.lock.RUnlock()
- if _, ok := srv.staticNodes[id]; ok {
- return true // static nodes are always allowed
- }
- if _, ok := srv.trustedNodes[id]; ok {
- return true // trusted nodes are always allowed
- }
- return len(srv.peers) < srv.MaxPeers
- }
- func (srv *Server) runPeer(p *Peer) {
- glog.V(logger.Debug).Infof("Added %v\n", p)
- srvjslog.LogJson(&logger.P2PConnected{
- RemoteId: p.ID().String(),
- RemoteAddress: p.RemoteAddr().String(),
- RemoteVersionString: p.Name(),
- NumConnections: srv.PeerCount(),
- })
- if srv.newPeerHook != nil {
- srv.newPeerHook(p)
- }
- discreason := p.run()
- srv.removePeer(p)
- glog.V(logger.Debug).Infof("Removed %v (%v)\n", p, discreason)
- srvjslog.LogJson(&logger.P2PDisconnected{
- RemoteId: p.ID().String(),
- NumConnections: srv.PeerCount(),
- })
- }
- func (srv *Server) addPeer(conn *conn, p *Peer) (bool, DiscReason) {
- // drop connections with no matching protocols.
- if len(srv.Protocols) > 0 && countMatchingProtocols(srv.Protocols, conn.protoHandshake.Caps) == 0 {
- return false, DiscUselessPeer
- }
- // add the peer if it passes the other checks.
- srv.lock.Lock()
- defer srv.lock.Unlock()
- if ok, reason := srv.checkPeer(conn.ID); !ok {
- return false, reason
- }
- srv.peers[conn.ID] = p
- return true, 0
- }
- // checkPeer verifies whether a peer looks promising and should be allowed/kept
- // in the pool, or if it's of no use.
- func (srv *Server) checkPeer(id discover.NodeID) (bool, DiscReason) {
- // First up, figure out if the peer is static or trusted
- _, static := srv.staticNodes[id]
- trusted := srv.trustedNodes[id]
- // Make sure the peer passes all required checks
- switch {
- case !srv.running:
- return false, DiscQuitting
- case !static && !trusted && len(srv.peers) >= srv.MaxPeers:
- return false, DiscTooManyPeers
- case srv.peers[id] != nil:
- return false, DiscAlreadyConnected
- case id == srv.ntab.Self().ID:
- return false, DiscSelf
- default:
- return true, 0
- }
- }
- func (srv *Server) removePeer(p *Peer) {
- srv.lock.Lock()
- delete(srv.peers, p.ID())
- srv.lock.Unlock()
- srv.peerWG.Done()
- }
|