// influxdb.go
  1. package influxdb
  2. import (
  3. "fmt"
  4. uurl "net/url"
  5. "time"
  6. "github.com/ethereum/go-ethereum/log"
  7. "github.com/ethereum/go-ethereum/metrics"
  8. "github.com/influxdata/influxdb/client"
  9. )
// reporter periodically posts the metrics of a registry to an InfluxDB
// instance, remembering previously sent counter values so only deltas
// are reported.
type reporter struct {
	reg      metrics.Registry // registry whose metrics are reported
	interval time.Duration    // flush interval (unused by the run-once path)

	url       uurl.URL // parsed InfluxDB endpoint
	database  string
	username  string
	password  string
	namespace string            // prefix prepended to every measurement name
	tags      map[string]string // tags attached to every data point

	client *client.Client // recreated by makeClient when pings fail

	cache map[string]int64 // last reported counter value, keyed by metric name
}
  22. // InfluxDB starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval.
  23. func InfluxDB(r metrics.Registry, d time.Duration, url, database, username, password, namespace string) {
  24. InfluxDBWithTags(r, d, url, database, username, password, namespace, nil)
  25. }
  26. // InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags
  27. func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, username, password, namespace string, tags map[string]string) {
  28. u, err := uurl.Parse(url)
  29. if err != nil {
  30. log.Warn("Unable to parse InfluxDB", "url", url, "err", err)
  31. return
  32. }
  33. rep := &reporter{
  34. reg: r,
  35. interval: d,
  36. url: *u,
  37. database: database,
  38. username: username,
  39. password: password,
  40. namespace: namespace,
  41. tags: tags,
  42. cache: make(map[string]int64),
  43. }
  44. if err := rep.makeClient(); err != nil {
  45. log.Warn("Unable to make InfluxDB client", "err", err)
  46. return
  47. }
  48. rep.run()
  49. }
  50. // InfluxDBWithTagsOnce runs once an InfluxDB reporter and post the given metrics.Registry with the specified tags
  51. func InfluxDBWithTagsOnce(r metrics.Registry, url, database, username, password, namespace string, tags map[string]string) error {
  52. u, err := uurl.Parse(url)
  53. if err != nil {
  54. return fmt.Errorf("unable to parse InfluxDB. url: %s, err: %v", url, err)
  55. }
  56. rep := &reporter{
  57. reg: r,
  58. url: *u,
  59. database: database,
  60. username: username,
  61. password: password,
  62. namespace: namespace,
  63. tags: tags,
  64. cache: make(map[string]int64),
  65. }
  66. if err := rep.makeClient(); err != nil {
  67. return fmt.Errorf("unable to make InfluxDB client. err: %v", err)
  68. }
  69. if err := rep.send(); err != nil {
  70. return fmt.Errorf("unable to send to InfluxDB. err: %v", err)
  71. }
  72. return nil
  73. }
  74. func (r *reporter) makeClient() (err error) {
  75. r.client, err = client.NewClient(client.Config{
  76. URL: r.url,
  77. Username: r.username,
  78. Password: r.password,
  79. Timeout: 10 * time.Second,
  80. })
  81. return
  82. }
  83. func (r *reporter) run() {
  84. intervalTicker := time.Tick(r.interval)
  85. pingTicker := time.Tick(time.Second * 5)
  86. for {
  87. select {
  88. case <-intervalTicker:
  89. if err := r.send(); err != nil {
  90. log.Warn("Unable to send to InfluxDB", "err", err)
  91. }
  92. case <-pingTicker:
  93. _, _, err := r.client.Ping()
  94. if err != nil {
  95. log.Warn("Got error while sending a ping to InfluxDB, trying to recreate client", "err", err)
  96. if err = r.makeClient(); err != nil {
  97. log.Warn("Unable to make InfluxDB client", "err", err)
  98. }
  99. }
  100. }
  101. }
  102. }
