sys.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package logger
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. type stdMsg struct {
  7. level LogLevel
  8. msg string
  9. }
  10. type jsonMsg []byte
  11. func (m jsonMsg) Level() LogLevel {
  12. return 0
  13. }
  14. func (m jsonMsg) String() string {
  15. return string(m)
  16. }
  17. type LogMsg interface {
  18. Level() LogLevel
  19. fmt.Stringer
  20. }
  21. func (m stdMsg) Level() LogLevel {
  22. return m.level
  23. }
  24. func (m stdMsg) String() string {
  25. return m.msg
  26. }
  27. var (
  28. logMessageC = make(chan LogMsg)
  29. addSystemC = make(chan LogSystem)
  30. flushC = make(chan chan struct{})
  31. resetC = make(chan chan struct{})
  32. )
  33. func init() {
  34. go dispatchLoop()
  35. }
  36. // each system can buffer this many messages before
  37. // blocking incoming log messages.
  38. const sysBufferSize = 500
  39. func dispatchLoop() {
  40. var (
  41. systems []LogSystem
  42. systemIn []chan LogMsg
  43. systemWG sync.WaitGroup
  44. )
  45. bootSystem := func(sys LogSystem) {
  46. in := make(chan LogMsg, sysBufferSize)
  47. systemIn = append(systemIn, in)
  48. systemWG.Add(1)
  49. go sysLoop(sys, in, &systemWG)
  50. }
  51. for {
  52. select {
  53. case msg := <-logMessageC:
  54. for _, c := range systemIn {
  55. c <- msg
  56. }
  57. case sys := <-addSystemC:
  58. systems = append(systems, sys)
  59. bootSystem(sys)
  60. case waiter := <-resetC:
  61. // reset means terminate all systems
  62. for _, c := range systemIn {
  63. close(c)
  64. }
  65. systems = nil
  66. systemIn = nil
  67. systemWG.Wait()
  68. close(waiter)
  69. case waiter := <-flushC:
  70. // flush means reboot all systems
  71. for _, c := range systemIn {
  72. close(c)
  73. }
  74. systemIn = nil
  75. systemWG.Wait()
  76. for _, sys := range systems {
  77. bootSystem(sys)
  78. }
  79. close(waiter)
  80. }
  81. }
  82. }
  83. func sysLoop(sys LogSystem, in <-chan LogMsg, wg *sync.WaitGroup) {
  84. for msg := range in {
  85. sys.LogPrint(msg)
  86. }
  87. wg.Done()
  88. }
  89. // Reset removes all active log systems.
  90. // It blocks until all current messages have been delivered.
  91. func Reset() {
  92. waiter := make(chan struct{})
  93. resetC <- waiter
  94. <-waiter
  95. }
  96. // Flush waits until all current log messages have been dispatched to
  97. // the active log systems.
  98. func Flush() {
  99. waiter := make(chan struct{})
  100. flushC <- waiter
  101. <-waiter
  102. }
  103. // AddLogSystem starts printing messages to the given LogSystem.
  104. func AddLogSystem(sys LogSystem) {
  105. addSystemC <- sys
  106. }