event_test.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. // Copyright 2014 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package event
  17. import (
  18. "math/rand"
  19. "sync"
  20. "testing"
  21. "time"
  22. )
// testEvent is the event type that the tests and benchmarks in this file
// subscribe to and post.
type testEvent int
  24. func TestSub(t *testing.T) {
  25. mux := new(TypeMux)
  26. defer mux.Stop()
  27. sub := mux.Subscribe(testEvent(0))
  28. go func() {
  29. if err := mux.Post(testEvent(5)); err != nil {
  30. t.Errorf("Post returned unexpected error: %v", err)
  31. }
  32. }()
  33. ev := <-sub.Chan()
  34. if ev.Data.(testEvent) != testEvent(5) {
  35. t.Errorf("Got %v (%T), expected event %v (%T)",
  36. ev, ev, testEvent(5), testEvent(5))
  37. }
  38. }
  39. func TestMuxErrorAfterStop(t *testing.T) {
  40. mux := new(TypeMux)
  41. mux.Stop()
  42. sub := mux.Subscribe(testEvent(0))
  43. if _, isopen := <-sub.Chan(); isopen {
  44. t.Errorf("subscription channel was not closed")
  45. }
  46. if err := mux.Post(testEvent(0)); err != ErrMuxClosed {
  47. t.Errorf("Post error mismatch, got: %s, expected: %s", err, ErrMuxClosed)
  48. }
  49. }
  50. func TestUnsubscribeUnblockPost(t *testing.T) {
  51. mux := new(TypeMux)
  52. defer mux.Stop()
  53. sub := mux.Subscribe(testEvent(0))
  54. unblocked := make(chan bool)
  55. go func() {
  56. mux.Post(testEvent(5))
  57. unblocked <- true
  58. }()
  59. select {
  60. case <-unblocked:
  61. t.Errorf("Post returned before Unsubscribe")
  62. default:
  63. sub.Unsubscribe()
  64. <-unblocked
  65. }
  66. }
  67. func TestSubscribeDuplicateType(t *testing.T) {
  68. mux := new(TypeMux)
  69. expected := "event: duplicate type event.testEvent in Subscribe"
  70. defer func() {
  71. err := recover()
  72. if err == nil {
  73. t.Errorf("Subscribe didn't panic for duplicate type")
  74. } else if err != expected {
  75. t.Errorf("panic mismatch: got %#v, expected %#v", err, expected)
  76. }
  77. }()
  78. mux.Subscribe(testEvent(1), testEvent(2))
  79. }
  80. func TestMuxConcurrent(t *testing.T) {
  81. rand.Seed(time.Now().Unix())
  82. mux := new(TypeMux)
  83. defer mux.Stop()
  84. recv := make(chan int)
  85. poster := func() {
  86. for {
  87. err := mux.Post(testEvent(0))
  88. if err != nil {
  89. return
  90. }
  91. }
  92. }
  93. sub := func(i int) {
  94. time.Sleep(time.Duration(rand.Intn(99)) * time.Millisecond)
  95. sub := mux.Subscribe(testEvent(0))
  96. <-sub.Chan()
  97. sub.Unsubscribe()
  98. recv <- i
  99. }
  100. go poster()
  101. go poster()
  102. go poster()
  103. nsubs := 1000
  104. for i := 0; i < nsubs; i++ {
  105. go sub(i)
  106. }
  107. // wait until everyone has been served
  108. counts := make(map[int]int, nsubs)
  109. for i := 0; i < nsubs; i++ {
  110. counts[<-recv]++
  111. }
  112. for i, count := range counts {
  113. if count != 1 {
  114. t.Errorf("receiver %d called %d times, expected only 1 call", i, count)
  115. }
  116. }
  117. }
  118. func emptySubscriber(mux *TypeMux, types ...interface{}) {
  119. s := mux.Subscribe(testEvent(0))
  120. go func() {
  121. for _ = range s.Chan() {
  122. }
  123. }()
  124. }
  125. func BenchmarkPost3(b *testing.B) {
  126. var mux = new(TypeMux)
  127. defer mux.Stop()
  128. emptySubscriber(mux, testEvent(0))
  129. emptySubscriber(mux, testEvent(0))
  130. emptySubscriber(mux, testEvent(0))
  131. for i := 0; i < b.N; i++ {
  132. mux.Post(testEvent(0))
  133. }
  134. }
  135. func BenchmarkPostConcurrent(b *testing.B) {
  136. var mux = new(TypeMux)
  137. defer mux.Stop()
  138. emptySubscriber(mux, testEvent(0))
  139. emptySubscriber(mux, testEvent(0))
  140. emptySubscriber(mux, testEvent(0))
  141. var wg sync.WaitGroup
  142. poster := func() {
  143. for i := 0; i < b.N; i++ {
  144. mux.Post(testEvent(0))
  145. }
  146. wg.Done()
  147. }
  148. wg.Add(5)
  149. for i := 0; i < 5; i++ {
  150. go poster()
  151. }
  152. wg.Wait()
  153. }
  154. // for comparison
  155. func BenchmarkChanSend(b *testing.B) {
  156. c := make(chan interface{})
  157. closed := make(chan struct{})
  158. go func() {
  159. for _ = range c {
  160. }
  161. }()
  162. for i := 0; i < b.N; i++ {
  163. select {
  164. case c <- i:
  165. case <-closed:
  166. }
  167. }
  168. }