influxdb.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. package influxdb
  2. import (
  3. "fmt"
  4. uurl "net/url"
  5. "time"
  6. "github.com/ethereum/go-ethereum/log"
  7. "github.com/ethereum/go-ethereum/metrics"
  8. "github.com/influxdata/influxdb/client"
  9. )
// reporter periodically posts every metric in a metrics.Registry to an
// InfluxDB database, optionally tagging each point and prefixing each
// measurement name with a namespace.
type reporter struct {
	reg      metrics.Registry // source registry whose metrics are reported
	interval time.Duration    // time between consecutive reports

	url      uurl.URL // parsed InfluxDB endpoint
	database string   // target database name
	username string   // credentials for the InfluxDB endpoint
	password string
	namespace string            // prefix prepended to every measurement name
	tags      map[string]string // tags attached to every reported point

	client *client.Client // live InfluxDB client; recreated on ping failure

	// cache is reserved for delta tracking of counters.
	// NOTE(review): not read or written anywhere in this file — confirm
	// whether it is used elsewhere or is dead state.
	cache map[string]int64
}
// InfluxDB starts an InfluxDB reporter which will post metrics from the given
// metrics.Registry at each d interval. It is a convenience wrapper around
// InfluxDBWithTags with no tags. Note that it blocks in the reporting loop
// and never returns.
func InfluxDB(r metrics.Registry, d time.Duration, url, database, username, password, namespace string) {
	InfluxDBWithTags(r, d, url, database, username, password, namespace, nil)
}
  26. // InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags
  27. func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, username, password, namespace string, tags map[string]string) {
  28. u, err := uurl.Parse(url)
  29. if err != nil {
  30. log.Warn("Unable to parse InfluxDB", "url", url, "err", err)
  31. return
  32. }
  33. rep := &reporter{
  34. reg: r,
  35. interval: d,
  36. url: *u,
  37. database: database,
  38. username: username,
  39. password: password,
  40. namespace: namespace,
  41. tags: tags,
  42. cache: make(map[string]int64),
  43. }
  44. if err := rep.makeClient(); err != nil {
  45. log.Warn("Unable to make InfluxDB client", "err", err)
  46. return
  47. }
  48. rep.run()
  49. }
  50. // InfluxDBWithTagsOnce runs once an InfluxDB reporter and post the given metrics.Registry with the specified tags
  51. func InfluxDBWithTagsOnce(r metrics.Registry, url, database, username, password, namespace string, tags map[string]string) error {
  52. u, err := uurl.Parse(url)
  53. if err != nil {
  54. return fmt.Errorf("unable to parse InfluxDB. url: %s, err: %v", url, err)
  55. }
  56. rep := &reporter{
  57. reg: r,
  58. url: *u,
  59. database: database,
  60. username: username,
  61. password: password,
  62. namespace: namespace,
  63. tags: tags,
  64. cache: make(map[string]int64),
  65. }
  66. if err := rep.makeClient(); err != nil {
  67. return fmt.Errorf("unable to make InfluxDB client. err: %v", err)
  68. }
  69. if err := rep.send(); err != nil {
  70. return fmt.Errorf("unable to send to InfluxDB. err: %v", err)
  71. }
  72. return nil
  73. }
  74. func (r *reporter) makeClient() (err error) {
  75. r.client, err = client.NewClient(client.Config{
  76. URL: r.url,
  77. Username: r.username,
  78. Password: r.password,
  79. Timeout: 10 * time.Second,
  80. })
  81. return
  82. }
  83. func (r *reporter) run() {
  84. intervalTicker := time.NewTicker(r.interval)
  85. pingTicker := time.NewTicker(time.Second * 5)
  86. for {
  87. select {
  88. case <-intervalTicker.C:
  89. if err := r.send(); err != nil {
  90. log.Warn("Unable to send to InfluxDB", "err", err)
  91. }
  92. case <-pingTicker.C:
  93. _, _, err := r.client.Ping()
  94. if err != nil {
  95. log.Warn("Got error while sending a ping to InfluxDB, trying to recreate client", "err", err)
  96. if err = r.makeClient(); err != nil {
  97. log.Warn("Unable to make InfluxDB client", "err", err)
  98. }
  99. }
  100. }
  101. }
  102. }
