meter.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. package metrics
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "time"
  6. )
  7. // Meters count events to produce exponentially-weighted moving average rates
  8. // at one-, five-, and fifteen-minutes and a mean rate.
  9. type Meter interface {
  10. Count() int64
  11. Mark(int64)
  12. Rate1() float64
  13. Rate5() float64
  14. Rate15() float64
  15. RateMean() float64
  16. Snapshot() Meter
  17. Stop()
  18. }
  19. // GetOrRegisterMeter returns an existing Meter or constructs and registers a
  20. // new StandardMeter.
  21. // Be sure to unregister the meter from the registry once it is of no use to
  22. // allow for garbage collection.
  23. func GetOrRegisterMeter(name string, r Registry) Meter {
  24. if nil == r {
  25. r = DefaultRegistry
  26. }
  27. return r.GetOrRegister(name, NewMeter).(Meter)
  28. }
  29. // GetOrRegisterMeterForced returns an existing Meter or constructs and registers a
  30. // new StandardMeter no matter the global switch is enabled or not.
  31. // Be sure to unregister the meter from the registry once it is of no use to
  32. // allow for garbage collection.
  33. func GetOrRegisterMeterForced(name string, r Registry) Meter {
  34. if nil == r {
  35. r = DefaultRegistry
  36. }
  37. return r.GetOrRegister(name, NewMeterForced).(Meter)
  38. }
  39. // NewMeter constructs a new StandardMeter and launches a goroutine.
  40. // Be sure to call Stop() once the meter is of no use to allow for garbage collection.
  41. func NewMeter() Meter {
  42. if !Enabled {
  43. return NilMeter{}
  44. }
  45. m := newStandardMeter()
  46. arbiter.Lock()
  47. defer arbiter.Unlock()
  48. arbiter.meters[m] = struct{}{}
  49. if !arbiter.started {
  50. arbiter.started = true
  51. go arbiter.tick()
  52. }
  53. return m
  54. }
  55. // NewMeterForced constructs a new StandardMeter and launches a goroutine no matter
  56. // the global switch is enabled or not.
  57. // Be sure to call Stop() once the meter is of no use to allow for garbage collection.
  58. func NewMeterForced() Meter {
  59. m := newStandardMeter()
  60. arbiter.Lock()
  61. defer arbiter.Unlock()
  62. arbiter.meters[m] = struct{}{}
  63. if !arbiter.started {
  64. arbiter.started = true
  65. go arbiter.tick()
  66. }
  67. return m
  68. }
  69. // NewRegisteredMeter constructs and registers a new StandardMeter
  70. // and launches a goroutine.
  71. // Be sure to unregister the meter from the registry once it is of no use to
  72. // allow for garbage collection.
  73. func NewRegisteredMeter(name string, r Registry) Meter {
  74. c := NewMeter()
  75. if nil == r {
  76. r = DefaultRegistry
  77. }
  78. r.Register(name, c)
  79. return c
  80. }
  81. // NewRegisteredMeterForced constructs and registers a new StandardMeter
  82. // and launches a goroutine no matter the global switch is enabled or not.
  83. // Be sure to unregister the meter from the registry once it is of no use to
  84. // allow for garbage collection.
  85. func NewRegisteredMeterForced(name string, r Registry) Meter {
  86. c := NewMeterForced()
  87. if nil == r {
  88. r = DefaultRegistry
  89. }
  90. r.Register(name, c)
  91. return c
  92. }
  93. // MeterSnapshot is a read-only copy of another Meter.
  94. type MeterSnapshot struct {
  95. // WARNING: The `temp` field is accessed atomically.
  96. // On 32 bit platforms, only 64-bit aligned fields can be atomic. The struct is
  97. // guaranteed to be so aligned, so take advantage of that. For more information,
  98. // see https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
  99. temp int64
  100. count int64
  101. rate1, rate5, rate15, rateMean float64
  102. }
  103. // Count returns the count of events at the time the snapshot was taken.
  104. func (m *MeterSnapshot) Count() int64 { return m.count }
  105. // Mark panics.
  106. func (*MeterSnapshot) Mark(n int64) {
  107. panic("Mark called on a MeterSnapshot")
  108. }
  109. // Rate1 returns the one-minute moving average rate of events per second at the
  110. // time the snapshot was taken.
  111. func (m *MeterSnapshot) Rate1() float64 { return m.rate1 }
  112. // Rate5 returns the five-minute moving average rate of events per second at
  113. // the time the snapshot was taken.
  114. func (m *MeterSnapshot) Rate5() float64 { return m.rate5 }
  115. // Rate15 returns the fifteen-minute moving average rate of events per second
  116. // at the time the snapshot was taken.
  117. func (m *MeterSnapshot) Rate15() float64 { return m.rate15 }
  118. // RateMean returns the meter's mean rate of events per second at the time the
  119. // snapshot was taken.
  120. func (m *MeterSnapshot) RateMean() float64 { return m.rateMean }
  121. // Snapshot returns the snapshot.
  122. func (m *MeterSnapshot) Snapshot() Meter { return m }
  123. // Stop is a no-op.
  124. func (m *MeterSnapshot) Stop() {}
  125. // NilMeter is a no-op Meter.
  126. type NilMeter struct{}
  127. // Count is a no-op.
  128. func (NilMeter) Count() int64 { return 0 }
  129. // Mark is a no-op.
  130. func (NilMeter) Mark(n int64) {}
  131. // Rate1 is a no-op.
  132. func (NilMeter) Rate1() float64 { return 0.0 }
  133. // Rate5 is a no-op.
  134. func (NilMeter) Rate5() float64 { return 0.0 }
  135. // Rate15 is a no-op.
  136. func (NilMeter) Rate15() float64 { return 0.0 }
  137. // RateMean is a no-op.
  138. func (NilMeter) RateMean() float64 { return 0.0 }
  139. // Snapshot is a no-op.
  140. func (NilMeter) Snapshot() Meter { return NilMeter{} }
  141. // Stop is a no-op.
  142. func (NilMeter) Stop() {}
  143. // StandardMeter is the standard implementation of a Meter.
  144. type StandardMeter struct {
  145. lock sync.RWMutex
  146. snapshot *MeterSnapshot
  147. a1, a5, a15 EWMA
  148. startTime time.Time
  149. stopped uint32
  150. }
  151. func newStandardMeter() *StandardMeter {
  152. return &StandardMeter{
  153. snapshot: &MeterSnapshot{},
  154. a1: NewEWMA1(),
  155. a5: NewEWMA5(),
  156. a15: NewEWMA15(),
  157. startTime: time.Now(),
  158. }
  159. }
  160. // Stop stops the meter, Mark() will be a no-op if you use it after being stopped.
  161. func (m *StandardMeter) Stop() {
  162. stopped := atomic.SwapUint32(&m.stopped, 1)
  163. if stopped != 1 {
  164. arbiter.Lock()
  165. delete(arbiter.meters, m)
  166. arbiter.Unlock()
  167. }
  168. }
  169. // Count returns the number of events recorded.
  170. // It updates the meter to be as accurate as possible
  171. func (m *StandardMeter) Count() int64 {
  172. m.lock.Lock()
  173. defer m.lock.Unlock()
  174. m.updateMeter()
  175. return m.snapshot.count
  176. }
  177. // Mark records the occurrence of n events.
  178. func (m *StandardMeter) Mark(n int64) {
  179. atomic.AddInt64(&m.snapshot.temp, n)
  180. }
  181. // Rate1 returns the one-minute moving average rate of events per second.
  182. func (m *StandardMeter) Rate1() float64 {
  183. m.lock.RLock()
  184. defer m.lock.RUnlock()
  185. return m.snapshot.rate1
  186. }
  187. // Rate5 returns the five-minute moving average rate of events per second.
  188. func (m *StandardMeter) Rate5() float64 {
  189. m.lock.RLock()
  190. defer m.lock.RUnlock()
  191. return m.snapshot.rate5
  192. }
  193. // Rate15 returns the fifteen-minute moving average rate of events per second.
  194. func (m *StandardMeter) Rate15() float64 {
  195. m.lock.RLock()
  196. defer m.lock.RUnlock()
  197. return m.snapshot.rate15
  198. }
  199. // RateMean returns the meter's mean rate of events per second.
  200. func (m *StandardMeter) RateMean() float64 {
  201. m.lock.RLock()
  202. defer m.lock.RUnlock()
  203. return m.snapshot.rateMean
  204. }
  205. // Snapshot returns a read-only copy of the meter.
  206. func (m *StandardMeter) Snapshot() Meter {
  207. m.lock.RLock()
  208. snapshot := *m.snapshot
  209. m.lock.RUnlock()
  210. return &snapshot
  211. }
  212. func (m *StandardMeter) updateSnapshot() {
  213. // should run with write lock held on m.lock
  214. snapshot := m.snapshot
  215. snapshot.rate1 = m.a1.Rate()
  216. snapshot.rate5 = m.a5.Rate()
  217. snapshot.rate15 = m.a15.Rate()
  218. snapshot.rateMean = float64(snapshot.count) / time.Since(m.startTime).Seconds()
  219. }
  220. func (m *StandardMeter) updateMeter() {
  221. // should only run with write lock held on m.lock
  222. n := atomic.SwapInt64(&m.snapshot.temp, 0)
  223. m.snapshot.count += n
  224. m.a1.Update(n)
  225. m.a5.Update(n)
  226. m.a15.Update(n)
  227. }
  228. func (m *StandardMeter) tick() {
  229. m.lock.Lock()
  230. defer m.lock.Unlock()
  231. m.updateMeter()
  232. m.a1.Tick()
  233. m.a5.Tick()
  234. m.a15.Tick()
  235. m.updateSnapshot()
  236. }
// meterArbiter ticks meters every 5s from a single goroutine.
// Meters are referenced in a set so that they can be removed when stopped.
type meterArbiter struct {
	// The embedded lock guards started and meters.
	sync.RWMutex
	// started records whether the ticking goroutine has been launched.
	started bool
	// meters is the set of meters ticked on every interval.
	meters map[*StandardMeter]struct{}
	// ticker paces the ticking goroutine.
	ticker *time.Ticker
}
  245. var arbiter = meterArbiter{ticker: time.NewTicker(5 * time.Second), meters: make(map[*StandardMeter]struct{})}
  246. // Ticks meters on the scheduled interval
  247. func (ma *meterArbiter) tick() {
  248. for range ma.ticker.C {
  249. ma.tickMeters()
  250. }
  251. }
  252. func (ma *meterArbiter) tickMeters() {
  253. ma.RLock()
  254. defer ma.RUnlock()
  255. for meter := range ma.meters {
  256. meter.tick()
  257. }
  258. }