metrics.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Contains the meters and timers used by the networking layer.
  17. package p2p
  18. import (
  19. "net"
  20. "sync"
  21. "sync/atomic"
  22. "time"
  23. "github.com/ethereum/go-ethereum/event"
  24. "github.com/ethereum/go-ethereum/log"
  25. "github.com/ethereum/go-ethereum/metrics"
  26. )
  27. const (
  28. MetricsInboundTraffic = "p2p/ingress" // Name for the registered inbound traffic meter
  29. MetricsOutboundTraffic = "p2p/egress" // Name for the registered outbound traffic meter
  30. MetricsOutboundConnects = "p2p/dials" // Name for the registered outbound connects meter
  31. MetricsInboundConnects = "p2p/serves" // Name for the registered inbound connects meter
  32. MeteredPeerLimit = 1024 // This amount of peers are individually metered
  33. )
  34. var (
  35. ingressConnectMeter = metrics.NewRegisteredMeter(MetricsInboundConnects, nil) // Meter counting the ingress connections
  36. ingressTrafficMeter = metrics.NewRegisteredMeter(MetricsInboundTraffic, nil) // Meter metering the cumulative ingress traffic
  37. egressConnectMeter = metrics.NewRegisteredMeter(MetricsOutboundConnects, nil) // Meter counting the egress connections
  38. egressTrafficMeter = metrics.NewRegisteredMeter(MetricsOutboundTraffic, nil) // Meter metering the cumulative egress traffic
  39. activePeerGauge = metrics.NewRegisteredGauge("p2p/peers", nil) // Gauge tracking the current peer count
  40. PeerIngressRegistry = metrics.NewPrefixedChildRegistry(metrics.EphemeralRegistry, MetricsInboundTraffic+"/") // Registry containing the peer ingress
  41. PeerEgressRegistry = metrics.NewPrefixedChildRegistry(metrics.EphemeralRegistry, MetricsOutboundTraffic+"/") // Registry containing the peer egress
  42. meteredPeerFeed event.Feed // Event feed for peer metrics
  43. meteredPeerCount int32 // Actually stored peer connection count
  44. )
  45. // MeteredPeerEventType is the type of peer events emitted by a metered connection.
  46. type MeteredPeerEventType int
  47. const (
  48. // PeerHandshakeSucceeded is the type of event
  49. // emitted when a peer successfully makes the handshake.
  50. PeerHandshakeSucceeded MeteredPeerEventType = iota
  51. // PeerHandshakeFailed is the type of event emitted when a peer fails to
  52. // make the handshake or disconnects before it.
  53. PeerHandshakeFailed
  54. // PeerDisconnected is the type of event emitted when a peer disconnects.
  55. PeerDisconnected
  56. )
  57. // MeteredPeerEvent is an event emitted when peers connect or disconnect.
  58. type MeteredPeerEvent struct {
  59. Type MeteredPeerEventType // Type of peer event
  60. Addr string // TCP address of the peer
  61. Elapsed time.Duration // Time elapsed between the connection and the handshake/disconnection
  62. Peer *Peer // Connected remote node instance
  63. Ingress uint64 // Ingress count at the moment of the event
  64. Egress uint64 // Egress count at the moment of the event
  65. }
  66. // SubscribeMeteredPeerEvent registers a subscription for peer life-cycle events
  67. // if metrics collection is enabled.
  68. func SubscribeMeteredPeerEvent(ch chan<- MeteredPeerEvent) event.Subscription {
  69. return meteredPeerFeed.Subscribe(ch)
  70. }
  71. // meteredConn is a wrapper around a net.Conn that meters both the
  72. // inbound and outbound network traffic.
  73. type meteredConn struct {
  74. net.Conn // Network connection to wrap with metering
  75. connected time.Time // Connection time of the peer
  76. addr *net.TCPAddr // TCP address of the peer
  77. peer *Peer // Peer instance
  78. // trafficMetered denotes if the peer is registered in the traffic registries.
  79. // Its value is true if the metered peer count doesn't reach the limit in the
  80. // moment of the peer's connection.
  81. trafficMetered bool
  82. ingressMeter metrics.Meter // Meter for the read bytes of the peer
  83. egressMeter metrics.Meter // Meter for the written bytes of the peer
  84. lock sync.RWMutex // Lock protecting the metered connection's internals
  85. }
  86. // newMeteredConn creates a new metered connection, bumps the ingress or egress
  87. // connection meter and also increases the metered peer count. If the metrics
  88. // system is disabled or the IP address is unspecified, this function returns
  89. // the original object.
  90. func newMeteredConn(conn net.Conn, ingress bool, addr *net.TCPAddr) net.Conn {
  91. // Short circuit if metrics are disabled
  92. if !metrics.Enabled {
  93. return conn
  94. }
  95. if addr == nil || addr.IP.IsUnspecified() {
  96. log.Warn("Peer address is unspecified")
  97. return conn
  98. }
  99. // Bump the connection counters and wrap the connection
  100. if ingress {
  101. ingressConnectMeter.Mark(1)
  102. } else {
  103. egressConnectMeter.Mark(1)
  104. }
  105. activePeerGauge.Inc(1)
  106. return &meteredConn{
  107. Conn: conn,
  108. addr: addr,
  109. connected: time.Now(),
  110. }
  111. }
  112. // Read delegates a network read to the underlying connection, bumping the common
  113. // and the peer ingress traffic meters along the way.
  114. func (c *meteredConn) Read(b []byte) (n int, err error) {
  115. n, err = c.Conn.Read(b)
  116. ingressTrafficMeter.Mark(int64(n))
  117. c.lock.RLock()
  118. if c.trafficMetered {
  119. c.ingressMeter.Mark(int64(n))
  120. }
  121. c.lock.RUnlock()
  122. return n, err
  123. }
  124. // Write delegates a network write to the underlying connection, bumping the common
  125. // and the peer egress traffic meters along the way.
  126. func (c *meteredConn) Write(b []byte) (n int, err error) {
  127. n, err = c.Conn.Write(b)
  128. egressTrafficMeter.Mark(int64(n))
  129. c.lock.RLock()
  130. if c.trafficMetered {
  131. c.egressMeter.Mark(int64(n))
  132. }
  133. c.lock.RUnlock()
  134. return n, err
  135. }
  136. // handshakeDone is called after the connection passes the handshake.
  137. func (c *meteredConn) handshakeDone(peer *Peer) {
  138. if atomic.AddInt32(&meteredPeerCount, 1) >= MeteredPeerLimit {
  139. // Don't register the peer in the traffic registries.
  140. atomic.AddInt32(&meteredPeerCount, -1)
  141. c.lock.Lock()
  142. c.peer, c.trafficMetered = peer, false
  143. c.lock.Unlock()
  144. log.Warn("Metered peer count reached the limit")
  145. } else {
  146. enode := peer.Node().String()
  147. c.lock.Lock()
  148. c.peer, c.trafficMetered = peer, true
  149. c.ingressMeter = metrics.NewRegisteredMeter(enode, PeerIngressRegistry)
  150. c.egressMeter = metrics.NewRegisteredMeter(enode, PeerEgressRegistry)
  151. c.lock.Unlock()
  152. }
  153. meteredPeerFeed.Send(MeteredPeerEvent{
  154. Type: PeerHandshakeSucceeded,
  155. Addr: c.addr.String(),
  156. Peer: peer,
  157. Elapsed: time.Since(c.connected),
  158. })
  159. }
  160. // Close delegates a close operation to the underlying connection, unregisters
  161. // the peer from the traffic registries and emits close event.
  162. func (c *meteredConn) Close() error {
  163. err := c.Conn.Close()
  164. c.lock.RLock()
  165. if c.peer == nil {
  166. // If the peer disconnects before/during the handshake.
  167. c.lock.RUnlock()
  168. meteredPeerFeed.Send(MeteredPeerEvent{
  169. Type: PeerHandshakeFailed,
  170. Addr: c.addr.String(),
  171. Elapsed: time.Since(c.connected),
  172. })
  173. activePeerGauge.Dec(1)
  174. return err
  175. }
  176. peer := c.peer
  177. if !c.trafficMetered {
  178. // If the peer isn't registered in the traffic registries.
  179. c.lock.RUnlock()
  180. meteredPeerFeed.Send(MeteredPeerEvent{
  181. Type: PeerDisconnected,
  182. Addr: c.addr.String(),
  183. Peer: peer,
  184. })
  185. activePeerGauge.Dec(1)
  186. return err
  187. }
  188. ingress, egress, enode := uint64(c.ingressMeter.Count()), uint64(c.egressMeter.Count()), c.peer.Node().String()
  189. c.lock.RUnlock()
  190. // Decrement the metered peer count
  191. atomic.AddInt32(&meteredPeerCount, -1)
  192. // Unregister the peer from the traffic registries
  193. PeerIngressRegistry.Unregister(enode)
  194. PeerEgressRegistry.Unregister(enode)
  195. meteredPeerFeed.Send(MeteredPeerEvent{
  196. Type: PeerDisconnected,
  197. Addr: c.addr.String(),
  198. Peer: peer,
  199. Ingress: ingress,
  200. Egress: egress,
  201. })
  202. activePeerGauge.Dec(1)
  203. return err
  204. }