csvlogger.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package csvlogger
  17. import (
  18. "fmt"
  19. "os"
  20. "sync"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common/mclock"
  23. "github.com/ethereum/go-ethereum/log"
  24. )
  25. // Logger is a metrics/events logger that writes logged values and events into a comma separated file
  26. type Logger struct {
  27. file *os.File
  28. started mclock.AbsTime
  29. channels []*Channel
  30. period time.Duration
  31. stopCh, stopped chan struct{}
  32. storeCh chan string
  33. eventHeader string
  34. }
  35. // NewLogger creates a new Logger
  36. func NewLogger(fileName string, updatePeriod time.Duration, eventHeader string) *Logger {
  37. if fileName == "" {
  38. return nil
  39. }
  40. f, err := os.Create(fileName)
  41. if err != nil {
  42. log.Error("Error creating log file", "name", fileName, "error", err)
  43. return nil
  44. }
  45. return &Logger{
  46. file: f,
  47. period: updatePeriod,
  48. stopCh: make(chan struct{}),
  49. storeCh: make(chan string, 1),
  50. eventHeader: eventHeader,
  51. }
  52. }
  53. // NewChannel creates a new value logger channel that writes values in a single
  54. // column. If the relative change of the value is bigger than the given threshold
  55. // then a new line is added immediately (threshold can also be 0).
  56. func (l *Logger) NewChannel(name string, threshold float64) *Channel {
  57. if l == nil {
  58. return nil
  59. }
  60. c := &Channel{
  61. logger: l,
  62. name: name,
  63. threshold: threshold,
  64. }
  65. l.channels = append(l.channels, c)
  66. return c
  67. }
  68. // NewMinMaxChannel creates a new value logger channel that writes the minimum and
  69. // maximum of the tracked value in two columns. It never triggers adding a new line.
  70. // If zeroDefault is true then 0 is written to both min and max columns if no update
  71. // was given during the last period. If it is false then the last update will appear
  72. // in both columns.
  73. func (l *Logger) NewMinMaxChannel(name string, zeroDefault bool) *Channel {
  74. if l == nil {
  75. return nil
  76. }
  77. c := &Channel{
  78. logger: l,
  79. name: name,
  80. minmax: true,
  81. mmZeroDefault: zeroDefault,
  82. }
  83. l.channels = append(l.channels, c)
  84. return c
  85. }
  86. func (l *Logger) store(event string) {
  87. s := fmt.Sprintf("%g", float64(mclock.Now()-l.started)/1000000000)
  88. for _, ch := range l.channels {
  89. s += ", " + ch.store()
  90. }
  91. if event != "" {
  92. s += ", " + event
  93. }
  94. l.file.WriteString(s + "\n")
  95. }
  96. // Start writes the header line and starts the logger
  97. func (l *Logger) Start() {
  98. if l == nil {
  99. return
  100. }
  101. l.started = mclock.Now()
  102. s := "Time"
  103. for _, ch := range l.channels {
  104. s += ", " + ch.header()
  105. }
  106. if l.eventHeader != "" {
  107. s += ", " + l.eventHeader
  108. }
  109. l.file.WriteString(s + "\n")
  110. go func() {
  111. timer := time.NewTimer(l.period)
  112. for {
  113. select {
  114. case <-timer.C:
  115. l.store("")
  116. timer.Reset(l.period)
  117. case event := <-l.storeCh:
  118. l.store(event)
  119. if !timer.Stop() {
  120. <-timer.C
  121. }
  122. timer.Reset(l.period)
  123. case <-l.stopCh:
  124. close(l.stopped)
  125. return
  126. }
  127. }
  128. }()
  129. }
  130. // Stop stops the logger and closes the file
  131. func (l *Logger) Stop() {
  132. if l == nil {
  133. return
  134. }
  135. l.stopped = make(chan struct{})
  136. close(l.stopCh)
  137. <-l.stopped
  138. l.file.Close()
  139. }
  140. // Event immediately adds a new line and adds the given event string in the last column
  141. func (l *Logger) Event(event string) {
  142. if l == nil {
  143. return
  144. }
  145. select {
  146. case l.storeCh <- event:
  147. case <-l.stopCh:
  148. }
  149. }
  150. // Channel represents a logger channel tracking a single value
  151. type Channel struct {
  152. logger *Logger
  153. lock sync.Mutex
  154. name string
  155. threshold, storeMin, storeMax, lastValue, min, max float64
  156. minmax, mmSet, mmZeroDefault bool
  157. }
  158. // Update updates the tracked value
  159. func (lc *Channel) Update(value float64) {
  160. if lc == nil {
  161. return
  162. }
  163. lc.lock.Lock()
  164. defer lc.lock.Unlock()
  165. lc.lastValue = value
  166. if lc.minmax {
  167. if value > lc.max || !lc.mmSet {
  168. lc.max = value
  169. }
  170. if value < lc.min || !lc.mmSet {
  171. lc.min = value
  172. }
  173. lc.mmSet = true
  174. } else {
  175. if value < lc.storeMin || value > lc.storeMax {
  176. select {
  177. case lc.logger.storeCh <- "":
  178. default:
  179. }
  180. }
  181. }
  182. }
  183. func (lc *Channel) store() (s string) {
  184. lc.lock.Lock()
  185. defer lc.lock.Unlock()
  186. if lc.minmax {
  187. s = fmt.Sprintf("%g, %g", lc.min, lc.max)
  188. lc.mmSet = false
  189. if lc.mmZeroDefault {
  190. lc.min = 0
  191. } else {
  192. lc.min = lc.lastValue
  193. }
  194. lc.max = lc.min
  195. } else {
  196. s = fmt.Sprintf("%g", lc.lastValue)
  197. lc.storeMin = lc.lastValue * (1 - lc.threshold)
  198. lc.storeMax = lc.lastValue * (1 + lc.threshold)
  199. if lc.lastValue < 0 {
  200. lc.storeMin, lc.storeMax = lc.storeMax, lc.storeMin
  201. }
  202. }
  203. return
  204. }
  205. func (lc *Channel) header() string {
  206. if lc.minmax {
  207. return lc.name + " (min), " + lc.name + " (max)"
  208. }
  209. return lc.name
  210. }