| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337 |
- // Copyright (c) 2017 Uber Technologies, Inc.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package metrics
- import (
- "sort"
- "sync"
- "sync/atomic"
- "time"
- "github.com/codahale/hdrhistogram"
- )
- // This is intentionally very similar to github.com/codahale/metrics, the
- // main difference being that counters/gauges are scoped to the provider
- // rather than being global (to facilitate testing).
// A LocalBackend is a metrics provider which aggregates data in-vm, and
// allows exporting snapshots to shove the data into a remote collector.
type LocalBackend struct {
	cm       sync.Mutex                    // guards the counters map
	gm       sync.Mutex                    // guards the gauges map
	tm       sync.Mutex                    // guards the timers map
	counters map[string]*int64             // counter values, keyed by GetKey(name, tags, ...)
	gauges   map[string]*int64             // gauge values, keyed by GetKey(name, tags, ...)
	timers   map[string]*localBackendTimer // timer histograms, keyed by GetKey(name, tags, ...)
	stop     chan struct{}                 // closed by Stop to terminate runLoop
	wg       sync.WaitGroup                // tracks the runLoop goroutine so Stop can wait for it
	TagsSep  string                        // separator placed before each tag in a metric key ("|" by default)
	TagKVSep string                        // separator between a tag's name and value in a metric key ("=" by default)
}
- // NewLocalBackend returns a new LocalBackend. The collectionInterval is the histogram
- // time window for each timer.
- func NewLocalBackend(collectionInterval time.Duration) *LocalBackend {
- b := &LocalBackend{
- counters: make(map[string]*int64),
- gauges: make(map[string]*int64),
- timers: make(map[string]*localBackendTimer),
- stop: make(chan struct{}),
- TagsSep: "|",
- TagKVSep: "=",
- }
- if collectionInterval == 0 {
- // Use one histogram time window for all timers
- return b
- }
- b.wg.Add(1)
- go b.runLoop(collectionInterval)
- return b
- }
- // Clear discards accumulated stats
- func (b *LocalBackend) Clear() {
- b.cm.Lock()
- defer b.cm.Unlock()
- b.gm.Lock()
- defer b.gm.Unlock()
- b.tm.Lock()
- defer b.tm.Unlock()
- b.counters = make(map[string]*int64)
- b.gauges = make(map[string]*int64)
- b.timers = make(map[string]*localBackendTimer)
- }
- func (b *LocalBackend) runLoop(collectionInterval time.Duration) {
- defer b.wg.Done()
- ticker := time.NewTicker(collectionInterval)
- for {
- select {
- case <-ticker.C:
- b.tm.Lock()
- timers := make(map[string]*localBackendTimer, len(b.timers))
- for timerName, timer := range b.timers {
- timers[timerName] = timer
- }
- b.tm.Unlock()
- for _, t := range timers {
- t.Lock()
- t.hist.Rotate()
- t.Unlock()
- }
- case <-b.stop:
- ticker.Stop()
- return
- }
- }
- }
- // IncCounter increments a counter value
- func (b *LocalBackend) IncCounter(name string, tags map[string]string, delta int64) {
- name = GetKey(name, tags, b.TagsSep, b.TagKVSep)
- b.cm.Lock()
- defer b.cm.Unlock()
- counter := b.counters[name]
- if counter == nil {
- b.counters[name] = new(int64)
- *b.counters[name] = delta
- return
- }
- atomic.AddInt64(counter, delta)
- }
- // UpdateGauge updates the value of a gauge
- func (b *LocalBackend) UpdateGauge(name string, tags map[string]string, value int64) {
- name = GetKey(name, tags, b.TagsSep, b.TagKVSep)
- b.gm.Lock()
- defer b.gm.Unlock()
- gauge := b.gauges[name]
- if gauge == nil {
- b.gauges[name] = new(int64)
- *b.gauges[name] = value
- return
- }
- atomic.StoreInt64(gauge, value)
- }
- // RecordTimer records a timing duration
- func (b *LocalBackend) RecordTimer(name string, tags map[string]string, d time.Duration) {
- name = GetKey(name, tags, b.TagsSep, b.TagKVSep)
- timer := b.findOrCreateTimer(name)
- timer.Lock()
- timer.hist.Current.RecordValue(int64(d / time.Millisecond))
- timer.Unlock()
- }
- func (b *LocalBackend) findOrCreateTimer(name string) *localBackendTimer {
- b.tm.Lock()
- defer b.tm.Unlock()
- if t, ok := b.timers[name]; ok {
- return t
- }
- t := &localBackendTimer{
- hist: hdrhistogram.NewWindowed(5, 0, int64((5*time.Minute)/time.Millisecond), 1),
- }
- b.timers[name] = t
- return t
- }
// localBackendTimer pairs a windowed histogram with the mutex that callers
// hold while recording into, rotating, or merging it.
type localBackendTimer struct {
	sync.Mutex                             // guards hist
	hist *hdrhistogram.WindowedHistogram // recorded durations, in milliseconds
}
// percentiles maps the suffix appended to a timer's key in Snapshot output to
// the quantile (0-100) read from the timer's histogram.
var percentiles = map[string]float64{
	"P50":  50,
	"P75":  75,
	"P90":  90,
	"P95":  95,
	"P99":  99,
	"P999": 99.9,
}
- // Snapshot captures a snapshot of the current counter and gauge values
- func (b *LocalBackend) Snapshot() (counters, gauges map[string]int64) {
- b.cm.Lock()
- defer b.cm.Unlock()
- counters = make(map[string]int64, len(b.counters))
- for name, value := range b.counters {
- counters[name] = atomic.LoadInt64(value)
- }
- b.gm.Lock()
- defer b.gm.Unlock()
- gauges = make(map[string]int64, len(b.gauges))
- for name, value := range b.gauges {
- gauges[name] = atomic.LoadInt64(value)
- }
- b.tm.Lock()
- timers := make(map[string]*localBackendTimer)
- for timerName, timer := range b.timers {
- timers[timerName] = timer
- }
- b.tm.Unlock()
- for timerName, timer := range timers {
- timer.Lock()
- hist := timer.hist.Merge()
- timer.Unlock()
- for name, q := range percentiles {
- gauges[timerName+"."+name] = hist.ValueAtQuantile(q)
- }
- }
- return
- }
- // Stop cleanly closes the background goroutine spawned by NewLocalBackend.
- func (b *LocalBackend) Stop() {
- close(b.stop)
- b.wg.Wait()
- }
// GetKey converts name+tags into a single string of the form
// "name|tag1=value1|...|tagN=valueN", where tag names are sorted
// alphabetically, tagsSep is the separator written before every tag ("|" in
// the example) and tagKVSep is the separator between a tag's name and its
// value ("=" in the example).
//
// With nil or empty tags the result is name unchanged.
func GetKey(name string, tags map[string]string, tagsSep string, tagKVSep string) string {
	if len(tags) == 0 {
		return name
	}

	// Collect the tag names and compute the exact output length in one pass
	// so the key can be assembled in a single buffer instead of reallocating
	// a growing string for every tag.
	keys := make([]string, 0, len(tags))
	size := len(name)
	for k, v := range tags {
		keys = append(keys, k)
		size += len(tagsSep) + len(k) + len(tagKVSep) + len(v)
	}
	// Map iteration order is random; sorting makes the key deterministic.
	sort.Strings(keys)

	buf := make([]byte, 0, size)
	buf = append(buf, name...)
	for _, k := range keys {
		buf = append(buf, tagsSep...)
		buf = append(buf, k...)
		buf = append(buf, tagKVSep...)
		buf = append(buf, tags[k]...)
	}
	return string(buf)
}
// stats holds what every local metric handle needs to forward a measurement
// to its backend.
type stats struct {
	name         string            // metric name passed to the backend
	tags         map[string]string // tags passed to the backend
	localBackend *LocalBackend     // backend that receives the measurements
}
- type localTimer struct {
- stats
- }
- func (l *localTimer) Record(d time.Duration) {
- l.localBackend.RecordTimer(l.name, l.tags, d)
- }
- type localCounter struct {
- stats
- }
- func (l *localCounter) Inc(delta int64) {
- l.localBackend.IncCounter(l.name, l.tags, delta)
- }
- type localGauge struct {
- stats
- }
- func (l *localGauge) Update(value int64) {
- l.localBackend.UpdateGauge(l.name, l.tags, value)
- }
// LocalFactory is a stats factory that creates metrics that are stored
// locally, in the embedded LocalBackend.
type LocalFactory struct {
	*LocalBackend                   // backend shared by this factory and the namespaces derived from it
	namespace     string            // prefix joined with "." onto every metric name; empty means no prefix
	tags          map[string]string // tags merged into the tags of every metric created by this factory
}
- // NewLocalFactory returns a new LocalMetricsFactory
- func NewLocalFactory(collectionInterval time.Duration) *LocalFactory {
- return &LocalFactory{
- LocalBackend: NewLocalBackend(collectionInterval),
- }
- }
- // appendTags adds the tags to the namespace tags and returns a combined map.
- func (l *LocalFactory) appendTags(tags map[string]string) map[string]string {
- newTags := make(map[string]string)
- for k, v := range l.tags {
- newTags[k] = v
- }
- for k, v := range tags {
- newTags[k] = v
- }
- return newTags
- }
- func (l *LocalFactory) newNamespace(name string) string {
- if l.namespace == "" {
- return name
- }
- if name == "" {
- return l.namespace
- }
- return l.namespace + "." + name
- }
- // Counter returns a local stats counter
- func (l *LocalFactory) Counter(name string, tags map[string]string) Counter {
- return &localCounter{
- stats{
- name: l.newNamespace(name),
- tags: l.appendTags(tags),
- localBackend: l.LocalBackend,
- },
- }
- }
- // Timer returns a local stats timer.
- func (l *LocalFactory) Timer(name string, tags map[string]string) Timer {
- return &localTimer{
- stats{
- name: l.newNamespace(name),
- tags: l.appendTags(tags),
- localBackend: l.LocalBackend,
- },
- }
- }
- // Gauge returns a local stats gauge.
- func (l *LocalFactory) Gauge(name string, tags map[string]string) Gauge {
- return &localGauge{
- stats{
- name: l.newNamespace(name),
- tags: l.appendTags(tags),
- localBackend: l.LocalBackend,
- },
- }
- }
- // Namespace returns a new namespace.
- func (l *LocalFactory) Namespace(name string, tags map[string]string) Factory {
- return &LocalFactory{
- namespace: l.newNamespace(name),
- tags: l.appendTags(tags),
- LocalBackend: l.LocalBackend,
- }
- }
|