// send snapshots every metric in the registry, converts each to one InfluxDB
// point, and writes them all as a single batch. Counters are reported as
// deltas against the previously sent value (tracked in r.cache); all other
// metric types are reported as absolute snapshot values. Returns the write
// error, if any.
func (r *reporter) send() error {
	var pts []client.Point
	r.reg.Each(func(name string, i interface{}) {
		now := time.Now()
		namespace := r.namespace
		switch metric := i.(type) {
		case metrics.Counter:
			v := metric.Count()
			// Only the increase since the last send is reported.
			l := r.cache[name]
			pts = append(pts, client.Point{
				Measurement: fmt.Sprintf("%s%s.count", namespace, name),
				Tags:        r.tags,
				Fields: map[string]interface{}{
					"value": v - l,
				},
				Time: now,
			})
			r.cache[name] = v
		case metrics.Gauge:
			ms := metric.Snapshot()
			pts = append(pts, client.Point{
				Measurement: fmt.Sprintf("%s%s.gauge", namespace, name),
				Tags:        r.tags,
				Fields: map[string]interface{}{
					"value": ms.Value(),
				},
				Time: now,
			})
		case metrics.GaugeFloat64:
			// Shares the ".gauge" suffix with the integer gauge above.
			ms := metric.Snapshot()
			pts = append(pts, client.Point{
				Measurement: fmt.Sprintf("%s%s.gauge", namespace, name),
				Tags:        r.tags,
				Fields: map[string]interface{}{
					"value": ms.Value(),
				},
				Time: now,
			})
		case metrics.Histogram:
			ms := metric.Snapshot()
			// Empty histograms are skipped entirely.
			if ms.Count() > 0 {
				ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
				pts = append(pts, client.Point{
					Measurement: fmt.Sprintf("%s%s.histogram", namespace, name),
					Tags:        r.tags,
					Fields: map[string]interface{}{
						"count":    ms.Count(),
						"max":      ms.Max(),
						"mean":     ms.Mean(),
						"min":      ms.Min(),
						"stddev":   ms.StdDev(),
						"variance": ms.Variance(),
						"p50":      ps[0],
						"p75":      ps[1],
						"p95":      ps[2],
						"p99":      ps[3],
						"p999":     ps[4],
						"p9999":    ps[5],
					},
					Time: now,
				})
			}
		case metrics.Meter:
			ms := metric.Snapshot()
			pts = append(pts, client.Point{
				Measurement: fmt.Sprintf("%s%s.meter", namespace, name),
				Tags:        r.tags,
				Fields: map[string]interface{}{
					"count": ms.Count(),
					"m1":    ms.Rate1(),
					"m5":    ms.Rate5(),
					"m15":   ms.Rate15(),
					"mean":  ms.RateMean(),
				},
				Time: now,
			})
		case metrics.Timer:
			ms := metric.Snapshot()
			ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
			pts = append(pts, client.Point{
				Measurement: fmt.Sprintf("%s%s.timer", namespace, name),
				Tags:        r.tags,
				Fields: map[string]interface{}{
					"count":    ms.Count(),
					"max":      ms.Max(),
					"mean":     ms.Mean(),
					"min":      ms.Min(),
					"stddev":   ms.StdDev(),
					"variance": ms.Variance(),
					"p50":      ps[0],
					"p75":      ps[1],
					"p95":      ps[2],
					"p99":      ps[3],
					"p999":     ps[4],
					"p9999":    ps[5],
					"m1":       ms.Rate1(),
					"m5":       ms.Rate5(),
					"m15":      ms.Rate15(),
					"meanrate": ms.RateMean(),
				},
				Time: now,
			})
		case metrics.ResettingTimer:
			t := metric.Snapshot()
			// Skipped when the window holds no samples; note the resetting
			// timer uses percentile arguments of 50/95/99, unlike the
			// 0.0-1.0 fractions used by Histogram/Timer above.
			if len(t.Values()) > 0 {
				ps := t.Percentiles([]float64{50, 95, 99})
				val := t.Values()
				pts = append(pts, client.Point{
					Measurement: fmt.Sprintf("%s%s.span", namespace, name),
					Tags:        r.tags,
					Fields: map[string]interface{}{
						"count": len(val),
						// NOTE(review): min/max index assumes Values() is
						// sorted ascending — confirm the snapshot guarantees
						// this ordering.
						"max":  val[len(val)-1],
						"mean": t.Mean(),
						"min":  val[0],
						"p50":  ps[0],
						"p95":  ps[1],
						"p99":  ps[2],
					},
					Time: now,
				})
			}
		}
	})

	// One batched write for all collected points.
	bps := client.BatchPoints{
		Points:   pts,
		Database: r.database,
	}

	_, err := r.client.Write(bps)
	return err
}