distributor.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "container/list"
  19. "sync"
  20. "time"
  21. "github.com/ethereum/go-ethereum/common/mclock"
  22. )
  23. // requestDistributor implements a mechanism that distributes requests to
  24. // suitable peers, obeying flow control rules and prioritizing them in creation
  25. // order (even when a resend is necessary).
  26. type requestDistributor struct {
  27. clock mclock.Clock
  28. reqQueue *list.List
  29. lastReqOrder uint64
  30. peers map[distPeer]struct{}
  31. peerLock sync.RWMutex
  32. loopChn chan struct{}
  33. loopNextSent bool
  34. lock sync.Mutex
  35. closeCh chan struct{}
  36. wg sync.WaitGroup
  37. }
  38. // distPeer is an LES server peer interface for the request distributor.
  39. // waitBefore returns either the necessary waiting time before sending a request
  40. // with the given upper estimated cost or the estimated remaining relative buffer
  41. // value after sending such a request (in which case the request can be sent
  42. // immediately). At least one of these values is always zero.
  43. type distPeer interface {
  44. waitBefore(uint64) (time.Duration, float64)
  45. canQueue() bool
  46. queueSend(f func()) bool
  47. }
  48. // distReq is the request abstraction used by the distributor. It is based on
  49. // three callback functions:
  50. // - getCost returns the upper estimate of the cost of sending the request to a given peer
  51. // - canSend tells if the server peer is suitable to serve the request
  52. // - request prepares sending the request to the given peer and returns a function that
  53. // does the actual sending. Request order should be preserved but the callback itself should not
  54. // block until it is sent because other peers might still be able to receive requests while
  55. // one of them is blocking. Instead, the returned function is put in the peer's send queue.
  56. type distReq struct {
  57. getCost func(distPeer) uint64
  58. canSend func(distPeer) bool
  59. request func(distPeer) func()
  60. reqOrder uint64
  61. sentChn chan distPeer
  62. element *list.Element
  63. waitForPeers mclock.AbsTime
  64. enterQueue mclock.AbsTime
  65. }
  66. // newRequestDistributor creates a new request distributor
  67. func newRequestDistributor(peers *serverPeerSet, clock mclock.Clock) *requestDistributor {
  68. d := &requestDistributor{
  69. clock: clock,
  70. reqQueue: list.New(),
  71. loopChn: make(chan struct{}, 2),
  72. closeCh: make(chan struct{}),
  73. peers: make(map[distPeer]struct{}),
  74. }
  75. if peers != nil {
  76. peers.subscribe(d)
  77. }
  78. d.wg.Add(1)
  79. go d.loop()
  80. return d
  81. }
  82. // registerPeer implements peerSetNotify
  83. func (d *requestDistributor) registerPeer(p *serverPeer) {
  84. d.peerLock.Lock()
  85. d.peers[p] = struct{}{}
  86. d.peerLock.Unlock()
  87. }
  88. // unregisterPeer implements peerSetNotify
  89. func (d *requestDistributor) unregisterPeer(p *serverPeer) {
  90. d.peerLock.Lock()
  91. delete(d.peers, p)
  92. d.peerLock.Unlock()
  93. }
  94. // registerTestPeer adds a new test peer
  95. func (d *requestDistributor) registerTestPeer(p distPeer) {
  96. d.peerLock.Lock()
  97. d.peers[p] = struct{}{}
  98. d.peerLock.Unlock()
  99. }
  100. var (
  101. // distMaxWait is the maximum waiting time after which further necessary waiting
  102. // times are recalculated based on new feedback from the servers
  103. distMaxWait = time.Millisecond * 50
  104. // waitForPeers is the time window in which a request does not fail even if it
  105. // has no suitable peers to send to at the moment
  106. waitForPeers = time.Second * 3
  107. )
  108. // main event loop
  109. func (d *requestDistributor) loop() {
  110. defer d.wg.Done()
  111. for {
  112. select {
  113. case <-d.closeCh:
  114. d.lock.Lock()
  115. elem := d.reqQueue.Front()
  116. for elem != nil {
  117. req := elem.Value.(*distReq)
  118. close(req.sentChn)
  119. req.sentChn = nil
  120. elem = elem.Next()
  121. }
  122. d.lock.Unlock()
  123. return
  124. case <-d.loopChn:
  125. d.lock.Lock()
  126. d.loopNextSent = false
  127. loop:
  128. for {
  129. peer, req, wait := d.nextRequest()
  130. if req != nil && wait == 0 {
  131. chn := req.sentChn // save sentChn because remove sets it to nil
  132. d.remove(req)
  133. send := req.request(peer)
  134. if send != nil {
  135. peer.queueSend(send)
  136. requestSendDelay.Update(time.Duration(d.clock.Now() - req.enterQueue))
  137. }
  138. chn <- peer
  139. close(chn)
  140. } else {
  141. if wait == 0 {
  142. // no request to send and nothing to wait for; the next
  143. // queued request will wake up the loop
  144. break loop
  145. }
  146. d.loopNextSent = true // a "next" signal has been sent, do not send another one until this one has been received
  147. if wait > distMaxWait {
  148. // waiting times may be reduced by incoming request replies, if it is too long, recalculate it periodically
  149. wait = distMaxWait
  150. }
  151. go func() {
  152. d.clock.Sleep(wait)
  153. d.loopChn <- struct{}{}
  154. }()
  155. break loop
  156. }
  157. }
  158. d.lock.Unlock()
  159. }
  160. }
  161. }
  162. // selectPeerItem represents a peer to be selected for a request by weightedRandomSelect
  163. type selectPeerItem struct {
  164. peer distPeer
  165. req *distReq
  166. weight int64
  167. }
  168. // Weight implements wrsItem interface
  169. func (sp selectPeerItem) Weight() int64 {
  170. return sp.weight
  171. }
  172. // nextRequest returns the next possible request from any peer, along with the
  173. // associated peer and necessary waiting time
  174. func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
  175. checkedPeers := make(map[distPeer]struct{})
  176. elem := d.reqQueue.Front()
  177. var (
  178. bestWait time.Duration
  179. sel *weightedRandomSelect
  180. )
  181. d.peerLock.RLock()
  182. defer d.peerLock.RUnlock()
  183. peerCount := len(d.peers)
  184. for (len(checkedPeers) < peerCount || elem == d.reqQueue.Front()) && elem != nil {
  185. req := elem.Value.(*distReq)
  186. canSend := false
  187. now := d.clock.Now()
  188. if req.waitForPeers > now {
  189. canSend = true
  190. wait := time.Duration(req.waitForPeers - now)
  191. if bestWait == 0 || wait < bestWait {
  192. bestWait = wait
  193. }
  194. }
  195. for peer := range d.peers {
  196. if _, ok := checkedPeers[peer]; !ok && peer.canQueue() && req.canSend(peer) {
  197. canSend = true
  198. cost := req.getCost(peer)
  199. wait, bufRemain := peer.waitBefore(cost)
  200. if wait == 0 {
  201. if sel == nil {
  202. sel = newWeightedRandomSelect()
  203. }
  204. sel.update(selectPeerItem{peer: peer, req: req, weight: int64(bufRemain*1000000) + 1})
  205. } else {
  206. if bestWait == 0 || wait < bestWait {
  207. bestWait = wait
  208. }
  209. }
  210. checkedPeers[peer] = struct{}{}
  211. }
  212. }
  213. next := elem.Next()
  214. if !canSend && elem == d.reqQueue.Front() {
  215. close(req.sentChn)
  216. d.remove(req)
  217. }
  218. elem = next
  219. }
  220. if sel != nil {
  221. c := sel.choose().(selectPeerItem)
  222. return c.peer, c.req, 0
  223. }
  224. return nil, nil, bestWait
  225. }
  226. // queue adds a request to the distribution queue, returns a channel where the
  227. // receiving peer is sent once the request has been sent (request callback returned).
  228. // If the request is cancelled or timed out without suitable peers, the channel is
  229. // closed without sending any peer references to it.
  230. func (d *requestDistributor) queue(r *distReq) chan distPeer {
  231. d.lock.Lock()
  232. defer d.lock.Unlock()
  233. if r.reqOrder == 0 {
  234. d.lastReqOrder++
  235. r.reqOrder = d.lastReqOrder
  236. r.waitForPeers = d.clock.Now() + mclock.AbsTime(waitForPeers)
  237. }
  238. // Assign the timestamp when the request is queued no matter it's
  239. // a new one or re-queued one.
  240. r.enterQueue = d.clock.Now()
  241. back := d.reqQueue.Back()
  242. if back == nil || r.reqOrder > back.Value.(*distReq).reqOrder {
  243. r.element = d.reqQueue.PushBack(r)
  244. } else {
  245. before := d.reqQueue.Front()
  246. for before.Value.(*distReq).reqOrder < r.reqOrder {
  247. before = before.Next()
  248. }
  249. r.element = d.reqQueue.InsertBefore(r, before)
  250. }
  251. if !d.loopNextSent {
  252. d.loopNextSent = true
  253. d.loopChn <- struct{}{}
  254. }
  255. r.sentChn = make(chan distPeer, 1)
  256. return r.sentChn
  257. }
  258. // cancel removes a request from the queue if it has not been sent yet (returns
  259. // false if it has been sent already). It is guaranteed that the callback functions
  260. // will not be called after cancel returns.
  261. func (d *requestDistributor) cancel(r *distReq) bool {
  262. d.lock.Lock()
  263. defer d.lock.Unlock()
  264. if r.sentChn == nil {
  265. return false
  266. }
  267. close(r.sentChn)
  268. d.remove(r)
  269. return true
  270. }
  271. // remove removes a request from the queue
  272. func (d *requestDistributor) remove(r *distReq) {
  273. r.sentChn = nil
  274. if r.element != nil {
  275. d.reqQueue.Remove(r.element)
  276. r.element = nil
  277. }
  278. }
  279. func (d *requestDistributor) close() {
  280. close(d.closeCh)
  281. d.wg.Wait()
  282. }