metrics.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Contains the meters and timers used by the networking layer.
  17. package p2p
  18. import (
  19. "fmt"
  20. "net"
  21. "sync"
  22. "sync/atomic"
  23. "time"
  24. "github.com/ethereum/go-ethereum/p2p/enode"
  25. "github.com/ethereum/go-ethereum/event"
  26. "github.com/ethereum/go-ethereum/log"
  27. "github.com/ethereum/go-ethereum/metrics"
  28. )
  // Registry names of the package-wide connection and traffic meters, together
  // with the bound on the number of individually metered peers.
  const (
  	// MetricsInboundConnects is the name for the registered inbound connects meter.
  	MetricsInboundConnects = "p2p/InboundConnects"

  	// MetricsInboundTraffic is the name for the registered inbound traffic meter.
  	MetricsInboundTraffic = "p2p/InboundTraffic"

  	// MetricsOutboundConnects is the name for the registered outbound connects meter.
  	MetricsOutboundConnects = "p2p/OutboundConnects"

  	// MetricsOutboundTraffic is the name for the registered outbound traffic meter.
  	MetricsOutboundTraffic = "p2p/OutboundTraffic"

  	// MeteredPeerLimit bounds the number of peers that are individually metered.
  	MeteredPeerLimit = 1024
  )
  36. var (
  37. ingressConnectMeter = metrics.NewRegisteredMeter(MetricsInboundConnects, nil) // Meter counting the ingress connections
  38. ingressTrafficMeter = metrics.NewRegisteredMeter(MetricsInboundTraffic, nil) // Meter metering the cumulative ingress traffic
  39. egressConnectMeter = metrics.NewRegisteredMeter(MetricsOutboundConnects, nil) // Meter counting the egress connections
  40. egressTrafficMeter = metrics.NewRegisteredMeter(MetricsOutboundTraffic, nil) // Meter metering the cumulative egress traffic
  41. PeerIngressRegistry = metrics.NewPrefixedChildRegistry(metrics.EphemeralRegistry, MetricsInboundTraffic+"/") // Registry containing the peer ingress
  42. PeerEgressRegistry = metrics.NewPrefixedChildRegistry(metrics.EphemeralRegistry, MetricsOutboundTraffic+"/") // Registry containing the peer egress
  43. meteredPeerFeed event.Feed // Event feed for peer metrics
  44. meteredPeerCount int32 // Actually stored peer connection count
  45. )
  // MeteredPeerEventType is the type of peer events emitted by a metered connection.
  type MeteredPeerEventType int

  // The kinds of event a metered connection can emit.
  const (
  	// PeerConnected is the type of event emitted when a peer successfully
  	// made the handshake.
  	PeerConnected = MeteredPeerEventType(iota)

  	// PeerDisconnected is the type of event emitted when a peer disconnects.
  	PeerDisconnected

  	// PeerHandshakeFailed is the type of event emitted when a peer fails to
  	// make the handshake or disconnects before the handshake.
  	PeerHandshakeFailed
  )
  58. // MeteredPeerEvent is an event emitted when peers connect or disconnect.
  59. type MeteredPeerEvent struct {
  60. Type MeteredPeerEventType // Type of peer event
  61. IP net.IP // IP address of the peer
  62. ID enode.ID // NodeID of the peer
  63. Elapsed time.Duration // Time elapsed between the connection and the handshake/disconnection
  64. Ingress uint64 // Ingress count at the moment of the event
  65. Egress uint64 // Egress count at the moment of the event
  66. }
  67. // SubscribeMeteredPeerEvent registers a subscription for peer life-cycle events
  68. // if metrics collection is enabled.
  69. func SubscribeMeteredPeerEvent(ch chan<- MeteredPeerEvent) event.Subscription {
  70. return meteredPeerFeed.Subscribe(ch)
  71. }
  72. // meteredConn is a wrapper around a net.Conn that meters both the
  73. // inbound and outbound network traffic.
  74. type meteredConn struct {
  75. net.Conn // Network connection to wrap with metering
  76. connected time.Time // Connection time of the peer
  77. ip net.IP // IP address of the peer
  78. id enode.ID // NodeID of the peer
  79. // trafficMetered denotes if the peer is registered in the traffic registries.
  80. // Its value is true if the metered peer count doesn't reach the limit in the
  81. // moment of the peer's connection.
  82. trafficMetered bool
  83. ingressMeter metrics.Meter // Meter for the read bytes of the peer
  84. egressMeter metrics.Meter // Meter for the written bytes of the peer
  85. lock sync.RWMutex // Lock protecting the metered connection's internals
  86. }
  87. // newMeteredConn creates a new metered connection, bumps the ingress or egress
  88. // connection meter and also increases the metered peer count. If the metrics
  89. // system is disabled or the IP address is unspecified, this function returns
  90. // the original object.
  91. func newMeteredConn(conn net.Conn, ingress bool, ip net.IP) net.Conn {
  92. // Short circuit if metrics are disabled
  93. if !metrics.Enabled {
  94. return conn
  95. }
  96. if ip.IsUnspecified() {
  97. log.Warn("Peer IP is unspecified")
  98. return conn
  99. }
  100. // Bump the connection counters and wrap the connection
  101. if ingress {
  102. ingressConnectMeter.Mark(1)
  103. } else {
  104. egressConnectMeter.Mark(1)
  105. }
  106. return &meteredConn{
  107. Conn: conn,
  108. ip: ip,
  109. connected: time.Now(),
  110. }
  111. }
  112. // Read delegates a network read to the underlying connection, bumping the common
  113. // and the peer ingress traffic meters along the way.
  114. func (c *meteredConn) Read(b []byte) (n int, err error) {
  115. n, err = c.Conn.Read(b)
  116. ingressTrafficMeter.Mark(int64(n))
  117. c.lock.RLock()
  118. if c.trafficMetered {
  119. c.ingressMeter.Mark(int64(n))
  120. }
  121. c.lock.RUnlock()
  122. return n, err
  123. }
  124. // Write delegates a network write to the underlying connection, bumping the common
  125. // and the peer egress traffic meters along the way.
  126. func (c *meteredConn) Write(b []byte) (n int, err error) {
  127. n, err = c.Conn.Write(b)
  128. egressTrafficMeter.Mark(int64(n))
  129. c.lock.RLock()
  130. if c.trafficMetered {
  131. c.egressMeter.Mark(int64(n))
  132. }
  133. c.lock.RUnlock()
  134. return n, err
  135. }
  136. // handshakeDone is called when a peer handshake is done. Registers the peer to
  137. // the ingress and the egress traffic registries using the peer's IP and node ID,
  138. // also emits connect event.
  139. func (c *meteredConn) handshakeDone(id enode.ID) {
  140. // TODO (kurkomisi): use the node URL instead of the pure node ID. (the String() method of *Node)
  141. if atomic.AddInt32(&meteredPeerCount, 1) >= MeteredPeerLimit {
  142. // Don't register the peer in the traffic registries.
  143. atomic.AddInt32(&meteredPeerCount, -1)
  144. c.lock.Lock()
  145. c.id, c.trafficMetered = id, false
  146. c.lock.Unlock()
  147. log.Warn("Metered peer count reached the limit")
  148. } else {
  149. key := fmt.Sprintf("%s/%s", c.ip, id.String())
  150. c.lock.Lock()
  151. c.id, c.trafficMetered = id, true
  152. c.ingressMeter = metrics.NewRegisteredMeter(key, PeerIngressRegistry)
  153. c.egressMeter = metrics.NewRegisteredMeter(key, PeerEgressRegistry)
  154. c.lock.Unlock()
  155. }
  156. meteredPeerFeed.Send(MeteredPeerEvent{
  157. Type: PeerConnected,
  158. IP: c.ip,
  159. ID: id,
  160. Elapsed: time.Since(c.connected),
  161. })
  162. }
  163. // Close delegates a close operation to the underlying connection, unregisters
  164. // the peer from the traffic registries and emits close event.
  165. func (c *meteredConn) Close() error {
  166. err := c.Conn.Close()
  167. c.lock.RLock()
  168. if c.id == (enode.ID{}) {
  169. // If the peer disconnects before the handshake.
  170. c.lock.RUnlock()
  171. meteredPeerFeed.Send(MeteredPeerEvent{
  172. Type: PeerHandshakeFailed,
  173. IP: c.ip,
  174. Elapsed: time.Since(c.connected),
  175. })
  176. return err
  177. }
  178. id := c.id
  179. if !c.trafficMetered {
  180. // If the peer isn't registered in the traffic registries.
  181. c.lock.RUnlock()
  182. meteredPeerFeed.Send(MeteredPeerEvent{
  183. Type: PeerDisconnected,
  184. IP: c.ip,
  185. ID: id,
  186. })
  187. return err
  188. }
  189. ingress, egress := uint64(c.ingressMeter.Count()), uint64(c.egressMeter.Count())
  190. c.lock.RUnlock()
  191. // Decrement the metered peer count
  192. atomic.AddInt32(&meteredPeerCount, -1)
  193. // Unregister the peer from the traffic registries
  194. key := fmt.Sprintf("%s/%s", c.ip, id)
  195. PeerIngressRegistry.Unregister(key)
  196. PeerEgressRegistry.Unregister(key)
  197. meteredPeerFeed.Send(MeteredPeerEvent{
  198. Type: PeerDisconnected,
  199. IP: c.ip,
  200. ID: id,
  201. Ingress: ingress,
  202. Egress: egress,
  203. })
  204. return err
  205. }