distributor_test.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package les implements on-demand retrieval capable state and chain objects
  17. // for the Ethereum Light Client.
  18. package les
  19. import (
  20. "math/rand"
  21. "sync"
  22. "testing"
  23. "time"
  24. )
  25. type testDistReq struct {
  26. cost, procTime, order uint64
  27. canSendTo map[*testDistPeer]struct{}
  28. }
  29. func (r *testDistReq) getCost(dp distPeer) uint64 {
  30. return r.cost
  31. }
  32. func (r *testDistReq) canSend(dp distPeer) bool {
  33. _, ok := r.canSendTo[dp.(*testDistPeer)]
  34. return ok
  35. }
  36. func (r *testDistReq) request(dp distPeer) func() {
  37. return func() { dp.(*testDistPeer).send(r) }
  38. }
  39. type testDistPeer struct {
  40. sent []*testDistReq
  41. sumCost uint64
  42. lock sync.RWMutex
  43. }
  44. func (p *testDistPeer) send(r *testDistReq) {
  45. p.lock.Lock()
  46. defer p.lock.Unlock()
  47. p.sent = append(p.sent, r)
  48. p.sumCost += r.cost
  49. }
  50. func (p *testDistPeer) worker(t *testing.T, checkOrder bool, stop chan struct{}) {
  51. var last uint64
  52. for {
  53. wait := time.Millisecond
  54. p.lock.Lock()
  55. if len(p.sent) > 0 {
  56. rq := p.sent[0]
  57. wait = time.Duration(rq.procTime)
  58. p.sumCost -= rq.cost
  59. if checkOrder {
  60. if rq.order <= last {
  61. t.Errorf("Requests processed in wrong order")
  62. }
  63. last = rq.order
  64. }
  65. p.sent = p.sent[1:]
  66. }
  67. p.lock.Unlock()
  68. select {
  69. case <-stop:
  70. return
  71. case <-time.After(wait):
  72. }
  73. }
  74. }
  75. const (
  76. testDistBufLimit = 10000000
  77. testDistMaxCost = 1000000
  78. testDistPeerCount = 5
  79. testDistReqCount = 50000
  80. testDistMaxResendCount = 3
  81. )
  82. func (p *testDistPeer) waitBefore(cost uint64) (time.Duration, float64) {
  83. p.lock.RLock()
  84. sumCost := p.sumCost + cost
  85. p.lock.RUnlock()
  86. if sumCost < testDistBufLimit {
  87. return 0, float64(testDistBufLimit-sumCost) / float64(testDistBufLimit)
  88. } else {
  89. return time.Duration(sumCost - testDistBufLimit), 0
  90. }
  91. }
  92. func (p *testDistPeer) canQueue() bool {
  93. return true
  94. }
  95. func (p *testDistPeer) queueSend(f func()) {
  96. f()
  97. }
  98. func TestRequestDistributor(t *testing.T) {
  99. testRequestDistributor(t, false)
  100. }
  101. func TestRequestDistributorResend(t *testing.T) {
  102. testRequestDistributor(t, true)
  103. }
  104. func testRequestDistributor(t *testing.T, resend bool) {
  105. stop := make(chan struct{})
  106. defer close(stop)
  107. var peers [testDistPeerCount]*testDistPeer
  108. for i, _ := range peers {
  109. peers[i] = &testDistPeer{}
  110. go peers[i].worker(t, !resend, stop)
  111. }
  112. dist := newRequestDistributor(func() map[distPeer]struct{} {
  113. m := make(map[distPeer]struct{})
  114. for _, peer := range peers {
  115. m[peer] = struct{}{}
  116. }
  117. return m
  118. }, stop)
  119. var wg sync.WaitGroup
  120. for i := 1; i <= testDistReqCount; i++ {
  121. cost := uint64(rand.Int63n(testDistMaxCost))
  122. procTime := uint64(rand.Int63n(int64(cost + 1)))
  123. rq := &testDistReq{
  124. cost: cost,
  125. procTime: procTime,
  126. order: uint64(i),
  127. canSendTo: make(map[*testDistPeer]struct{}),
  128. }
  129. for _, peer := range peers {
  130. if rand.Intn(2) != 0 {
  131. rq.canSendTo[peer] = struct{}{}
  132. }
  133. }
  134. wg.Add(1)
  135. req := &distReq{
  136. getCost: rq.getCost,
  137. canSend: rq.canSend,
  138. request: rq.request,
  139. }
  140. chn := dist.queue(req)
  141. go func() {
  142. cnt := 1
  143. if resend && len(rq.canSendTo) != 0 {
  144. cnt = rand.Intn(testDistMaxResendCount) + 1
  145. }
  146. for i := 0; i < cnt; i++ {
  147. if i != 0 {
  148. chn = dist.queue(req)
  149. }
  150. p := <-chn
  151. if p == nil {
  152. if len(rq.canSendTo) != 0 {
  153. t.Errorf("Request that could have been sent was dropped")
  154. }
  155. } else {
  156. peer := p.(*testDistPeer)
  157. if _, ok := rq.canSendTo[peer]; !ok {
  158. t.Errorf("Request sent to wrong peer")
  159. }
  160. }
  161. }
  162. wg.Done()
  163. }()
  164. if rand.Intn(1000) == 0 {
  165. time.Sleep(time.Duration(rand.Intn(5000000)))
  166. }
  167. }
  168. wg.Wait()
  169. }