influxdb.go 5.4 KB


  1. package influxdb
  2. import (
  3. "fmt"
  4. "log"
  5. uurl "net/url"
  6. "time"
  7. "github.com/ethereum/go-ethereum/metrics"
  8. "github.com/influxdata/influxdb/client"
  9. )
  10. type reporter struct {
  11. reg metrics.Registry
  12. interval time.Duration
  13. url uurl.URL
  14. database string
  15. username string
  16. password string
  17. namespace string
  18. tags map[string]string
  19. client *client.Client
  20. cache map[string]int64
  21. }
  22. // InfluxDB starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval.
  23. func InfluxDB(r metrics.Registry, d time.Duration, url, database, username, password, namespace string) {
  24. InfluxDBWithTags(r, d, url, database, username, password, namespace, nil)
  25. }
  26. // InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags
  27. func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, username, password, namespace string, tags map[string]string) {
  28. u, err := uurl.Parse(url)
  29. if err != nil {
  30. log.Printf("unable to parse InfluxDB url %s. err=%v", url, err)
  31. return
  32. }
  33. rep := &reporter{
  34. reg: r,
  35. interval: d,
  36. url: *u,
  37. database: database,
  38. username: username,
  39. password: password,
  40. namespace: namespace,
  41. tags: tags,
  42. cache: make(map[string]int64),
  43. }
  44. if err := rep.makeClient(); err != nil {
  45. log.Printf("unable to make InfluxDB client. err=%v", err)
  46. return
  47. }
  48. rep.run()
  49. }
  50. func (r *reporter) makeClient() (err error) {
  51. r.client, err = client.NewClient(client.Config{
  52. URL: r.url,
  53. Username: r.username,
  54. Password: r.password,
  55. })
  56. return
  57. }
  58. func (r *reporter) run() {
  59. intervalTicker := time.Tick(r.interval)
  60. pingTicker := time.Tick(time.Second * 5)
  61. for {
  62. select {
  63. case <-intervalTicker:
  64. if err := r.send(); err != nil {
  65. log.Printf("unable to send to InfluxDB. err=%v", err)
  66. }
  67. case <-pingTicker:
  68. _, _, err := r.client.Ping()
  69. if err != nil {
  70. log.Printf("got error while sending a ping to InfluxDB, trying to recreate client. err=%v", err)
  71. if err = r.makeClient(); err != nil {
  72. log.Printf("unable to make InfluxDB client. err=%v", err)
  73. }
  74. }
  75. }
  76. }
  77. }
  78. func (r *reporter) send() error {
  79. var pts []client.Point
  80. r.reg.Each(func(name string, i interface{}) {
  81. now := time.Now()
  82. namespace := r.namespace
  83. switch metric := i.(type) {
  84. case metrics.Counter:
  85. v := metric.Count()
  86. l := r.cache[name]
  87. pts = append(pts, client.Point{
  88. Measurement: fmt.Sprintf("%s%s.count", namespace, name),
  89. Tags: r.tags,
  90. Fields: map[string]interface{}{
  91. "value": v - l,
  92. },
  93. Time: now,
  94. })
  95. r.cache[name] = v
  96. case metrics.Gauge:
  97. ms := metric.Snapshot()
  98. pts = append(pts, client.Point{
  99. Measurement: fmt.Sprintf("%s%s.gauge", namespace, name),
  100. Tags: r.tags,
  101. Fields: map[string]interface{}{
  102. "value": ms.Value(),
  103. },
  104. Time: now,
  105. })
  106. case metrics.GaugeFloat64:
  107. ms := metric.Snapshot()
  108. pts = append(pts, client.Point{
  109. Measurement: fmt.Sprintf("%s%s.gauge", namespace, name),
  110. Tags: r.tags,
  111. Fields: map[string]interface{}{
  112. "value": ms.Value(),
  113. },
  114. Time: now,
  115. })
  116. case metrics.Histogram:
  117. ms := metric.Snapshot()
  118. ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
  119. pts = append(pts, client.Point{
  120. Measurement: fmt.Sprintf("%s%s.histogram", namespace, name),
  121. Tags: r.tags,
  122. Fields: map[string]interface{}{
  123. "count": ms.Count(),
  124. "max": ms.Max(),
  125. "mean": ms.Mean(),
  126. "min": ms.Min(),
  127. "stddev": ms.StdDev(),
  128. "variance": ms.Variance(),
  129. "p50": ps[0],
  130. "p75": ps[1],
  131. "p95": ps[2],
  132. "p99": ps[3],
  133. "p999": ps[4],
  134. "p9999": ps[5],
  135. },
  136. Time: now,
  137. })
  138. case metrics.Meter:
  139. ms := metric.Snapshot()
  140. pts = append(pts, client.Point{
  141. Measurement: fmt.Sprintf("%s%s.meter", namespace, name),
  142. Tags: r.tags,
  143. Fields: map[string]interface{}{
  144. "count": ms.Count(),
  145. "m1": ms.Rate1(),
  146. "m5": ms.Rate5(),
  147. "m15": ms.Rate15(),
  148. "mean": ms.RateMean(),
  149. },
  150. Time: now,
  151. })
  152. case metrics.Timer:
  153. ms := metric.Snapshot()
  154. ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
  155. pts = append(pts, client.Point{
  156. Measurement: fmt.Sprintf("%s%s.timer", namespace, name),
  157. Tags: r.tags,
  158. Fields: map[string]interface{}{
  159. "count": ms.Count(),
  160. "max": ms.Max(),
  161. "mean": ms.Mean(),
  162. "min": ms.Min(),
  163. "stddev": ms.StdDev(),
  164. "variance": ms.Variance(),
  165. "p50": ps[0],
  166. "p75": ps[1],
  167. "p95": ps[2],
  168. "p99": ps[3],
  169. "p999": ps[4],
  170. "p9999": ps[5],
  171. "m1": ms.Rate1(),
  172. "m5": ms.Rate5(),
  173. "m15": ms.Rate15(),
  174. "meanrate": ms.RateMean(),
  175. },
  176. Time: now,
  177. })
  178. case metrics.ResettingTimer:
  179. t := metric.Snapshot()
  180. if len(t.Values()) > 0 {
  181. ps := t.Percentiles([]float64{50, 95, 99})
  182. val := t.Values()
  183. pts = append(pts, client.Point{
  184. Measurement: fmt.Sprintf("%s%s.span", namespace, name),
  185. Tags: r.tags,
  186. Fields: map[string]interface{}{
  187. "count": len(val),
  188. "max": val[len(val)-1],
  189. "mean": t.Mean(),
  190. "min": val[0],
  191. "p50": ps[0],
  192. "p95": ps[1],
  193. "p99": ps[2],
  194. },
  195. Time: now,
  196. })
  197. }
  198. }
  199. })
  200. bps := client.BatchPoints{
  201. Points: pts,
  202. Database: r.database,
  203. }
  204. _, err := r.client.Write(bps)
  205. return err
  206. }