async_file_writer.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. package log
  2. import (
  3. "errors"
  4. "fmt"
  5. "os"
  6. "path/filepath"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. )
  11. type HourTicker struct {
  12. stop chan struct{}
  13. C <-chan time.Time
  14. }
  15. func NewHourTicker() *HourTicker {
  16. ht := &HourTicker{
  17. stop: make(chan struct{}),
  18. }
  19. ht.C = ht.Ticker()
  20. return ht
  21. }
  22. func (ht *HourTicker) Stop() {
  23. ht.stop <- struct{}{}
  24. }
  25. func (ht *HourTicker) Ticker() <-chan time.Time {
  26. ch := make(chan time.Time)
  27. go func() {
  28. hour := time.Now().Hour()
  29. ticker := time.NewTicker(time.Second)
  30. defer ticker.Stop()
  31. for {
  32. select {
  33. case t := <-ticker.C:
  34. if t.Hour() != hour {
  35. ch <- t
  36. hour = t.Hour()
  37. }
  38. case <-ht.stop:
  39. return
  40. }
  41. }
  42. }()
  43. return ch
  44. }
  45. type AsyncFileWriter struct {
  46. filePath string
  47. fd *os.File
  48. wg sync.WaitGroup
  49. started int32
  50. buf chan []byte
  51. stop chan struct{}
  52. hourTicker *HourTicker
  53. }
  54. func NewAsyncFileWriter(filePath string, bufSize int64) *AsyncFileWriter {
  55. absFilePath, err := filepath.Abs(filePath)
  56. if err != nil {
  57. panic(fmt.Sprintf("get file path of logger error. filePath=%s, err=%s", filePath, err))
  58. }
  59. return &AsyncFileWriter{
  60. filePath: absFilePath,
  61. buf: make(chan []byte, bufSize),
  62. stop: make(chan struct{}),
  63. hourTicker: NewHourTicker(),
  64. }
  65. }
  66. func (w *AsyncFileWriter) initLogFile() error {
  67. var (
  68. fd *os.File
  69. err error
  70. )
  71. realFilePath := w.timeFilePath(w.filePath)
  72. fd, err = os.OpenFile(realFilePath, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644)
  73. if err != nil {
  74. return err
  75. }
  76. w.fd = fd
  77. _, err = os.Lstat(w.filePath)
  78. if err == nil || os.IsExist(err) {
  79. err = os.Remove(w.filePath)
  80. if err != nil {
  81. return err
  82. }
  83. }
  84. err = os.Symlink(realFilePath, w.filePath)
  85. if err != nil {
  86. return err
  87. }
  88. return nil
  89. }
  90. func (w *AsyncFileWriter) Start() error {
  91. if !atomic.CompareAndSwapInt32(&w.started, 0, 1) {
  92. return errors.New("logger has already been started")
  93. }
  94. err := w.initLogFile()
  95. if err != nil {
  96. return err
  97. }
  98. w.wg.Add(1)
  99. go func() {
  100. defer func() {
  101. atomic.StoreInt32(&w.started, 0)
  102. w.flushBuffer()
  103. w.flushAndClose()
  104. w.wg.Done()
  105. }()
  106. for {
  107. select {
  108. case msg, ok := <-w.buf:
  109. if !ok {
  110. fmt.Fprintln(os.Stderr, "buf channel has been closed.")
  111. return
  112. }
  113. w.SyncWrite(msg)
  114. case <-w.stop:
  115. return
  116. }
  117. }
  118. }()
  119. return nil
  120. }
  121. func (w *AsyncFileWriter) flushBuffer() {
  122. for {
  123. select {
  124. case msg := <-w.buf:
  125. w.SyncWrite(msg)
  126. default:
  127. return
  128. }
  129. }
  130. }
  131. func (w *AsyncFileWriter) SyncWrite(msg []byte) {
  132. w.rotateFile()
  133. if w.fd != nil {
  134. w.fd.Write(msg)
  135. }
  136. }
  137. func (w *AsyncFileWriter) rotateFile() {
  138. select {
  139. case <-w.hourTicker.C:
  140. if err := w.flushAndClose(); err != nil {
  141. fmt.Fprintf(os.Stderr, "flush and close file error. err=%s", err)
  142. }
  143. if err := w.initLogFile(); err != nil {
  144. fmt.Fprintf(os.Stderr, "init log file error. err=%s", err)
  145. }
  146. default:
  147. }
  148. }
  149. func (w *AsyncFileWriter) Stop() {
  150. w.stop <- struct{}{}
  151. w.wg.Wait()
  152. w.hourTicker.Stop()
  153. }
  154. func (w *AsyncFileWriter) Write(msg []byte) (n int, err error) {
  155. // TODO(wuzhenxing): for the underlying array may change, is there a better way to avoid copying slice?
  156. buf := make([]byte, len(msg))
  157. copy(buf, msg)
  158. select {
  159. case w.buf <- buf:
  160. default:
  161. }
  162. return 0, nil
  163. }
  164. func (w *AsyncFileWriter) Flush() error {
  165. if w.fd == nil {
  166. return nil
  167. }
  168. return w.fd.Sync()
  169. }
  170. func (w *AsyncFileWriter) flushAndClose() error {
  171. if w.fd == nil {
  172. return nil
  173. }
  174. err := w.fd.Sync()
  175. if err != nil {
  176. return err
  177. }
  178. return w.fd.Close()
  179. }
  180. func (w *AsyncFileWriter) timeFilePath(filePath string) string {
  181. return filePath + "." + time.Now().Format("2006-01-02_15")
  182. }