distributor.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "container/list"
  19. "sync"
  20. "time"
  21. "github.com/ethereum/go-ethereum/common/mclock"
  22. )
  23. // requestDistributor implements a mechanism that distributes requests to
  24. // suitable peers, obeying flow control rules and prioritizing them in creation
  25. // order (even when a resend is necessary).
  26. type requestDistributor struct {
  27. clock mclock.Clock
  28. reqQueue *list.List
  29. lastReqOrder uint64
  30. peers map[distPeer]struct{}
  31. peerLock sync.RWMutex
  32. stopChn, loopChn chan struct{}
  33. loopNextSent bool
  34. lock sync.Mutex
  35. }
  36. // distPeer is an LES server peer interface for the request distributor.
  37. // waitBefore returns either the necessary waiting time before sending a request
  38. // with the given upper estimated cost or the estimated remaining relative buffer
  39. // value after sending such a request (in which case the request can be sent
  40. // immediately). At least one of these values is always zero.
  41. type distPeer interface {
  42. waitBefore(uint64) (time.Duration, float64)
  43. canQueue() bool
  44. queueSend(f func())
  45. }
  46. // distReq is the request abstraction used by the distributor. It is based on
  47. // three callback functions:
  48. // - getCost returns the upper estimate of the cost of sending the request to a given peer
  49. // - canSend tells if the server peer is suitable to serve the request
  50. // - request prepares sending the request to the given peer and returns a function that
  51. // does the actual sending. Request order should be preserved but the callback itself should not
  52. // block until it is sent because other peers might still be able to receive requests while
  53. // one of them is blocking. Instead, the returned function is put in the peer's send queue.
  54. type distReq struct {
  55. getCost func(distPeer) uint64
  56. canSend func(distPeer) bool
  57. request func(distPeer) func()
  58. reqOrder uint64
  59. sentChn chan distPeer
  60. element *list.Element
  61. waitForPeers mclock.AbsTime
  62. }
  63. // newRequestDistributor creates a new request distributor
  64. func newRequestDistributor(peers *peerSet, stopChn chan struct{}, clock mclock.Clock) *requestDistributor {
  65. d := &requestDistributor{
  66. clock: clock,
  67. reqQueue: list.New(),
  68. loopChn: make(chan struct{}, 2),
  69. stopChn: stopChn,
  70. peers: make(map[distPeer]struct{}),
  71. }
  72. if peers != nil {
  73. peers.notify(d)
  74. }
  75. go d.loop()
  76. return d
  77. }
  78. // registerPeer implements peerSetNotify
  79. func (d *requestDistributor) registerPeer(p *peer) {
  80. d.peerLock.Lock()
  81. d.peers[p] = struct{}{}
  82. d.peerLock.Unlock()
  83. }
  84. // unregisterPeer implements peerSetNotify
  85. func (d *requestDistributor) unregisterPeer(p *peer) {
  86. d.peerLock.Lock()
  87. delete(d.peers, p)
  88. d.peerLock.Unlock()
  89. }
  90. // registerTestPeer adds a new test peer
  91. func (d *requestDistributor) registerTestPeer(p distPeer) {
  92. d.peerLock.Lock()
  93. d.peers[p] = struct{}{}
  94. d.peerLock.Unlock()
  95. }
  96. // distMaxWait is the maximum waiting time after which further necessary waiting
  97. // times are recalculated based on new feedback from the servers
  98. const distMaxWait = time.Millisecond * 50
  99. // waitForPeers is the time window in which a request does not fail even if it
  100. // has no suitable peers to send to at the moment
  101. const waitForPeers = time.Second * 3
  102. // main event loop
  103. func (d *requestDistributor) loop() {
  104. for {
  105. select {
  106. case <-d.stopChn:
  107. d.lock.Lock()
  108. elem := d.reqQueue.Front()
  109. for elem != nil {
  110. req := elem.Value.(*distReq)
  111. close(req.sentChn)
  112. req.sentChn = nil
  113. elem = elem.Next()
  114. }
  115. d.lock.Unlock()
  116. return
  117. case <-d.loopChn:
  118. d.lock.Lock()
  119. d.loopNextSent = false
  120. loop:
  121. for {
  122. peer, req, wait := d.nextRequest()
  123. if req != nil && wait == 0 {
  124. chn := req.sentChn // save sentChn because remove sets it to nil
  125. d.remove(req)
  126. send := req.request(peer)
  127. if send != nil {
  128. peer.queueSend(send)
  129. }
  130. chn <- peer
  131. close(chn)
  132. } else {
  133. if wait == 0 {
  134. // no request to send and nothing to wait for; the next
  135. // queued request will wake up the loop
  136. break loop
  137. }
  138. d.loopNextSent = true // a "next" signal has been sent, do not send another one until this one has been received
  139. if wait > distMaxWait {
  140. // waiting times may be reduced by incoming request replies, if it is too long, recalculate it periodically
  141. wait = distMaxWait
  142. }
  143. go func() {
  144. d.clock.Sleep(wait)
  145. d.loopChn <- struct{}{}
  146. }()
  147. break loop
  148. }
  149. }
  150. d.lock.Unlock()
  151. }
  152. }
  153. }
  154. // selectPeerItem represents a peer to be selected for a request by weightedRandomSelect
  155. type selectPeerItem struct {
  156. peer distPeer
  157. req *distReq
  158. weight int64
  159. }
  160. // Weight implements wrsItem interface
  161. func (sp selectPeerItem) Weight() int64 {
  162. return sp.weight
  163. }
  164. // nextRequest returns the next possible request from any peer, along with the
  165. // associated peer and necessary waiting time
  166. func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
  167. checkedPeers := make(map[distPeer]struct{})
  168. elem := d.reqQueue.Front()
  169. var (
  170. bestWait time.Duration
  171. sel *weightedRandomSelect
  172. )
  173. d.peerLock.RLock()
  174. defer d.peerLock.RUnlock()
  175. peerCount := len(d.peers)
  176. for (len(checkedPeers) < peerCount || elem == d.reqQueue.Front()) && elem != nil {
  177. req := elem.Value.(*distReq)
  178. canSend := false
  179. now := d.clock.Now()
  180. if req.waitForPeers > now {
  181. canSend = true
  182. wait := time.Duration(req.waitForPeers - now)
  183. if bestWait == 0 || wait < bestWait {
  184. bestWait = wait
  185. }
  186. }
  187. for peer := range d.peers {
  188. if _, ok := checkedPeers[peer]; !ok && peer.canQueue() && req.canSend(peer) {
  189. canSend = true
  190. cost := req.getCost(peer)
  191. wait, bufRemain := peer.waitBefore(cost)
  192. if wait == 0 {
  193. if sel == nil {
  194. sel = newWeightedRandomSelect()
  195. }
  196. sel.update(selectPeerItem{peer: peer, req: req, weight: int64(bufRemain*1000000) + 1})
  197. } else {
  198. if bestWait == 0 || wait < bestWait {
  199. bestWait = wait
  200. }
  201. }
  202. checkedPeers[peer] = struct{}{}
  203. }
  204. }
  205. next := elem.Next()
  206. if !canSend && elem == d.reqQueue.Front() {
  207. close(req.sentChn)
  208. d.remove(req)
  209. }
  210. elem = next
  211. }
  212. if sel != nil {
  213. c := sel.choose().(selectPeerItem)
  214. return c.peer, c.req, 0
  215. }
  216. return nil, nil, bestWait
  217. }
  218. // queue adds a request to the distribution queue, returns a channel where the
  219. // receiving peer is sent once the request has been sent (request callback returned).
  220. // If the request is cancelled or timed out without suitable peers, the channel is
  221. // closed without sending any peer references to it.
  222. func (d *requestDistributor) queue(r *distReq) chan distPeer {
  223. d.lock.Lock()
  224. defer d.lock.Unlock()
  225. if r.reqOrder == 0 {
  226. d.lastReqOrder++
  227. r.reqOrder = d.lastReqOrder
  228. r.waitForPeers = d.clock.Now() + mclock.AbsTime(waitForPeers)
  229. }
  230. back := d.reqQueue.Back()
  231. if back == nil || r.reqOrder > back.Value.(*distReq).reqOrder {
  232. r.element = d.reqQueue.PushBack(r)
  233. } else {
  234. before := d.reqQueue.Front()
  235. for before.Value.(*distReq).reqOrder < r.reqOrder {
  236. before = before.Next()
  237. }
  238. r.element = d.reqQueue.InsertBefore(r, before)
  239. }
  240. if !d.loopNextSent {
  241. d.loopNextSent = true
  242. d.loopChn <- struct{}{}
  243. }
  244. r.sentChn = make(chan distPeer, 1)
  245. return r.sentChn
  246. }
  247. // cancel removes a request from the queue if it has not been sent yet (returns
  248. // false if it has been sent already). It is guaranteed that the callback functions
  249. // will not be called after cancel returns.
  250. func (d *requestDistributor) cancel(r *distReq) bool {
  251. d.lock.Lock()
  252. defer d.lock.Unlock()
  253. if r.sentChn == nil {
  254. return false
  255. }
  256. close(r.sentChn)
  257. d.remove(r)
  258. return true
  259. }
  260. // remove removes a request from the queue
  261. func (d *requestDistributor) remove(r *distReq) {
  262. r.sentChn = nil
  263. if r.element != nil {
  264. d.reqQueue.Remove(r.element)
  265. r.element = nil
  266. }
  267. }