feed_test.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package event
  17. import (
  18. "fmt"
  19. "reflect"
  20. "sync"
  21. "testing"
  22. "time"
  23. )
  24. func TestFeedPanics(t *testing.T) {
  25. {
  26. var f Feed
  27. f.Send(int(2))
  28. want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(int(0))}
  29. if err := checkPanic(want, func() { f.Send(uint64(2)) }); err != nil {
  30. t.Error(err)
  31. }
  32. }
  33. {
  34. var f Feed
  35. ch := make(chan int)
  36. f.Subscribe(ch)
  37. want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(int(0))}
  38. if err := checkPanic(want, func() { f.Send(uint64(2)) }); err != nil {
  39. t.Error(err)
  40. }
  41. }
  42. {
  43. var f Feed
  44. f.Send(int(2))
  45. want := feedTypeError{op: "Subscribe", got: reflect.TypeOf(make(chan uint64)), want: reflect.TypeOf(make(chan<- int))}
  46. if err := checkPanic(want, func() { f.Subscribe(make(chan uint64)) }); err != nil {
  47. t.Error(err)
  48. }
  49. }
  50. {
  51. var f Feed
  52. if err := checkPanic(errBadChannel, func() { f.Subscribe(make(<-chan int)) }); err != nil {
  53. t.Error(err)
  54. }
  55. }
  56. {
  57. var f Feed
  58. if err := checkPanic(errBadChannel, func() { f.Subscribe(int(0)) }); err != nil {
  59. t.Error(err)
  60. }
  61. }
  62. }
  63. func checkPanic(want error, fn func()) (err error) {
  64. defer func() {
  65. panic := recover()
  66. if panic == nil {
  67. err = fmt.Errorf("didn't panic")
  68. } else if !reflect.DeepEqual(panic, want) {
  69. err = fmt.Errorf("panicked with wrong error: got %q, want %q", panic, want)
  70. }
  71. }()
  72. fn()
  73. return nil
  74. }
  75. func TestFeed(t *testing.T) {
  76. var feed Feed
  77. var done, subscribed sync.WaitGroup
  78. subscriber := func(i int) {
  79. defer done.Done()
  80. subchan := make(chan int)
  81. sub := feed.Subscribe(subchan)
  82. timeout := time.NewTimer(2 * time.Second)
  83. subscribed.Done()
  84. select {
  85. case v := <-subchan:
  86. if v != 1 {
  87. t.Errorf("%d: received value %d, want 1", i, v)
  88. }
  89. case <-timeout.C:
  90. t.Errorf("%d: receive timeout", i)
  91. }
  92. sub.Unsubscribe()
  93. select {
  94. case _, ok := <-sub.Err():
  95. if ok {
  96. t.Errorf("%d: error channel not closed after unsubscribe", i)
  97. }
  98. case <-timeout.C:
  99. t.Errorf("%d: unsubscribe timeout", i)
  100. }
  101. }
  102. const n = 1000
  103. done.Add(n)
  104. subscribed.Add(n)
  105. for i := 0; i < n; i++ {
  106. go subscriber(i)
  107. }
  108. subscribed.Wait()
  109. if nsent := feed.Send(1); nsent != n {
  110. t.Errorf("first send delivered %d times, want %d", nsent, n)
  111. }
  112. if nsent := feed.Send(2); nsent != 0 {
  113. t.Errorf("second send delivered %d times, want 0", nsent)
  114. }
  115. done.Wait()
  116. }
  117. func TestFeedSubscribeSameChannel(t *testing.T) {
  118. var (
  119. feed Feed
  120. done sync.WaitGroup
  121. ch = make(chan int)
  122. sub1 = feed.Subscribe(ch)
  123. sub2 = feed.Subscribe(ch)
  124. _ = feed.Subscribe(ch)
  125. )
  126. expectSends := func(value, n int) {
  127. if nsent := feed.Send(value); nsent != n {
  128. t.Errorf("send delivered %d times, want %d", nsent, n)
  129. }
  130. done.Done()
  131. }
  132. expectRecv := func(wantValue, n int) {
  133. for i := 0; i < n; i++ {
  134. if v := <-ch; v != wantValue {
  135. t.Errorf("received %d, want %d", v, wantValue)
  136. }
  137. }
  138. }
  139. done.Add(1)
  140. go expectSends(1, 3)
  141. expectRecv(1, 3)
  142. done.Wait()
  143. sub1.Unsubscribe()
  144. done.Add(1)
  145. go expectSends(2, 2)
  146. expectRecv(2, 2)
  147. done.Wait()
  148. sub2.Unsubscribe()
  149. done.Add(1)
  150. go expectSends(3, 1)
  151. expectRecv(3, 1)
  152. done.Wait()
  153. }
  154. func TestFeedSubscribeBlockedPost(t *testing.T) {
  155. var (
  156. feed Feed
  157. nsends = 2000
  158. ch1 = make(chan int)
  159. ch2 = make(chan int)
  160. wg sync.WaitGroup
  161. )
  162. defer wg.Wait()
  163. feed.Subscribe(ch1)
  164. wg.Add(nsends)
  165. for i := 0; i < nsends; i++ {
  166. go func() {
  167. feed.Send(99)
  168. wg.Done()
  169. }()
  170. }
  171. sub2 := feed.Subscribe(ch2)
  172. defer sub2.Unsubscribe()
  173. // We're done when ch1 has received N times.
  174. // The number of receives on ch2 depends on scheduling.
  175. for i := 0; i < nsends; {
  176. select {
  177. case <-ch1:
  178. i++
  179. case <-ch2:
  180. }
  181. }
  182. }
  183. func TestFeedUnsubscribeBlockedPost(t *testing.T) {
  184. var (
  185. feed Feed
  186. nsends = 200
  187. chans = make([]chan int, 2000)
  188. subs = make([]Subscription, len(chans))
  189. bchan = make(chan int)
  190. bsub = feed.Subscribe(bchan)
  191. wg sync.WaitGroup
  192. )
  193. for i := range chans {
  194. chans[i] = make(chan int, nsends)
  195. }
  196. // Queue up some Sends. None of these can make progress while bchan isn't read.
  197. wg.Add(nsends)
  198. for i := 0; i < nsends; i++ {
  199. go func() {
  200. feed.Send(99)
  201. wg.Done()
  202. }()
  203. }
  204. // Subscribe the other channels.
  205. for i, ch := range chans {
  206. subs[i] = feed.Subscribe(ch)
  207. }
  208. // Unsubscribe them again.
  209. for _, sub := range subs {
  210. sub.Unsubscribe()
  211. }
  212. // Unblock the Sends.
  213. bsub.Unsubscribe()
  214. wg.Wait()
  215. }
  216. func TestFeedUnsubscribeFromInbox(t *testing.T) {
  217. var (
  218. feed Feed
  219. ch1 = make(chan int)
  220. ch2 = make(chan int)
  221. sub1 = feed.Subscribe(ch1)
  222. sub2 = feed.Subscribe(ch1)
  223. sub3 = feed.Subscribe(ch2)
  224. )
  225. if len(feed.inbox) != 3 {
  226. t.Errorf("inbox length != 3 after subscribe")
  227. }
  228. if len(feed.sendCases) != 1 {
  229. t.Errorf("sendCases is non-empty after unsubscribe")
  230. }
  231. sub1.Unsubscribe()
  232. sub2.Unsubscribe()
  233. sub3.Unsubscribe()
  234. if len(feed.inbox) != 0 {
  235. t.Errorf("inbox is non-empty after unsubscribe")
  236. }
  237. if len(feed.sendCases) != 1 {
  238. t.Errorf("sendCases is non-empty after unsubscribe")
  239. }
  240. }
  241. func BenchmarkFeedSend1000(b *testing.B) {
  242. var (
  243. done sync.WaitGroup
  244. feed Feed
  245. nsubs = 1000
  246. )
  247. subscriber := func(ch <-chan int) {
  248. for i := 0; i < b.N; i++ {
  249. <-ch
  250. }
  251. done.Done()
  252. }
  253. done.Add(nsubs)
  254. for i := 0; i < nsubs; i++ {
  255. ch := make(chan int, 200)
  256. feed.Subscribe(ch)
  257. go subscriber(ch)
  258. }
  259. // The actual benchmark.
  260. b.ResetTimer()
  261. for i := 0; i < b.N; i++ {
  262. if feed.Send(i) != nsubs {
  263. panic("wrong number of sends")
  264. }
  265. }
  266. b.StopTimer()
  267. done.Wait()
  268. }