// send walks every metric in the registry, converts each to one InfluxDB
// point (measurement name = namespace + metric name + a per-type suffix),
// and writes the whole batch in a single call. Returns the write error, if
// any. Empty histograms and resetting timers are skipped entirely.
func (r *reporter) send() error {
	var pts []client.Point
	r.reg.Each(func(name string, i interface{}) {
		// Timestamp is taken per metric, so points within one batch may
		// carry slightly different times.
		now := time.Now()
		namespace := r.namespace
		switch metric := i.(type) {
		case metrics.Counter:
			count := metric.Count()
			pts = append(pts, client.Point{
				Measurement: fmt.Sprintf("%s%s.count", namespace, name),
				Tags:        r.tags,
				Fields: map[string]interface{}{
					"value": count,
				},
				Time: now,
			})
		case metrics.Gauge:
			ms := metric.Snapshot()
			pts = append(pts, client.Point{
				Measurement: fmt.Sprintf("%s%s.gauge", namespace, name),
				Tags:        r.tags,
				Fields: map[string]interface{}{
					"value": ms.Value(),
				},
				Time: now,
			})
		case metrics.GaugeFloat64:
			// Same ".gauge" suffix as the integer gauge above; the two are
			// distinguished only by the field's value type.
			ms := metric.Snapshot()
			pts = append(pts, client.Point{
				Measurement: fmt.Sprintf("%s%s.gauge", namespace, name),
				Tags:        r.tags,
				Fields: map[string]interface{}{
					"value": ms.Value(),
				},
				Time: now,
			})
		case metrics.Histogram:
			ms := metric.Snapshot()
			// Only report histograms that have seen at least one sample.
			if ms.Count() > 0 {
				ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
				pts = append(pts, client.Point{
					Measurement: fmt.Sprintf("%s%s.histogram", namespace, name),
					Tags:        r.tags,
					Fields: map[string]interface{}{
						"count":    ms.Count(),
						"max":      ms.Max(),
						"mean":     ms.Mean(),
						"min":      ms.Min(),
						"stddev":   ms.StdDev(),
						"variance": ms.Variance(),
						"p50":      ps[0],
						"p75":      ps[1],
						"p95":      ps[2],
						"p99":      ps[3],
						"p999":     ps[4],
						"p9999":    ps[5],
					},
					Time: now,
				})
			}
		case metrics.Meter:
			ms := metric.Snapshot()
			pts = append(pts, client.Point{
				Measurement: fmt.Sprintf("%s%s.meter", namespace, name),
				Tags:        r.tags,
				Fields: map[string]interface{}{
					"count": ms.Count(),
					"m1":    ms.Rate1(),
					"m5":    ms.Rate5(),
					"m15":   ms.Rate15(),
					"mean":  ms.RateMean(),
				},
				Time: now,
			})
		case metrics.Timer:
			ms := metric.Snapshot()
			ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
			pts = append(pts, client.Point{
				Measurement: fmt.Sprintf("%s%s.timer", namespace, name),
				Tags:        r.tags,
				Fields: map[string]interface{}{
					"count":    ms.Count(),
					"max":      ms.Max(),
					"mean":     ms.Mean(),
					"min":      ms.Min(),
					"stddev":   ms.StdDev(),
					"variance": ms.Variance(),
					"p50":      ps[0],
					"p75":      ps[1],
					"p95":      ps[2],
					"p99":      ps[3],
					"p999":     ps[4],
					"p9999":    ps[5],
					"m1":       ms.Rate1(),
					"m5":       ms.Rate5(),
					"m15":      ms.Rate15(),
					"meanrate": ms.RateMean(),
				},
				Time: now,
			})
		case metrics.ResettingTimer:
			t := metric.Snapshot()
			// Skip timers with no recorded values since the last reset.
			if len(t.Values()) > 0 {
				// NOTE: percentiles here are given as 50/95/99, not
				// 0.5/0.95/0.99 as for Histogram/Timer above — the
				// ResettingTimer API presumably expects that scale; confirm
				// against its Percentiles implementation.
				ps := t.Percentiles([]float64{50, 95, 99})
				val := t.Values()
				pts = append(pts, client.Point{
					Measurement: fmt.Sprintf("%s%s.span", namespace, name),
					Tags:        r.tags,
					Fields: map[string]interface{}{
						"count": len(val),
						// Values() is assumed sorted ascending, so the
						// extremes sit at the ends — TODO confirm.
						"max":  val[len(val)-1],
						"mean": t.Mean(),
						"min":  val[0],
						"p50":  ps[0],
						"p95":  ps[1],
						"p99":  ps[2],
					},
					Time: now,
				})
			}
		}
	})

	bps := client.BatchPoints{
		Points:   pts,
		Database: r.database,
	}

	_, err := r.client.Write(bps)
	return err
}