subscription_test.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package event
  17. import (
  18. "context"
  19. "errors"
  20. "fmt"
  21. "reflect"
  22. "testing"
  23. "time"
  24. )
  25. var errInts = errors.New("error in subscribeInts")
  26. func subscribeInts(max, fail int, c chan<- int) Subscription {
  27. return NewSubscription(func(quit <-chan struct{}) error {
  28. for i := 0; i < max; i++ {
  29. if i >= fail {
  30. return errInts
  31. }
  32. select {
  33. case c <- i:
  34. case <-quit:
  35. return nil
  36. }
  37. }
  38. return nil
  39. })
  40. }
  41. func TestNewSubscriptionError(t *testing.T) {
  42. t.Parallel()
  43. channel := make(chan int)
  44. sub := subscribeInts(10, 2, channel)
  45. loop:
  46. for want := 0; want < 10; want++ {
  47. select {
  48. case got := <-channel:
  49. if got != want {
  50. t.Fatalf("wrong int %d, want %d", got, want)
  51. }
  52. case err := <-sub.Err():
  53. if err != errInts {
  54. t.Fatalf("wrong error: got %q, want %q", err, errInts)
  55. }
  56. if want != 2 {
  57. t.Fatalf("got errInts at int %d, should be received at 2", want)
  58. }
  59. break loop
  60. }
  61. }
  62. sub.Unsubscribe()
  63. err, ok := <-sub.Err()
  64. if err != nil {
  65. t.Fatal("got non-nil error after Unsubscribe")
  66. }
  67. if ok {
  68. t.Fatal("channel still open after Unsubscribe")
  69. }
  70. }
  71. func TestResubscribe(t *testing.T) {
  72. t.Parallel()
  73. var i int
  74. nfails := 6
  75. sub := Resubscribe(100*time.Millisecond, func(ctx context.Context) (Subscription, error) {
  76. // fmt.Printf("call #%d @ %v\n", i, time.Now())
  77. i++
  78. if i == 2 {
  79. // Delay the second failure a bit to reset the resubscribe interval.
  80. time.Sleep(200 * time.Millisecond)
  81. }
  82. if i < nfails {
  83. return nil, errors.New("oops")
  84. }
  85. sub := NewSubscription(func(unsubscribed <-chan struct{}) error { return nil })
  86. return sub, nil
  87. })
  88. <-sub.Err()
  89. if i != nfails {
  90. t.Fatalf("resubscribe function called %d times, want %d times", i, nfails)
  91. }
  92. }
  93. func TestResubscribeAbort(t *testing.T) {
  94. t.Parallel()
  95. done := make(chan error, 1)
  96. sub := Resubscribe(0, func(ctx context.Context) (Subscription, error) {
  97. select {
  98. case <-ctx.Done():
  99. done <- nil
  100. case <-time.After(2 * time.Second):
  101. done <- errors.New("context given to resubscribe function not canceled within 2s")
  102. }
  103. return nil, nil
  104. })
  105. sub.Unsubscribe()
  106. if err := <-done; err != nil {
  107. t.Fatal(err)
  108. }
  109. }
  110. func TestResubscribeWithErrorHandler(t *testing.T) {
  111. t.Parallel()
  112. var i int
  113. nfails := 6
  114. subErrs := make([]string, 0)
  115. sub := ResubscribeErr(100*time.Millisecond, func(ctx context.Context, lastErr error) (Subscription, error) {
  116. i++
  117. var lastErrVal string
  118. if lastErr != nil {
  119. lastErrVal = lastErr.Error()
  120. }
  121. subErrs = append(subErrs, lastErrVal)
  122. sub := NewSubscription(func(unsubscribed <-chan struct{}) error {
  123. if i < nfails {
  124. return fmt.Errorf("err-%v", i)
  125. } else {
  126. return nil
  127. }
  128. })
  129. return sub, nil
  130. })
  131. <-sub.Err()
  132. if i != nfails {
  133. t.Fatalf("resubscribe function called %d times, want %d times", i, nfails)
  134. }
  135. expectedSubErrs := []string{"", "err-1", "err-2", "err-3", "err-4", "err-5"}
  136. if !reflect.DeepEqual(subErrs, expectedSubErrs) {
  137. t.Fatalf("unexpected subscription errors %v, want %v", subErrs, expectedSubErrs)
  138. }
  139. }