Browse Source

p2p: meter peer traffic, emit metered peer events (#17695)

This change extends the peer metrics collection:

- traces the life-cycle of the peers
- meters the peer traffic separately for every peer
- creates event feed for the peer events
- emits the peer events
Kurkó Mihály 7 years ago
parent
commit
16e4d0e005
3 changed files with 187 additions and 18 deletions
  1. 1 1
      p2p/dial.go
  2. 178 16
      p2p/metrics.go
  3. 8 1
      p2p/server.go

+ 1 - 1
p2p/dial.go

@@ -350,7 +350,7 @@ func (t *dialTask) dial(srv *Server, dest *enode.Node) error {
 	if err != nil {
 		return &dialError{err}
 	}
-	mfd := newMeteredConn(fd, false)
+	mfd := newMeteredConn(fd, false, dest.IP())
 	return srv.SetupConn(mfd, t.flags, dest)
 }
 

+ 178 - 16
p2p/metrics.go

@@ -19,53 +19,215 @@
 package p2p
 
 import (
+	"fmt"
 	"net"
+	"sync"
+	"sync/atomic"
+	"time"
 
+	"github.com/ethereum/go-ethereum/p2p/enode"
+
+	"github.com/ethereum/go-ethereum/event"
+	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/metrics"
 )
 
+const (
+	MetricsInboundConnects  = "p2p/InboundConnects"  // Name for the registered inbound connects meter
+	MetricsInboundTraffic   = "p2p/InboundTraffic"   // Name for the registered inbound traffic meter
+	MetricsOutboundConnects = "p2p/OutboundConnects" // Name for the registered outbound connects meter
+	MetricsOutboundTraffic  = "p2p/OutboundTraffic"  // Name for the registered outbound traffic meter
+
+	MeteredPeerLimit = 1024 // This amount of peers are individually metered
+)
+
 var (
-	ingressConnectMeter = metrics.NewRegisteredMeter("p2p/InboundConnects", nil)
-	ingressTrafficMeter = metrics.NewRegisteredMeter("p2p/InboundTraffic", nil)
-	egressConnectMeter  = metrics.NewRegisteredMeter("p2p/OutboundConnects", nil)
-	egressTrafficMeter  = metrics.NewRegisteredMeter("p2p/OutboundTraffic", nil)
+	ingressConnectMeter = metrics.NewRegisteredMeter(MetricsInboundConnects, nil)  // Meter counting the ingress connections
+	ingressTrafficMeter = metrics.NewRegisteredMeter(MetricsInboundTraffic, nil)   // Meter metering the cumulative ingress traffic
+	egressConnectMeter  = metrics.NewRegisteredMeter(MetricsOutboundConnects, nil) // Meter counting the egress connections
+	egressTrafficMeter  = metrics.NewRegisteredMeter(MetricsOutboundTraffic, nil)  // Meter metering the cumulative egress traffic
+
+	PeerIngressRegistry = metrics.NewPrefixedChildRegistry(metrics.DefaultRegistry, MetricsInboundTraffic+"/")  // Registry containing the peer ingress
+	PeerEgressRegistry  = metrics.NewPrefixedChildRegistry(metrics.DefaultRegistry, MetricsOutboundTraffic+"/") // Registry containing the peer egress
+
+	meteredPeerFeed  event.Feed // Event feed for peer metrics
+	meteredPeerCount int32      // Actually stored peer connection count
 )
 
+// MeteredPeerEventType is the type of peer events emitted by a metered connection.
+type MeteredPeerEventType int
+
+const (
+	// PeerConnected is the type of event emitted when a peer successfully
+	// made the handshake.
+	PeerConnected MeteredPeerEventType = iota
+
+	// PeerDisconnected is the type of event emitted when a peer disconnects.
+	PeerDisconnected
+
+	// PeerHandshakeFailed is the type of event emitted when a peer fails to
+	// make the handshake or disconnects before the handshake.
+	PeerHandshakeFailed
+)
+
+// MeteredPeerEvent is an event emitted when peers connect or disconnect.
+type MeteredPeerEvent struct {
+	Type    MeteredPeerEventType // Type of peer event
+	IP      net.IP               // IP address of the peer
+	ID      string               // NodeID of the peer
+	Elapsed time.Duration        // Time elapsed between the connection and the handshake/disconnection
+	Ingress uint64               // Ingress count at the moment of the event
+	Egress  uint64               // Egress count at the moment of the event
+}
+
+// SubscribeMeteredPeerEvent registers a subscription for peer life-cycle events
+// if metrics collection is enabled.
+func SubscribeMeteredPeerEvent(ch chan<- MeteredPeerEvent) event.Subscription {
+	return meteredPeerFeed.Subscribe(ch)
+}
+
 // meteredConn is a wrapper around a net.Conn that meters both the
 // inbound and outbound network traffic.
 type meteredConn struct {
 	net.Conn // Network connection to wrap with metering
+
+	connected time.Time // Connection time of the peer
+	ip        net.IP    // IP address of the peer
+	id        string    // NodeID of the peer
+
+	// trafficMetered denotes if the peer is registered in the traffic registries.
+	// Its value is true if the metered peer count doesn't reach the limit in the
+	// moment of the peer's connection.
+	trafficMetered bool
+	ingressMeter   metrics.Meter // Meter for the read bytes of the peer
+	egressMeter    metrics.Meter // Meter for the written bytes of the peer
+
+	lock sync.RWMutex // Lock protecting the metered connection's internals
 }
 
-// newMeteredConn creates a new metered connection, also bumping the ingress or
-// egress connection meter. If the metrics system is disabled, this function
-// returns the original object.
-func newMeteredConn(conn net.Conn, ingress bool) net.Conn {
+// newMeteredConn creates a new metered connection, bumps the ingress or egress
+// connection meter and also increases the metered peer count. If the metrics
+// system is disabled or the IP address is unspecified, this function returns
+// the original object.
+func newMeteredConn(conn net.Conn, ingress bool, ip net.IP) net.Conn {
 	// Short circuit if metrics are disabled
 	if !metrics.Enabled {
 		return conn
 	}
-	// Otherwise bump the connection counters and wrap the connection
+	if ip.IsUnspecified() {
+		log.Warn("Peer IP is unspecified")
+		return conn
+	}
+	// Bump the connection counters and wrap the connection
 	if ingress {
 		ingressConnectMeter.Mark(1)
 	} else {
 		egressConnectMeter.Mark(1)
 	}
-	return &meteredConn{Conn: conn}
+	return &meteredConn{
+		Conn:      conn,
+		ip:        ip,
+		connected: time.Now(),
+	}
 }
 
-// Read delegates a network read to the underlying connection, bumping the ingress
-// traffic meter along the way.
+// Read delegates a network read to the underlying connection, bumping the common
+// and the peer ingress traffic meters along the way.
 func (c *meteredConn) Read(b []byte) (n int, err error) {
 	n, err = c.Conn.Read(b)
 	ingressTrafficMeter.Mark(int64(n))
-	return
+	c.lock.RLock()
+	if c.trafficMetered {
+		c.ingressMeter.Mark(int64(n))
+	}
+	c.lock.RUnlock()
+	return n, err
 }
 
-// Write delegates a network write to the underlying connection, bumping the
-// egress traffic meter along the way.
+// Write delegates a network write to the underlying connection, bumping the common
+// and the peer egress traffic meters along the way.
 func (c *meteredConn) Write(b []byte) (n int, err error) {
 	n, err = c.Conn.Write(b)
 	egressTrafficMeter.Mark(int64(n))
-	return
+	c.lock.RLock()
+	if c.trafficMetered {
+		c.egressMeter.Mark(int64(n))
+	}
+	c.lock.RUnlock()
+	return n, err
+}
+
+// handshakeDone is called when a peer handshake is done. Registers the peer to
+// the ingress and the egress traffic registries using the peer's IP and node ID,
+// also emits connect event.
+func (c *meteredConn) handshakeDone(nodeID enode.ID) {
+	id := nodeID.String()
+	if atomic.AddInt32(&meteredPeerCount, 1) >= MeteredPeerLimit {
+		// Don't register the peer in the traffic registries.
+		atomic.AddInt32(&meteredPeerCount, -1)
+		c.lock.Lock()
+		c.id, c.trafficMetered = id, false
+		c.lock.Unlock()
+		log.Warn("Metered peer count reached the limit")
+	} else {
+		key := fmt.Sprintf("%s/%s", c.ip, id)
+		c.lock.Lock()
+		c.id, c.trafficMetered = id, true
+		c.ingressMeter = metrics.NewRegisteredMeter(key, PeerIngressRegistry)
+		c.egressMeter = metrics.NewRegisteredMeter(key, PeerEgressRegistry)
+		c.lock.Unlock()
+	}
+	meteredPeerFeed.Send(MeteredPeerEvent{
+		Type:    PeerConnected,
+		IP:      c.ip,
+		ID:      id,
+		Elapsed: time.Since(c.connected),
+	})
+}
+
+// Close delegates a close operation to the underlying connection, unregisters
+// the peer from the traffic registries and emits close event.
+func (c *meteredConn) Close() error {
+	err := c.Conn.Close()
+	c.lock.RLock()
+	if c.id == "" {
+		// If the peer disconnects before the handshake.
+		c.lock.RUnlock()
+		meteredPeerFeed.Send(MeteredPeerEvent{
+			Type:    PeerHandshakeFailed,
+			IP:      c.ip,
+			Elapsed: time.Since(c.connected),
+		})
+		return err
+	}
+	id := c.id
+	if !c.trafficMetered {
+		// If the peer isn't registered in the traffic registries.
+		c.lock.RUnlock()
+		meteredPeerFeed.Send(MeteredPeerEvent{
+			Type: PeerDisconnected,
+			IP:   c.ip,
+			ID:   id,
+		})
+		return err
+	}
+	ingress, egress := uint64(c.ingressMeter.Count()), uint64(c.egressMeter.Count())
+	c.lock.RUnlock()
+
+	// Decrement the metered peer count
+	atomic.AddInt32(&meteredPeerCount, -1)
+
+	// Unregister the peer from the traffic registries
+	key := fmt.Sprintf("%s/%s", c.ip, id)
+	PeerIngressRegistry.Unregister(key)
+	PeerEgressRegistry.Unregister(key)
+
+	meteredPeerFeed.Send(MeteredPeerEvent{
+		Type:    PeerDisconnected,
+		IP:      c.ip,
+		ID:      id,
+		Ingress: ingress,
+		Egress:  egress,
+	})
+	return err
 }

+ 8 - 1
p2p/server.go

@@ -864,7 +864,11 @@ func (srv *Server) listenLoop() {
 			}
 		}
 
-		fd = newMeteredConn(fd, true)
+		var ip net.IP
+		if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
+			ip = tcp.IP
+		}
+		fd = newMeteredConn(fd, true, ip)
 		srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
 		go func() {
 			srv.SetupConn(fd, inboundConn, nil)
@@ -917,6 +921,9 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro
 	} else {
 		c.node = nodeFromConn(remotePubkey, c.fd)
 	}
+	if conn, ok := c.fd.(*meteredConn); ok {
+		conn.handshakeDone(c.node.ID())
+	}
 	clog := srv.log.New("id", c.node.ID(), "addr", c.fd.RemoteAddr(), "conn", c.flags)
 	err = srv.checkpoint(c, srv.posthandshake)
 	if err != nil {