metrics.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Contains the meters and timers used by the networking layer.
  17. package p2p
  18. import (
  19. "fmt"
  20. "net"
  21. "sync"
  22. "sync/atomic"
  23. "time"
  24. "github.com/ethereum/go-ethereum/event"
  25. "github.com/ethereum/go-ethereum/log"
  26. "github.com/ethereum/go-ethereum/metrics"
  27. "github.com/ethereum/go-ethereum/p2p/enode"
  28. )
  29. const (
  30. MetricsInboundTraffic = "p2p/ingress" // Name for the registered inbound traffic meter
  31. MetricsOutboundTraffic = "p2p/egress" // Name for the registered outbound traffic meter
  32. MetricsOutboundConnects = "p2p/dials" // Name for the registered outbound connects meter
  33. MetricsInboundConnects = "p2p/serves" // Name for the registered inbound connects meter
  34. MeteredPeerLimit = 1024 // This amount of peers are individually metered
  35. )
  36. var (
  37. ingressConnectMeter = metrics.NewRegisteredMeter(MetricsInboundConnects, nil) // Meter counting the ingress connections
  38. ingressTrafficMeter = metrics.NewRegisteredMeter(MetricsInboundTraffic, nil) // Meter metering the cumulative ingress traffic
  39. egressConnectMeter = metrics.NewRegisteredMeter(MetricsOutboundConnects, nil) // Meter counting the egress connections
  40. egressTrafficMeter = metrics.NewRegisteredMeter(MetricsOutboundTraffic, nil) // Meter metering the cumulative egress traffic
  41. activePeerGauge = metrics.NewRegisteredGauge("p2p/peers", nil) // Gauge tracking the current peer count
  42. PeerIngressRegistry = metrics.NewPrefixedChildRegistry(metrics.EphemeralRegistry, MetricsInboundTraffic+"/") // Registry containing the peer ingress
  43. PeerEgressRegistry = metrics.NewPrefixedChildRegistry(metrics.EphemeralRegistry, MetricsOutboundTraffic+"/") // Registry containing the peer egress
  44. meteredPeerFeed event.Feed // Event feed for peer metrics
  45. meteredPeerCount int32 // Actually stored peer connection count
  46. )
  47. // MeteredPeerEventType is the type of peer events emitted by a metered connection.
  48. type MeteredPeerEventType int
  49. const (
  50. // PeerConnected is the type of event emitted when a peer successfully
  51. // made the handshake.
  52. PeerConnected MeteredPeerEventType = iota
  53. // PeerDisconnected is the type of event emitted when a peer disconnects.
  54. PeerDisconnected
  55. // PeerHandshakeFailed is the type of event emitted when a peer fails to
  56. // make the handshake or disconnects before the handshake.
  57. PeerHandshakeFailed
  58. )
  59. // MeteredPeerEvent is an event emitted when peers connect or disconnect.
  60. type MeteredPeerEvent struct {
  61. Type MeteredPeerEventType // Type of peer event
  62. IP net.IP // IP address of the peer
  63. ID enode.ID // NodeID of the peer
  64. Elapsed time.Duration // Time elapsed between the connection and the handshake/disconnection
  65. Ingress uint64 // Ingress count at the moment of the event
  66. Egress uint64 // Egress count at the moment of the event
  67. }
  68. // SubscribeMeteredPeerEvent registers a subscription for peer life-cycle events
  69. // if metrics collection is enabled.
  70. func SubscribeMeteredPeerEvent(ch chan<- MeteredPeerEvent) event.Subscription {
  71. return meteredPeerFeed.Subscribe(ch)
  72. }
  73. // meteredConn is a wrapper around a net.Conn that meters both the
  74. // inbound and outbound network traffic.
  75. type meteredConn struct {
  76. net.Conn // Network connection to wrap with metering
  77. connected time.Time // Connection time of the peer
  78. ip net.IP // IP address of the peer
  79. id enode.ID // NodeID of the peer
  80. // trafficMetered denotes if the peer is registered in the traffic registries.
  81. // Its value is true if the metered peer count doesn't reach the limit in the
  82. // moment of the peer's connection.
  83. trafficMetered bool
  84. ingressMeter metrics.Meter // Meter for the read bytes of the peer
  85. egressMeter metrics.Meter // Meter for the written bytes of the peer
  86. lock sync.RWMutex // Lock protecting the metered connection's internals
  87. }
  88. // newMeteredConn creates a new metered connection, bumps the ingress or egress
  89. // connection meter and also increases the metered peer count. If the metrics
  90. // system is disabled or the IP address is unspecified, this function returns
  91. // the original object.
  92. func newMeteredConn(conn net.Conn, ingress bool, ip net.IP) net.Conn {
  93. // Short circuit if metrics are disabled
  94. if !metrics.Enabled {
  95. return conn
  96. }
  97. if ip.IsUnspecified() {
  98. log.Warn("Peer IP is unspecified")
  99. return conn
  100. }
  101. // Bump the connection counters and wrap the connection
  102. if ingress {
  103. ingressConnectMeter.Mark(1)
  104. } else {
  105. egressConnectMeter.Mark(1)
  106. }
  107. activePeerGauge.Inc(1)
  108. return &meteredConn{
  109. Conn: conn,
  110. ip: ip,
  111. connected: time.Now(),
  112. }
  113. }
  114. // Read delegates a network read to the underlying connection, bumping the common
  115. // and the peer ingress traffic meters along the way.
  116. func (c *meteredConn) Read(b []byte) (n int, err error) {
  117. n, err = c.Conn.Read(b)
  118. ingressTrafficMeter.Mark(int64(n))
  119. c.lock.RLock()
  120. if c.trafficMetered {
  121. c.ingressMeter.Mark(int64(n))
  122. }
  123. c.lock.RUnlock()
  124. return n, err
  125. }
  126. // Write delegates a network write to the underlying connection, bumping the common
  127. // and the peer egress traffic meters along the way.
  128. func (c *meteredConn) Write(b []byte) (n int, err error) {
  129. n, err = c.Conn.Write(b)
  130. egressTrafficMeter.Mark(int64(n))
  131. c.lock.RLock()
  132. if c.trafficMetered {
  133. c.egressMeter.Mark(int64(n))
  134. }
  135. c.lock.RUnlock()
  136. return n, err
  137. }
  138. // handshakeDone is called when a peer handshake is done. Registers the peer to
  139. // the ingress and the egress traffic registries using the peer's IP and node ID,
  140. // also emits connect event.
  141. func (c *meteredConn) handshakeDone(id enode.ID) {
  142. // TODO (kurkomisi): use the node URL instead of the pure node ID. (the String() method of *Node)
  143. if atomic.AddInt32(&meteredPeerCount, 1) >= MeteredPeerLimit {
  144. // Don't register the peer in the traffic registries.
  145. atomic.AddInt32(&meteredPeerCount, -1)
  146. c.lock.Lock()
  147. c.id, c.trafficMetered = id, false
  148. c.lock.Unlock()
  149. log.Warn("Metered peer count reached the limit")
  150. } else {
  151. key := fmt.Sprintf("%s/%s", c.ip, id.String())
  152. c.lock.Lock()
  153. c.id, c.trafficMetered = id, true
  154. c.ingressMeter = metrics.NewRegisteredMeter(key, PeerIngressRegistry)
  155. c.egressMeter = metrics.NewRegisteredMeter(key, PeerEgressRegistry)
  156. c.lock.Unlock()
  157. }
  158. meteredPeerFeed.Send(MeteredPeerEvent{
  159. Type: PeerConnected,
  160. IP: c.ip,
  161. ID: id,
  162. Elapsed: time.Since(c.connected),
  163. })
  164. }
  165. // Close delegates a close operation to the underlying connection, unregisters
  166. // the peer from the traffic registries and emits close event.
  167. func (c *meteredConn) Close() error {
  168. err := c.Conn.Close()
  169. c.lock.RLock()
  170. if c.id == (enode.ID{}) {
  171. // If the peer disconnects before the handshake.
  172. c.lock.RUnlock()
  173. meteredPeerFeed.Send(MeteredPeerEvent{
  174. Type: PeerHandshakeFailed,
  175. IP: c.ip,
  176. Elapsed: time.Since(c.connected),
  177. })
  178. activePeerGauge.Dec(1)
  179. return err
  180. }
  181. id := c.id
  182. if !c.trafficMetered {
  183. // If the peer isn't registered in the traffic registries.
  184. c.lock.RUnlock()
  185. meteredPeerFeed.Send(MeteredPeerEvent{
  186. Type: PeerDisconnected,
  187. IP: c.ip,
  188. ID: id,
  189. })
  190. activePeerGauge.Dec(1)
  191. return err
  192. }
  193. ingress, egress := uint64(c.ingressMeter.Count()), uint64(c.egressMeter.Count())
  194. c.lock.RUnlock()
  195. // Decrement the metered peer count
  196. atomic.AddInt32(&meteredPeerCount, -1)
  197. // Unregister the peer from the traffic registries
  198. key := fmt.Sprintf("%s/%s", c.ip, id)
  199. PeerIngressRegistry.Unregister(key)
  200. PeerEgressRegistry.Unregister(key)
  201. meteredPeerFeed.Send(MeteredPeerEvent{
  202. Type: PeerDisconnected,
  203. IP: c.ip,
  204. ID: id,
  205. Ingress: ingress,
  206. Egress: egress,
  207. })
  208. activePeerGauge.Dec(1)
  209. return err
  210. }