| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
- package influxdb
- import (
- "fmt"
- uurl "net/url"
- "time"
- "github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/metrics"
- "github.com/influxdata/influxdb/client"
- )
- type reporter struct {
- reg metrics.Registry
- interval time.Duration
- url uurl.URL
- database string
- username string
- password string
- namespace string
- tags map[string]string
- client *client.Client
- cache map[string]int64
- }
- // InfluxDB starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval.
- func InfluxDB(r metrics.Registry, d time.Duration, url, database, username, password, namespace string) {
- InfluxDBWithTags(r, d, url, database, username, password, namespace, nil)
- }
- // InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags
- func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, username, password, namespace string, tags map[string]string) {
- u, err := uurl.Parse(url)
- if err != nil {
- log.Warn("Unable to parse InfluxDB", "url", url, "err", err)
- return
- }
- rep := &reporter{
- reg: r,
- interval: d,
- url: *u,
- database: database,
- username: username,
- password: password,
- namespace: namespace,
- tags: tags,
- cache: make(map[string]int64),
- }
- if err := rep.makeClient(); err != nil {
- log.Warn("Unable to make InfluxDB client", "err", err)
- return
- }
- rep.run()
- }
- // InfluxDBWithTagsOnce runs once an InfluxDB reporter and post the given metrics.Registry with the specified tags
- func InfluxDBWithTagsOnce(r metrics.Registry, url, database, username, password, namespace string, tags map[string]string) error {
- u, err := uurl.Parse(url)
- if err != nil {
- return fmt.Errorf("unable to parse InfluxDB. url: %s, err: %v", url, err)
- }
- rep := &reporter{
- reg: r,
- url: *u,
- database: database,
- username: username,
- password: password,
- namespace: namespace,
- tags: tags,
- cache: make(map[string]int64),
- }
- if err := rep.makeClient(); err != nil {
- return fmt.Errorf("unable to make InfluxDB client. err: %v", err)
- }
- if err := rep.send(); err != nil {
- return fmt.Errorf("unable to send to InfluxDB. err: %v", err)
- }
- return nil
- }
- func (r *reporter) makeClient() (err error) {
- r.client, err = client.NewClient(client.Config{
- URL: r.url,
- Username: r.username,
- Password: r.password,
- Timeout: 10 * time.Second,
- })
- return
- }
- func (r *reporter) run() {
- intervalTicker := time.Tick(r.interval)
- pingTicker := time.Tick(time.Second * 5)
- for {
- select {
- case <-intervalTicker:
- if err := r.send(); err != nil {
- log.Warn("Unable to send to InfluxDB", "err", err)
- }
- case <-pingTicker:
- _, _, err := r.client.Ping()
- if err != nil {
- log.Warn("Got error while sending a ping to InfluxDB, trying to recreate client", "err", err)
- if err = r.makeClient(); err != nil {
- log.Warn("Unable to make InfluxDB client", "err", err)
- }
- }
- }
- }
- }
- func (r *reporter) send() error {
- var pts []client.Point
- r.reg.Each(func(name string, i interface{}) {
- now := time.Now()
- namespace := r.namespace
- switch metric := i.(type) {
- case metrics.Counter:
- v := metric.Count()
- l := r.cache[name]
- pts = append(pts, client.Point{
- Measurement: fmt.Sprintf("%s%s.count", namespace, name),
- Tags: r.tags,
- Fields: map[string]interface{}{
- "value": v - l,
- },
- Time: now,
- })
- r.cache[name] = v
- case metrics.Gauge:
- ms := metric.Snapshot()
- pts = append(pts, client.Point{
- Measurement: fmt.Sprintf("%s%s.gauge", namespace, name),
- Tags: r.tags,
- Fields: map[string]interface{}{
- "value": ms.Value(),
- },
- Time: now,
- })
- case metrics.GaugeFloat64:
- ms := metric.Snapshot()
- pts = append(pts, client.Point{
- Measurement: fmt.Sprintf("%s%s.gauge", namespace, name),
- Tags: r.tags,
- Fields: map[string]interface{}{
- "value": ms.Value(),
- },
- Time: now,
- })
- case metrics.Histogram:
- ms := metric.Snapshot()
- if ms.Count() > 0 {
- ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
- pts = append(pts, client.Point{
- Measurement: fmt.Sprintf("%s%s.histogram", namespace, name),
- Tags: r.tags,
- Fields: map[string]interface{}{
- "count": ms.Count(),
- "max": ms.Max(),
- "mean": ms.Mean(),
- "min": ms.Min(),
- "stddev": ms.StdDev(),
- "variance": ms.Variance(),
- "p50": ps[0],
- "p75": ps[1],
- "p95": ps[2],
- "p99": ps[3],
- "p999": ps[4],
- "p9999": ps[5],
- },
- Time: now,
- })
- }
- case metrics.Meter:
- ms := metric.Snapshot()
- pts = append(pts, client.Point{
- Measurement: fmt.Sprintf("%s%s.meter", namespace, name),
- Tags: r.tags,
- Fields: map[string]interface{}{
- "count": ms.Count(),
- "m1": ms.Rate1(),
- "m5": ms.Rate5(),
- "m15": ms.Rate15(),
- "mean": ms.RateMean(),
- },
- Time: now,
- })
- case metrics.Timer:
- ms := metric.Snapshot()
- ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
- pts = append(pts, client.Point{
- Measurement: fmt.Sprintf("%s%s.timer", namespace, name),
- Tags: r.tags,
- Fields: map[string]interface{}{
- "count": ms.Count(),
- "max": ms.Max(),
- "mean": ms.Mean(),
- "min": ms.Min(),
- "stddev": ms.StdDev(),
- "variance": ms.Variance(),
- "p50": ps[0],
- "p75": ps[1],
- "p95": ps[2],
- "p99": ps[3],
- "p999": ps[4],
- "p9999": ps[5],
- "m1": ms.Rate1(),
- "m5": ms.Rate5(),
- "m15": ms.Rate15(),
- "meanrate": ms.RateMean(),
- },
- Time: now,
- })
- case metrics.ResettingTimer:
- t := metric.Snapshot()
- if len(t.Values()) > 0 {
- ps := t.Percentiles([]float64{50, 95, 99})
- val := t.Values()
- pts = append(pts, client.Point{
- Measurement: fmt.Sprintf("%s%s.span", namespace, name),
- Tags: r.tags,
- Fields: map[string]interface{}{
- "count": len(val),
- "max": val[len(val)-1],
- "mean": t.Mean(),
- "min": val[0],
- "p50": ps[0],
- "p95": ps[1],
- "p99": ps[2],
- },
- Time: now,
- })
- }
- }
- })
- bps := client.BatchPoints{
- Points: pts,
- Database: r.database,
- }
- _, err := r.client.Write(bps)
- return err
- }
|