meter.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. package metrics
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "time"
  6. )
  7. // Meters count events to produce exponentially-weighted moving average rates
  8. // at one-, five-, and fifteen-minutes and a mean rate.
  9. type Meter interface {
  10. Count() int64
  11. Mark(int64)
  12. Rate1() float64
  13. Rate5() float64
  14. Rate15() float64
  15. RateMean() float64
  16. Snapshot() Meter
  17. Stop()
  18. }
  19. // GetOrRegisterMeter returns an existing Meter or constructs and registers a
  20. // new StandardMeter.
  21. // Be sure to unregister the meter from the registry once it is of no use to
  22. // allow for garbage collection.
  23. func GetOrRegisterMeter(name string, r Registry) Meter {
  24. if nil == r {
  25. r = DefaultRegistry
  26. }
  27. return r.GetOrRegister(name, NewMeter).(Meter)
  28. }
  29. // GetOrRegisterMeterForced returns an existing Meter or constructs and registers a
  30. // new StandardMeter no matter the global switch is enabled or not.
  31. // Be sure to unregister the meter from the registry once it is of no use to
  32. // allow for garbage collection.
  33. func GetOrRegisterMeterForced(name string, r Registry) Meter {
  34. if nil == r {
  35. r = DefaultRegistry
  36. }
  37. return r.GetOrRegister(name, NewMeterForced).(Meter)
  38. }
  39. // NewMeter constructs a new StandardMeter and launches a goroutine.
  40. // Be sure to call Stop() once the meter is of no use to allow for garbage collection.
  41. func NewMeter() Meter {
  42. if !Enabled {
  43. return NilMeter{}
  44. }
  45. m := newStandardMeter()
  46. arbiter.Lock()
  47. defer arbiter.Unlock()
  48. arbiter.meters[m] = struct{}{}
  49. if !arbiter.started {
  50. arbiter.started = true
  51. go arbiter.tick()
  52. }
  53. return m
  54. }
  55. // NewMeterForced constructs a new StandardMeter and launches a goroutine no matter
  56. // the global switch is enabled or not.
  57. // Be sure to call Stop() once the meter is of no use to allow for garbage collection.
  58. func NewMeterForced() Meter {
  59. m := newStandardMeter()
  60. arbiter.Lock()
  61. defer arbiter.Unlock()
  62. arbiter.meters[m] = struct{}{}
  63. if !arbiter.started {
  64. arbiter.started = true
  65. go arbiter.tick()
  66. }
  67. return m
  68. }
  69. // NewRegisteredMeter constructs and registers a new StandardMeter
  70. // and launches a goroutine.
  71. // Be sure to unregister the meter from the registry once it is of no use to
  72. // allow for garbage collection.
  73. func NewRegisteredMeter(name string, r Registry) Meter {
  74. c := NewMeter()
  75. if nil == r {
  76. r = DefaultRegistry
  77. }
  78. r.Register(name, c)
  79. return c
  80. }
  81. // NewRegisteredMeterForced constructs and registers a new StandardMeter
  82. // and launches a goroutine no matter the global switch is enabled or not.
  83. // Be sure to unregister the meter from the registry once it is of no use to
  84. // allow for garbage collection.
  85. func NewRegisteredMeterForced(name string, r Registry) Meter {
  86. c := NewMeterForced()
  87. if nil == r {
  88. r = DefaultRegistry
  89. }
  90. r.Register(name, c)
  91. return c
  92. }
  93. // MeterSnapshot is a read-only copy of another Meter.
  94. type MeterSnapshot struct {
  95. count int64
  96. temp int64
  97. rate1, rate5, rate15, rateMean float64
  98. }
  99. // Count returns the count of events at the time the snapshot was taken.
  100. func (m *MeterSnapshot) Count() int64 { return m.count }
  101. // Mark panics.
  102. func (*MeterSnapshot) Mark(n int64) {
  103. panic("Mark called on a MeterSnapshot")
  104. }
  105. // Rate1 returns the one-minute moving average rate of events per second at the
  106. // time the snapshot was taken.
  107. func (m *MeterSnapshot) Rate1() float64 { return m.rate1 }
  108. // Rate5 returns the five-minute moving average rate of events per second at
  109. // the time the snapshot was taken.
  110. func (m *MeterSnapshot) Rate5() float64 { return m.rate5 }
  111. // Rate15 returns the fifteen-minute moving average rate of events per second
  112. // at the time the snapshot was taken.
  113. func (m *MeterSnapshot) Rate15() float64 { return m.rate15 }
  114. // RateMean returns the meter's mean rate of events per second at the time the
  115. // snapshot was taken.
  116. func (m *MeterSnapshot) RateMean() float64 { return m.rateMean }
  117. // Snapshot returns the snapshot.
  118. func (m *MeterSnapshot) Snapshot() Meter { return m }
  119. // Stop is a no-op.
  120. func (m *MeterSnapshot) Stop() {}
  121. // NilMeter is a no-op Meter.
  122. type NilMeter struct{}
  123. // Count is a no-op.
  124. func (NilMeter) Count() int64 { return 0 }
  125. // Mark is a no-op.
  126. func (NilMeter) Mark(n int64) {}
  127. // Rate1 is a no-op.
  128. func (NilMeter) Rate1() float64 { return 0.0 }
  129. // Rate5 is a no-op.
  130. func (NilMeter) Rate5() float64 { return 0.0 }
  131. // Rate15 is a no-op.
  132. func (NilMeter) Rate15() float64 { return 0.0 }
  133. // RateMean is a no-op.
  134. func (NilMeter) RateMean() float64 { return 0.0 }
  135. // Snapshot is a no-op.
  136. func (NilMeter) Snapshot() Meter { return NilMeter{} }
  137. // Stop is a no-op.
  138. func (NilMeter) Stop() {}
  139. // StandardMeter is the standard implementation of a Meter.
  140. type StandardMeter struct {
  141. lock sync.RWMutex
  142. snapshot *MeterSnapshot
  143. a1, a5, a15 EWMA
  144. startTime time.Time
  145. stopped uint32
  146. }
  147. func newStandardMeter() *StandardMeter {
  148. return &StandardMeter{
  149. snapshot: &MeterSnapshot{},
  150. a1: NewEWMA1(),
  151. a5: NewEWMA5(),
  152. a15: NewEWMA15(),
  153. startTime: time.Now(),
  154. }
  155. }
  156. // Stop stops the meter, Mark() will be a no-op if you use it after being stopped.
  157. func (m *StandardMeter) Stop() {
  158. stopped := atomic.SwapUint32(&m.stopped, 1)
  159. if stopped != 1 {
  160. arbiter.Lock()
  161. delete(arbiter.meters, m)
  162. arbiter.Unlock()
  163. }
  164. }
  165. // Count returns the number of events recorded.
  166. // It updates the meter to be as accurate as possible
  167. func (m *StandardMeter) Count() int64 {
  168. m.lock.Lock()
  169. defer m.lock.Unlock()
  170. m.updateMeter()
  171. return m.snapshot.count
  172. }
  173. // Mark records the occurrence of n events.
  174. func (m *StandardMeter) Mark(n int64) {
  175. atomic.AddInt64(&m.snapshot.temp, n)
  176. }
  177. // Rate1 returns the one-minute moving average rate of events per second.
  178. func (m *StandardMeter) Rate1() float64 {
  179. m.lock.RLock()
  180. defer m.lock.RUnlock()
  181. return m.snapshot.rate1
  182. }
  183. // Rate5 returns the five-minute moving average rate of events per second.
  184. func (m *StandardMeter) Rate5() float64 {
  185. m.lock.RLock()
  186. defer m.lock.RUnlock()
  187. return m.snapshot.rate5
  188. }
  189. // Rate15 returns the fifteen-minute moving average rate of events per second.
  190. func (m *StandardMeter) Rate15() float64 {
  191. m.lock.RLock()
  192. defer m.lock.RUnlock()
  193. return m.snapshot.rate15
  194. }
  195. // RateMean returns the meter's mean rate of events per second.
  196. func (m *StandardMeter) RateMean() float64 {
  197. m.lock.RLock()
  198. defer m.lock.RUnlock()
  199. return m.snapshot.rateMean
  200. }
  201. // Snapshot returns a read-only copy of the meter.
  202. func (m *StandardMeter) Snapshot() Meter {
  203. m.lock.RLock()
  204. snapshot := *m.snapshot
  205. m.lock.RUnlock()
  206. return &snapshot
  207. }
  208. func (m *StandardMeter) updateSnapshot() {
  209. // should run with write lock held on m.lock
  210. snapshot := m.snapshot
  211. snapshot.rate1 = m.a1.Rate()
  212. snapshot.rate5 = m.a5.Rate()
  213. snapshot.rate15 = m.a15.Rate()
  214. snapshot.rateMean = float64(snapshot.count) / time.Since(m.startTime).Seconds()
  215. }
  216. func (m *StandardMeter) updateMeter() {
  217. // should only run with write lock held on m.lock
  218. n := atomic.LoadInt64(&m.snapshot.temp)
  219. m.snapshot.count += n
  220. m.a1.Update(n)
  221. m.a5.Update(n)
  222. m.a15.Update(n)
  223. }
  224. func (m *StandardMeter) tick() {
  225. m.lock.Lock()
  226. defer m.lock.Unlock()
  227. m.updateMeter()
  228. m.a1.Tick()
  229. m.a5.Tick()
  230. m.a15.Tick()
  231. m.updateSnapshot()
  232. }
  233. // meterArbiter ticks meters every 5s from a single goroutine.
  234. // meters are references in a set for future stopping.
  235. type meterArbiter struct {
  236. sync.RWMutex
  237. started bool
  238. meters map[*StandardMeter]struct{}
  239. ticker *time.Ticker
  240. }
  241. var arbiter = meterArbiter{ticker: time.NewTicker(5 * time.Second), meters: make(map[*StandardMeter]struct{})}
  242. // Ticks meters on the scheduled interval
  243. func (ma *meterArbiter) tick() {
  244. for range ma.ticker.C {
  245. ma.tickMeters()
  246. }
  247. }
  248. func (ma *meterArbiter) tickMeters() {
  249. ma.RLock()
  250. defer ma.RUnlock()
  251. for meter := range ma.meters {
  252. meter.tick()
  253. }
  254. }