| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221 |
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package influxdb
- import (
- "context"
- "fmt"
- "time"
- "github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/metrics"
- influxdb2 "github.com/influxdata/influxdb-client-go/v2"
- "github.com/influxdata/influxdb-client-go/v2/api"
- )
// v2Reporter holds the configuration and client state needed to
// periodically push the contents of a metrics.Registry to an
// InfluxDB v2 server.
type v2Reporter struct {
	reg      metrics.Registry // registry whose metrics are reported
	interval time.Duration    // how often send() is invoked

	endpoint     string            // InfluxDB server URL
	token        string            // API token used for authentication
	bucket       string            // destination bucket for written points
	organization string            // InfluxDB organization owning the bucket
	namespace    string            // prefix prepended to every measurement name
	tags         map[string]string // static tags attached to every point

	client influxdb2.Client // underlying InfluxDB v2 client
	write  api.WriteAPI     // asynchronous write API derived from client
	// cache remembers the last reported value of each counter, keyed by
	// metric name, so counters can be reported as deltas rather than totals.
	cache map[string]int64
}
- // InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags
- func InfluxDBV2WithTags(r metrics.Registry, d time.Duration, endpoint string, token string, bucket string, organization string, namespace string, tags map[string]string) {
- rep := &v2Reporter{
- reg: r,
- interval: d,
- endpoint: endpoint,
- token: token,
- bucket: bucket,
- organization: organization,
- namespace: namespace,
- tags: tags,
- cache: make(map[string]int64),
- }
- rep.client = influxdb2.NewClient(rep.endpoint, rep.token)
- defer rep.client.Close()
- // async write client
- rep.write = rep.client.WriteAPI(rep.organization, rep.bucket)
- errorsCh := rep.write.Errors()
- // have to handle write errors in a separate goroutine like this b/c the channel is unbuffered and will block writes if not read
- go func() {
- for err := range errorsCh {
- log.Warn("write error", "err", err.Error())
- }
- }()
- rep.run()
- }
- func (r *v2Reporter) run() {
- intervalTicker := time.NewTicker(r.interval)
- pingTicker := time.NewTicker(time.Second * 5)
- for {
- select {
- case <-intervalTicker.C:
- r.send()
- case <-pingTicker.C:
- _, err := r.client.Health(context.Background())
- if err != nil {
- log.Warn("Got error from influxdb client health check", "err", err.Error())
- }
- }
- }
- }
// send converts every metric in the registry into one or more InfluxDB
// points, queues them on the asynchronous write API, and finally forces a
// flush so the whole batch is pushed out in this reporting cycle.
func (r *v2Reporter) send() {
	r.reg.Each(func(name string, i interface{}) {
		now := time.Now()
		namespace := r.namespace

		switch metric := i.(type) {
		case metrics.Counter:
			// Counters are reported as deltas: subtract the previously
			// reported value (cached under the metric name) from the
			// current count, then update the cache.
			v := metric.Count()
			l := r.cache[name]
			measurement := fmt.Sprintf("%s%s.count", namespace, name)
			fields := map[string]interface{}{
				"value": v - l,
			}
			pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
			r.write.WritePoint(pt)
			r.cache[name] = v

		case metrics.Gauge:
			// Gauges report their instantaneous value from a snapshot.
			ms := metric.Snapshot()
			measurement := fmt.Sprintf("%s%s.gauge", namespace, name)
			fields := map[string]interface{}{
				"value": ms.Value(),
			}
			pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
			r.write.WritePoint(pt)

		case metrics.GaugeFloat64:
			// Same as Gauge but with a float64 value; shares the ".gauge"
			// measurement suffix.
			ms := metric.Snapshot()
			measurement := fmt.Sprintf("%s%s.gauge", namespace, name)
			fields := map[string]interface{}{
				"value": ms.Value(),
			}
			pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
			r.write.WritePoint(pt)

		case metrics.Histogram:
			// Histograms are only reported when non-empty; an empty one
			// has no meaningful percentiles.
			ms := metric.Snapshot()
			if ms.Count() > 0 {
				ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
				measurement := fmt.Sprintf("%s%s.histogram", namespace, name)
				fields := map[string]interface{}{
					"count":    ms.Count(),
					"max":      ms.Max(),
					"mean":     ms.Mean(),
					"min":      ms.Min(),
					"stddev":   ms.StdDev(),
					"variance": ms.Variance(),
					"p50":      ps[0],
					"p75":      ps[1],
					"p95":      ps[2],
					"p99":      ps[3],
					"p999":     ps[4],
					"p9999":    ps[5],
				}
				pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
				r.write.WritePoint(pt)
			}

		case metrics.Meter:
			// Meters report the total count plus 1/5/15-minute and mean rates.
			ms := metric.Snapshot()
			measurement := fmt.Sprintf("%s%s.meter", namespace, name)
			fields := map[string]interface{}{
				"count": ms.Count(),
				"m1":    ms.Rate1(),
				"m5":    ms.Rate5(),
				"m15":   ms.Rate15(),
				"mean":  ms.RateMean(),
			}
			pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
			r.write.WritePoint(pt)

		case metrics.Timer:
			// Timers combine histogram statistics with meter-style rates
			// in a single point.
			ms := metric.Snapshot()
			ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
			measurement := fmt.Sprintf("%s%s.timer", namespace, name)
			fields := map[string]interface{}{
				"count":    ms.Count(),
				"max":      ms.Max(),
				"mean":     ms.Mean(),
				"min":      ms.Min(),
				"stddev":   ms.StdDev(),
				"variance": ms.Variance(),
				"p50":      ps[0],
				"p75":      ps[1],
				"p95":      ps[2],
				"p99":      ps[3],
				"p999":     ps[4],
				"p9999":    ps[5],
				"m1":       ms.Rate1(),
				"m5":       ms.Rate5(),
				"m15":      ms.Rate15(),
				"meanrate": ms.RateMean(),
			}
			pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
			r.write.WritePoint(pt)

		case metrics.ResettingTimer:
			// Resetting timers are reported only when they recorded values
			// in the last window. Note the percentile arguments here are
			// expressed as 50/95/99, unlike the 0.0-1.0 fractions above.
			t := metric.Snapshot()
			if len(t.Values()) > 0 {
				ps := t.Percentiles([]float64{50, 95, 99})
				val := t.Values()
				// NOTE(review): min/max are read from the first/last
				// elements of Values(), which assumes Percentiles sorted
				// the snapshot's values in place — confirm against the
				// metrics.ResettingTimer implementation.
				measurement := fmt.Sprintf("%s%s.span", namespace, name)
				fields := map[string]interface{}{
					"count": len(val),
					"max":   val[len(val)-1],
					"mean":  t.Mean(),
					"min":   val[0],
					"p50":   ps[0],
					"p95":   ps[1],
					"p99":   ps[2],
				}
				pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
				r.write.WritePoint(pt)
			}
		}
	})

	// Force all unwritten data to be sent
	r.write.Flush()
}
|