influxdbv2.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. //
  2. // The go-ethereum library is distributed in the hope that it will be useful,
  3. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  4. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  5. // GNU Lesser General Public License for more details.
  6. //
  7. // You should have received a copy of the GNU Lesser General Public License
  8. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  9. package influxdb
  10. import (
  11. "context"
  12. "fmt"
  13. "time"
  14. "github.com/ethereum/go-ethereum/log"
  15. "github.com/ethereum/go-ethereum/metrics"
  16. influxdb2 "github.com/influxdata/influxdb-client-go/v2"
  17. "github.com/influxdata/influxdb-client-go/v2/api"
  18. )
  19. type v2Reporter struct {
  20. reg metrics.Registry
  21. interval time.Duration
  22. endpoint string
  23. token string
  24. bucket string
  25. organization string
  26. namespace string
  27. tags map[string]string
  28. client influxdb2.Client
  29. write api.WriteAPI
  30. cache map[string]int64
  31. }
  32. // InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags
  33. func InfluxDBV2WithTags(r metrics.Registry, d time.Duration, endpoint string, token string, bucket string, organization string, namespace string, tags map[string]string) {
  34. rep := &v2Reporter{
  35. reg: r,
  36. interval: d,
  37. endpoint: endpoint,
  38. token: token,
  39. bucket: bucket,
  40. organization: organization,
  41. namespace: namespace,
  42. tags: tags,
  43. cache: make(map[string]int64),
  44. }
  45. rep.client = influxdb2.NewClient(rep.endpoint, rep.token)
  46. defer rep.client.Close()
  47. // async write client
  48. rep.write = rep.client.WriteAPI(rep.organization, rep.bucket)
  49. errorsCh := rep.write.Errors()
  50. // have to handle write errors in a separate goroutine like this b/c the channel is unbuffered and will block writes if not read
  51. go func() {
  52. for err := range errorsCh {
  53. log.Warn("write error", "err", err.Error())
  54. }
  55. }()
  56. rep.run()
  57. }
  58. func (r *v2Reporter) run() {
  59. intervalTicker := time.NewTicker(r.interval)
  60. pingTicker := time.NewTicker(time.Second * 5)
  61. for {
  62. select {
  63. case <-intervalTicker.C:
  64. r.send()
  65. case <-pingTicker.C:
  66. _, err := r.client.Health(context.Background())
  67. if err != nil {
  68. log.Warn("Got error from influxdb client health check", "err", err.Error())
  69. }
  70. }
  71. }
  72. }
  73. func (r *v2Reporter) send() {
  74. r.reg.Each(func(name string, i interface{}) {
  75. now := time.Now()
  76. namespace := r.namespace
  77. switch metric := i.(type) {
  78. case metrics.Counter:
  79. v := metric.Count()
  80. l := r.cache[name]
  81. measurement := fmt.Sprintf("%s%s.count", namespace, name)
  82. fields := map[string]interface{}{
  83. "value": v - l,
  84. }
  85. pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
  86. r.write.WritePoint(pt)
  87. r.cache[name] = v
  88. case metrics.Gauge:
  89. ms := metric.Snapshot()
  90. measurement := fmt.Sprintf("%s%s.gauge", namespace, name)
  91. fields := map[string]interface{}{
  92. "value": ms.Value(),
  93. }
  94. pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
  95. r.write.WritePoint(pt)
  96. case metrics.GaugeFloat64:
  97. ms := metric.Snapshot()
  98. measurement := fmt.Sprintf("%s%s.gauge", namespace, name)
  99. fields := map[string]interface{}{
  100. "value": ms.Value(),
  101. }
  102. pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
  103. r.write.WritePoint(pt)
  104. case metrics.Histogram:
  105. ms := metric.Snapshot()
  106. if ms.Count() > 0 {
  107. ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
  108. measurement := fmt.Sprintf("%s%s.histogram", namespace, name)
  109. fields := map[string]interface{}{
  110. "count": ms.Count(),
  111. "max": ms.Max(),
  112. "mean": ms.Mean(),
  113. "min": ms.Min(),
  114. "stddev": ms.StdDev(),
  115. "variance": ms.Variance(),
  116. "p50": ps[0],
  117. "p75": ps[1],
  118. "p95": ps[2],
  119. "p99": ps[3],
  120. "p999": ps[4],
  121. "p9999": ps[5],
  122. }
  123. pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
  124. r.write.WritePoint(pt)
  125. }
  126. case metrics.Meter:
  127. ms := metric.Snapshot()
  128. measurement := fmt.Sprintf("%s%s.meter", namespace, name)
  129. fields := map[string]interface{}{
  130. "count": ms.Count(),
  131. "m1": ms.Rate1(),
  132. "m5": ms.Rate5(),
  133. "m15": ms.Rate15(),
  134. "mean": ms.RateMean(),
  135. }
  136. pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
  137. r.write.WritePoint(pt)
  138. case metrics.Timer:
  139. ms := metric.Snapshot()
  140. ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
  141. measurement := fmt.Sprintf("%s%s.timer", namespace, name)
  142. fields := map[string]interface{}{
  143. "count": ms.Count(),
  144. "max": ms.Max(),
  145. "mean": ms.Mean(),
  146. "min": ms.Min(),
  147. "stddev": ms.StdDev(),
  148. "variance": ms.Variance(),
  149. "p50": ps[0],
  150. "p75": ps[1],
  151. "p95": ps[2],
  152. "p99": ps[3],
  153. "p999": ps[4],
  154. "p9999": ps[5],
  155. "m1": ms.Rate1(),
  156. "m5": ms.Rate5(),
  157. "m15": ms.Rate15(),
  158. "meanrate": ms.RateMean(),
  159. }
  160. pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
  161. r.write.WritePoint(pt)
  162. case metrics.ResettingTimer:
  163. t := metric.Snapshot()
  164. if len(t.Values()) > 0 {
  165. ps := t.Percentiles([]float64{50, 95, 99})
  166. val := t.Values()
  167. measurement := fmt.Sprintf("%s%s.span", namespace, name)
  168. fields := map[string]interface{}{
  169. "count": len(val),
  170. "max": val[len(val)-1],
  171. "mean": t.Mean(),
  172. "min": val[0],
  173. "p50": ps[0],
  174. "p95": ps[1],
  175. "p99": ps[2],
  176. }
  177. pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
  178. r.write.WritePoint(pt)
  179. }
  180. }
  181. })
  182. // Force all unwritten data to be sent
  183. r.write.Flush()
  184. }