feed_test.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package event
  17. import (
  18. "fmt"
  19. "reflect"
  20. "sync"
  21. "testing"
  22. "time"
  23. )
  24. func TestFeedPanics(t *testing.T) {
  25. {
  26. var f Feed
  27. f.Send(2)
  28. want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(0)}
  29. if err := checkPanic(want, func() { f.Send(uint64(2)) }); err != nil {
  30. t.Error(err)
  31. }
  32. }
  33. {
  34. var f Feed
  35. ch := make(chan int)
  36. f.Subscribe(ch)
  37. want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(0)}
  38. if err := checkPanic(want, func() { f.Send(uint64(2)) }); err != nil {
  39. t.Error(err)
  40. }
  41. }
  42. {
  43. var f Feed
  44. f.Send(2)
  45. want := feedTypeError{op: "Subscribe", got: reflect.TypeOf(make(chan uint64)), want: reflect.TypeOf(make(chan<- int))}
  46. if err := checkPanic(want, func() { f.Subscribe(make(chan uint64)) }); err != nil {
  47. t.Error(err)
  48. }
  49. }
  50. {
  51. var f Feed
  52. if err := checkPanic(errBadChannel, func() { f.Subscribe(make(<-chan int)) }); err != nil {
  53. t.Error(err)
  54. }
  55. }
  56. {
  57. var f Feed
  58. if err := checkPanic(errBadChannel, func() { f.Subscribe(0) }); err != nil {
  59. t.Error(err)
  60. }
  61. }
  62. }
  63. func checkPanic(want error, fn func()) (err error) {
  64. defer func() {
  65. panic := recover()
  66. if panic == nil {
  67. err = fmt.Errorf("didn't panic")
  68. } else if !reflect.DeepEqual(panic, want) {
  69. err = fmt.Errorf("panicked with wrong error: got %q, want %q", panic, want)
  70. }
  71. }()
  72. fn()
  73. return nil
  74. }
  75. func TestFeed(t *testing.T) {
  76. var feed Feed
  77. var done, subscribed sync.WaitGroup
  78. subscriber := func(i int) {
  79. defer done.Done()
  80. subchan := make(chan int)
  81. sub := feed.Subscribe(subchan)
  82. timeout := time.NewTimer(2 * time.Second)
  83. defer timeout.Stop()
  84. subscribed.Done()
  85. select {
  86. case v := <-subchan:
  87. if v != 1 {
  88. t.Errorf("%d: received value %d, want 1", i, v)
  89. }
  90. case <-timeout.C:
  91. t.Errorf("%d: receive timeout", i)
  92. }
  93. sub.Unsubscribe()
  94. select {
  95. case _, ok := <-sub.Err():
  96. if ok {
  97. t.Errorf("%d: error channel not closed after unsubscribe", i)
  98. }
  99. case <-timeout.C:
  100. t.Errorf("%d: unsubscribe timeout", i)
  101. }
  102. }
  103. const n = 1000
  104. done.Add(n)
  105. subscribed.Add(n)
  106. for i := 0; i < n; i++ {
  107. go subscriber(i)
  108. }
  109. subscribed.Wait()
  110. if nsent := feed.Send(1); nsent != n {
  111. t.Errorf("first send delivered %d times, want %d", nsent, n)
  112. }
  113. if nsent := feed.Send(2); nsent != 0 {
  114. t.Errorf("second send delivered %d times, want 0", nsent)
  115. }
  116. done.Wait()
  117. }
  118. func TestFeedSubscribeSameChannel(t *testing.T) {
  119. var (
  120. feed Feed
  121. done sync.WaitGroup
  122. ch = make(chan int)
  123. sub1 = feed.Subscribe(ch)
  124. sub2 = feed.Subscribe(ch)
  125. _ = feed.Subscribe(ch)
  126. )
  127. expectSends := func(value, n int) {
  128. if nsent := feed.Send(value); nsent != n {
  129. t.Errorf("send delivered %d times, want %d", nsent, n)
  130. }
  131. done.Done()
  132. }
  133. expectRecv := func(wantValue, n int) {
  134. for i := 0; i < n; i++ {
  135. if v := <-ch; v != wantValue {
  136. t.Errorf("received %d, want %d", v, wantValue)
  137. }
  138. }
  139. }
  140. done.Add(1)
  141. go expectSends(1, 3)
  142. expectRecv(1, 3)
  143. done.Wait()
  144. sub1.Unsubscribe()
  145. done.Add(1)
  146. go expectSends(2, 2)
  147. expectRecv(2, 2)
  148. done.Wait()
  149. sub2.Unsubscribe()
  150. done.Add(1)
  151. go expectSends(3, 1)
  152. expectRecv(3, 1)
  153. done.Wait()
  154. }
  155. func TestFeedSubscribeBlockedPost(t *testing.T) {
  156. var (
  157. feed Feed
  158. nsends = 2000
  159. ch1 = make(chan int)
  160. ch2 = make(chan int)
  161. wg sync.WaitGroup
  162. )
  163. defer wg.Wait()
  164. feed.Subscribe(ch1)
  165. wg.Add(nsends)
  166. for i := 0; i < nsends; i++ {
  167. go func() {
  168. feed.Send(99)
  169. wg.Done()
  170. }()
  171. }
  172. sub2 := feed.Subscribe(ch2)
  173. defer sub2.Unsubscribe()
  174. // We're done when ch1 has received N times.
  175. // The number of receives on ch2 depends on scheduling.
  176. for i := 0; i < nsends; {
  177. select {
  178. case <-ch1:
  179. i++
  180. case <-ch2:
  181. }
  182. }
  183. }
  184. func TestFeedUnsubscribeBlockedPost(t *testing.T) {
  185. var (
  186. feed Feed
  187. nsends = 200
  188. chans = make([]chan int, 2000)
  189. subs = make([]Subscription, len(chans))
  190. bchan = make(chan int)
  191. bsub = feed.Subscribe(bchan)
  192. wg sync.WaitGroup
  193. )
  194. for i := range chans {
  195. chans[i] = make(chan int, nsends)
  196. }
  197. // Queue up some Sends. None of these can make progress while bchan isn't read.
  198. wg.Add(nsends)
  199. for i := 0; i < nsends; i++ {
  200. go func() {
  201. feed.Send(99)
  202. wg.Done()
  203. }()
  204. }
  205. // Subscribe the other channels.
  206. for i, ch := range chans {
  207. subs[i] = feed.Subscribe(ch)
  208. }
  209. // Unsubscribe them again.
  210. for _, sub := range subs {
  211. sub.Unsubscribe()
  212. }
  213. // Unblock the Sends.
  214. bsub.Unsubscribe()
  215. wg.Wait()
  216. }
  217. // Checks that unsubscribing a channel during Send works even if that
  218. // channel has already been sent on.
  219. func TestFeedUnsubscribeSentChan(t *testing.T) {
  220. var (
  221. feed Feed
  222. ch1 = make(chan int)
  223. ch2 = make(chan int)
  224. sub1 = feed.Subscribe(ch1)
  225. sub2 = feed.Subscribe(ch2)
  226. wg sync.WaitGroup
  227. )
  228. defer sub2.Unsubscribe()
  229. wg.Add(1)
  230. go func() {
  231. feed.Send(0)
  232. wg.Done()
  233. }()
  234. // Wait for the value on ch1.
  235. <-ch1
  236. // Unsubscribe ch1, removing it from the send cases.
  237. sub1.Unsubscribe()
  238. // Receive ch2, finishing Send.
  239. <-ch2
  240. wg.Wait()
  241. // Send again. This should send to ch2 only, so the wait group will unblock
  242. // as soon as a value is received on ch2.
  243. wg.Add(1)
  244. go func() {
  245. feed.Send(0)
  246. wg.Done()
  247. }()
  248. <-ch2
  249. wg.Wait()
  250. }
  251. func TestFeedUnsubscribeFromInbox(t *testing.T) {
  252. var (
  253. feed Feed
  254. ch1 = make(chan int)
  255. ch2 = make(chan int)
  256. sub1 = feed.Subscribe(ch1)
  257. sub2 = feed.Subscribe(ch1)
  258. sub3 = feed.Subscribe(ch2)
  259. )
  260. if len(feed.inbox) != 3 {
  261. t.Errorf("inbox length != 3 after subscribe")
  262. }
  263. if len(feed.sendCases) != 1 {
  264. t.Errorf("sendCases is non-empty after unsubscribe")
  265. }
  266. sub1.Unsubscribe()
  267. sub2.Unsubscribe()
  268. sub3.Unsubscribe()
  269. if len(feed.inbox) != 0 {
  270. t.Errorf("inbox is non-empty after unsubscribe")
  271. }
  272. if len(feed.sendCases) != 1 {
  273. t.Errorf("sendCases is non-empty after unsubscribe")
  274. }
  275. }
  276. func BenchmarkFeedSend1000(b *testing.B) {
  277. var (
  278. done sync.WaitGroup
  279. feed Feed
  280. nsubs = 1000
  281. )
  282. subscriber := func(ch <-chan int) {
  283. for i := 0; i < b.N; i++ {
  284. <-ch
  285. }
  286. done.Done()
  287. }
  288. done.Add(nsubs)
  289. for i := 0; i < nsubs; i++ {
  290. ch := make(chan int, 200)
  291. feed.Subscribe(ch)
  292. go subscriber(ch)
  293. }
  294. // The actual benchmark.
  295. b.ResetTimer()
  296. for i := 0; i < b.N; i++ {
  297. if feed.Send(i) != nsubs {
  298. panic("wrong number of sends")
  299. }
  300. }
  301. b.StopTimer()
  302. done.Wait()
  303. }