feed_test.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package event
  17. import (
  18. "fmt"
  19. "reflect"
  20. "sync"
  21. "testing"
  22. "time"
  23. )
  24. func TestFeedPanics(t *testing.T) {
  25. {
  26. var f Feed
  27. f.Send(int(2))
  28. want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(int(0))}
  29. if err := checkPanic(want, func() { f.Send(uint64(2)) }); err != nil {
  30. t.Error(err)
  31. }
  32. }
  33. {
  34. var f Feed
  35. ch := make(chan int)
  36. f.Subscribe(ch)
  37. want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(int(0))}
  38. if err := checkPanic(want, func() { f.Send(uint64(2)) }); err != nil {
  39. t.Error(err)
  40. }
  41. }
  42. {
  43. var f Feed
  44. f.Send(int(2))
  45. want := feedTypeError{op: "Subscribe", got: reflect.TypeOf(make(chan uint64)), want: reflect.TypeOf(make(chan<- int))}
  46. if err := checkPanic(want, func() { f.Subscribe(make(chan uint64)) }); err != nil {
  47. t.Error(err)
  48. }
  49. }
  50. {
  51. var f Feed
  52. if err := checkPanic(errBadChannel, func() { f.Subscribe(make(<-chan int)) }); err != nil {
  53. t.Error(err)
  54. }
  55. }
  56. {
  57. var f Feed
  58. if err := checkPanic(errBadChannel, func() { f.Subscribe(int(0)) }); err != nil {
  59. t.Error(err)
  60. }
  61. }
  62. }
  63. func checkPanic(want error, fn func()) (err error) {
  64. defer func() {
  65. panic := recover()
  66. if panic == nil {
  67. err = fmt.Errorf("didn't panic")
  68. } else if !reflect.DeepEqual(panic, want) {
  69. err = fmt.Errorf("panicked with wrong error: got %q, want %q", panic, want)
  70. }
  71. }()
  72. fn()
  73. return nil
  74. }
  75. func TestFeed(t *testing.T) {
  76. var feed Feed
  77. var done, subscribed sync.WaitGroup
  78. subscriber := func(i int) {
  79. defer done.Done()
  80. subchan := make(chan int)
  81. sub := feed.Subscribe(subchan)
  82. timeout := time.NewTimer(2 * time.Second)
  83. subscribed.Done()
  84. select {
  85. case v := <-subchan:
  86. if v != 1 {
  87. t.Errorf("%d: received value %d, want 1", i, v)
  88. }
  89. case <-timeout.C:
  90. t.Errorf("%d: receive timeout", i)
  91. }
  92. sub.Unsubscribe()
  93. select {
  94. case _, ok := <-sub.Err():
  95. if ok {
  96. t.Errorf("%d: error channel not closed after unsubscribe", i)
  97. }
  98. case <-timeout.C:
  99. t.Errorf("%d: unsubscribe timeout", i)
  100. }
  101. }
  102. const n = 1000
  103. done.Add(n)
  104. subscribed.Add(n)
  105. for i := 0; i < n; i++ {
  106. go subscriber(i)
  107. }
  108. subscribed.Wait()
  109. if nsent := feed.Send(1); nsent != n {
  110. t.Errorf("first send delivered %d times, want %d", nsent, n)
  111. }
  112. if nsent := feed.Send(2); nsent != 0 {
  113. t.Errorf("second send delivered %d times, want 0", nsent)
  114. }
  115. done.Wait()
  116. }
  117. func TestFeedSubscribeSameChannel(t *testing.T) {
  118. var (
  119. feed Feed
  120. done sync.WaitGroup
  121. ch = make(chan int)
  122. sub1 = feed.Subscribe(ch)
  123. sub2 = feed.Subscribe(ch)
  124. _ = feed.Subscribe(ch)
  125. )
  126. expectSends := func(value, n int) {
  127. if nsent := feed.Send(value); nsent != n {
  128. t.Errorf("send delivered %d times, want %d", nsent, n)
  129. }
  130. done.Done()
  131. }
  132. expectRecv := func(wantValue, n int) {
  133. for i := 0; i < n; i++ {
  134. if v := <-ch; v != wantValue {
  135. t.Errorf("received %d, want %d", v, wantValue)
  136. }
  137. }
  138. }
  139. done.Add(1)
  140. go expectSends(1, 3)
  141. expectRecv(1, 3)
  142. done.Wait()
  143. sub1.Unsubscribe()
  144. done.Add(1)
  145. go expectSends(2, 2)
  146. expectRecv(2, 2)
  147. done.Wait()
  148. sub2.Unsubscribe()
  149. done.Add(1)
  150. go expectSends(3, 1)
  151. expectRecv(3, 1)
  152. done.Wait()
  153. }
  154. func TestFeedUnsubscribeFromInbox(t *testing.T) {
  155. var (
  156. feed Feed
  157. ch1 = make(chan int)
  158. ch2 = make(chan int)
  159. sub1 = feed.Subscribe(ch1)
  160. sub2 = feed.Subscribe(ch1)
  161. sub3 = feed.Subscribe(ch2)
  162. )
  163. if len(feed.inbox) != 3 {
  164. t.Errorf("inbox length != 3 after subscribe")
  165. }
  166. if len(feed.sendCases) != 1 {
  167. t.Errorf("sendCases is non-empty after unsubscribe")
  168. }
  169. sub1.Unsubscribe()
  170. sub2.Unsubscribe()
  171. sub3.Unsubscribe()
  172. if len(feed.inbox) != 0 {
  173. t.Errorf("inbox is non-empty after unsubscribe")
  174. }
  175. if len(feed.sendCases) != 1 {
  176. t.Errorf("sendCases is non-empty after unsubscribe")
  177. }
  178. }
  179. func BenchmarkFeedSend1000(b *testing.B) {
  180. var (
  181. done sync.WaitGroup
  182. feed Feed
  183. nsubs = 1000
  184. )
  185. subscriber := func(ch <-chan int) {
  186. for i := 0; i < b.N; i++ {
  187. <-ch
  188. }
  189. done.Done()
  190. }
  191. done.Add(nsubs)
  192. for i := 0; i < nsubs; i++ {
  193. ch := make(chan int, 200)
  194. feed.Subscribe(ch)
  195. go subscriber(ch)
  196. }
  197. // The actual benchmark.
  198. b.ResetTimer()
  199. for i := 0; i < b.N; i++ {
  200. if feed.Send(i) != nsubs {
  201. panic("wrong number of sends")
  202. }
  203. }
  204. b.StopTimer()
  205. done.Wait()
  206. }