retrieve.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "context"
  19. "fmt"
  20. "sync"
  21. "time"
  22. "github.com/ethereum/go-ethereum/light"
  23. )
  24. var (
  25. retryQueue = time.Millisecond * 100
  26. hardRequestTimeout = time.Second * 10
  27. )
  28. // retrieveManager is a layer on top of requestDistributor which takes care of
  29. // matching replies by request ID and handles timeouts and resends if necessary.
  30. type retrieveManager struct {
  31. dist *requestDistributor
  32. peers *serverPeerSet
  33. softRequestTimeout func() time.Duration
  34. lock sync.RWMutex
  35. sentReqs map[uint64]*sentReq
  36. }
  37. // validatorFunc is a function that processes a reply message
  38. type validatorFunc func(distPeer, *Msg) error
  39. // sentReq represents a request sent and tracked by retrieveManager
  40. type sentReq struct {
  41. rm *retrieveManager
  42. req *distReq
  43. id uint64
  44. validate validatorFunc
  45. eventsCh chan reqPeerEvent
  46. stopCh chan struct{}
  47. stopped bool
  48. err error
  49. lock sync.RWMutex // protect access to sentTo map
  50. sentTo map[distPeer]sentReqToPeer
  51. lastReqQueued bool // last request has been queued but not sent
  52. lastReqSentTo distPeer // if not nil then last request has been sent to given peer but not timed out
  53. reqSrtoCount int // number of requests that reached soft (but not hard) timeout
  54. }
  55. // sentReqToPeer notifies the request-from-peer goroutine (tryRequest) about a response
  56. // delivered by the given peer. Only one delivery is allowed per request per peer,
  57. // after which delivered is set to true, the validity of the response is sent on the
  58. // valid channel and no more responses are accepted.
  59. type sentReqToPeer struct {
  60. delivered, frozen bool
  61. event chan int
  62. }
  63. // reqPeerEvent is sent by the request-from-peer goroutine (tryRequest) to the
  64. // request state machine (retrieveLoop) through the eventsCh channel.
  65. type reqPeerEvent struct {
  66. event int
  67. peer distPeer
  68. }
  69. const (
  70. rpSent = iota // if peer == nil, not sent (no suitable peers)
  71. rpSoftTimeout
  72. rpHardTimeout
  73. rpDeliveredValid
  74. rpDeliveredInvalid
  75. rpNotDelivered
  76. )
  77. // newRetrieveManager creates the retrieve manager
  78. func newRetrieveManager(peers *serverPeerSet, dist *requestDistributor, srto func() time.Duration) *retrieveManager {
  79. return &retrieveManager{
  80. peers: peers,
  81. dist: dist,
  82. sentReqs: make(map[uint64]*sentReq),
  83. softRequestTimeout: srto,
  84. }
  85. }
  86. // retrieve sends a request (to multiple peers if necessary) and waits for an answer
  87. // that is delivered through the deliver function and successfully validated by the
  88. // validator callback. It returns when a valid answer is delivered or the context is
  89. // cancelled.
  90. func (rm *retrieveManager) retrieve(ctx context.Context, reqID uint64, req *distReq, val validatorFunc, shutdown chan struct{}) error {
  91. sentReq := rm.sendReq(reqID, req, val)
  92. select {
  93. case <-sentReq.stopCh:
  94. case <-ctx.Done():
  95. sentReq.stop(ctx.Err())
  96. case <-shutdown:
  97. sentReq.stop(fmt.Errorf("client is shutting down"))
  98. }
  99. return sentReq.getError()
  100. }
  101. // sendReq starts a process that keeps trying to retrieve a valid answer for a
  102. // request from any suitable peers until stopped or succeeded.
  103. func (rm *retrieveManager) sendReq(reqID uint64, req *distReq, val validatorFunc) *sentReq {
  104. r := &sentReq{
  105. rm: rm,
  106. req: req,
  107. id: reqID,
  108. sentTo: make(map[distPeer]sentReqToPeer),
  109. stopCh: make(chan struct{}),
  110. eventsCh: make(chan reqPeerEvent, 10),
  111. validate: val,
  112. }
  113. canSend := req.canSend
  114. req.canSend = func(p distPeer) bool {
  115. // add an extra check to canSend: the request has not been sent to the same peer before
  116. r.lock.RLock()
  117. _, sent := r.sentTo[p]
  118. r.lock.RUnlock()
  119. return !sent && canSend(p)
  120. }
  121. request := req.request
  122. req.request = func(p distPeer) func() {
  123. // before actually sending the request, put an entry into the sentTo map
  124. r.lock.Lock()
  125. r.sentTo[p] = sentReqToPeer{delivered: false, frozen: false, event: make(chan int, 1)}
  126. r.lock.Unlock()
  127. return request(p)
  128. }
  129. rm.lock.Lock()
  130. rm.sentReqs[reqID] = r
  131. rm.lock.Unlock()
  132. go r.retrieveLoop()
  133. return r
  134. }
  135. // requested reports whether the request with given reqid is sent by the retriever.
  136. func (rm *retrieveManager) requested(reqId uint64) bool {
  137. rm.lock.RLock()
  138. defer rm.lock.RUnlock()
  139. _, ok := rm.sentReqs[reqId]
  140. return ok
  141. }
  142. // deliver is called by the LES protocol manager to deliver reply messages to waiting requests
  143. func (rm *retrieveManager) deliver(peer distPeer, msg *Msg) error {
  144. rm.lock.RLock()
  145. req, ok := rm.sentReqs[msg.ReqID]
  146. rm.lock.RUnlock()
  147. if ok {
  148. return req.deliver(peer, msg)
  149. }
  150. return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID)
  151. }
  152. // frozen is called by the LES protocol manager when a server has suspended its service and we
  153. // should not expect an answer for the requests already sent there
  154. func (rm *retrieveManager) frozen(peer distPeer) {
  155. rm.lock.RLock()
  156. defer rm.lock.RUnlock()
  157. for _, req := range rm.sentReqs {
  158. req.frozen(peer)
  159. }
  160. }
  161. // reqStateFn represents a state of the retrieve loop state machine
  162. type reqStateFn func() reqStateFn
  163. // retrieveLoop is the retrieval state machine event loop
  164. func (r *sentReq) retrieveLoop() {
  165. go r.tryRequest()
  166. r.lastReqQueued = true
  167. state := r.stateRequesting
  168. for state != nil {
  169. state = state()
  170. }
  171. r.rm.lock.Lock()
  172. delete(r.rm.sentReqs, r.id)
  173. r.rm.lock.Unlock()
  174. }
  175. // stateRequesting: a request has been queued or sent recently; when it reaches soft timeout,
  176. // a new request is sent to a new peer
  177. func (r *sentReq) stateRequesting() reqStateFn {
  178. select {
  179. case ev := <-r.eventsCh:
  180. r.update(ev)
  181. switch ev.event {
  182. case rpSent:
  183. if ev.peer == nil {
  184. // request send failed, no more suitable peers
  185. if r.waiting() {
  186. // we are already waiting for sent requests which may succeed so keep waiting
  187. return r.stateNoMorePeers
  188. }
  189. // nothing to wait for, no more peers to ask, return with error
  190. r.stop(light.ErrNoPeers)
  191. // no need to go to stopped state because waiting() already returned false
  192. return nil
  193. }
  194. case rpSoftTimeout:
  195. // last request timed out, try asking a new peer
  196. go r.tryRequest()
  197. r.lastReqQueued = true
  198. return r.stateRequesting
  199. case rpDeliveredInvalid, rpNotDelivered:
  200. // if it was the last sent request (set to nil by update) then start a new one
  201. if !r.lastReqQueued && r.lastReqSentTo == nil {
  202. go r.tryRequest()
  203. r.lastReqQueued = true
  204. }
  205. return r.stateRequesting
  206. case rpDeliveredValid:
  207. r.stop(nil)
  208. return r.stateStopped
  209. }
  210. return r.stateRequesting
  211. case <-r.stopCh:
  212. return r.stateStopped
  213. }
  214. }
  215. // stateNoMorePeers: could not send more requests because no suitable peers are available.
  216. // Peers may become suitable for a certain request later or new peers may appear so we
  217. // keep trying.
  218. func (r *sentReq) stateNoMorePeers() reqStateFn {
  219. select {
  220. case <-time.After(retryQueue):
  221. go r.tryRequest()
  222. r.lastReqQueued = true
  223. return r.stateRequesting
  224. case ev := <-r.eventsCh:
  225. r.update(ev)
  226. if ev.event == rpDeliveredValid {
  227. r.stop(nil)
  228. return r.stateStopped
  229. }
  230. if r.waiting() {
  231. return r.stateNoMorePeers
  232. }
  233. r.stop(light.ErrNoPeers)
  234. return nil
  235. case <-r.stopCh:
  236. return r.stateStopped
  237. }
  238. }
  239. // stateStopped: request succeeded or cancelled, just waiting for some peers
  240. // to either answer or time out hard
  241. func (r *sentReq) stateStopped() reqStateFn {
  242. for r.waiting() {
  243. r.update(<-r.eventsCh)
  244. }
  245. return nil
  246. }
  247. // update updates the queued/sent flags and timed out peers counter according to the event
  248. func (r *sentReq) update(ev reqPeerEvent) {
  249. switch ev.event {
  250. case rpSent:
  251. r.lastReqQueued = false
  252. r.lastReqSentTo = ev.peer
  253. case rpSoftTimeout:
  254. r.lastReqSentTo = nil
  255. r.reqSrtoCount++
  256. case rpHardTimeout:
  257. r.reqSrtoCount--
  258. case rpDeliveredValid, rpDeliveredInvalid, rpNotDelivered:
  259. if ev.peer == r.lastReqSentTo {
  260. r.lastReqSentTo = nil
  261. } else {
  262. r.reqSrtoCount--
  263. }
  264. }
  265. }
  266. // waiting returns true if the retrieval mechanism is waiting for an answer from
  267. // any peer
  268. func (r *sentReq) waiting() bool {
  269. return r.lastReqQueued || r.lastReqSentTo != nil || r.reqSrtoCount > 0
  270. }
  271. // tryRequest tries to send the request to a new peer and waits for it to either
  272. // succeed or time out if it has been sent. It also sends the appropriate reqPeerEvent
  273. // messages to the request's event channel.
  274. func (r *sentReq) tryRequest() {
  275. sent := r.rm.dist.queue(r.req)
  276. var p distPeer
  277. select {
  278. case p = <-sent:
  279. case <-r.stopCh:
  280. if r.rm.dist.cancel(r.req) {
  281. p = nil
  282. } else {
  283. p = <-sent
  284. }
  285. }
  286. r.eventsCh <- reqPeerEvent{rpSent, p}
  287. if p == nil {
  288. return
  289. }
  290. hrto := false
  291. r.lock.RLock()
  292. s, ok := r.sentTo[p]
  293. r.lock.RUnlock()
  294. if !ok {
  295. panic(nil)
  296. }
  297. defer func() {
  298. pp, ok := p.(*serverPeer)
  299. if hrto && ok {
  300. pp.Log().Debug("Request timed out hard")
  301. if r.rm.peers != nil {
  302. r.rm.peers.unregister(pp.id)
  303. }
  304. }
  305. }()
  306. select {
  307. case event := <-s.event:
  308. if event == rpNotDelivered {
  309. r.lock.Lock()
  310. delete(r.sentTo, p)
  311. r.lock.Unlock()
  312. }
  313. r.eventsCh <- reqPeerEvent{event, p}
  314. return
  315. case <-time.After(r.rm.softRequestTimeout()):
  316. r.eventsCh <- reqPeerEvent{rpSoftTimeout, p}
  317. }
  318. select {
  319. case event := <-s.event:
  320. if event == rpNotDelivered {
  321. r.lock.Lock()
  322. delete(r.sentTo, p)
  323. r.lock.Unlock()
  324. }
  325. r.eventsCh <- reqPeerEvent{event, p}
  326. case <-time.After(hardRequestTimeout):
  327. hrto = true
  328. r.eventsCh <- reqPeerEvent{rpHardTimeout, p}
  329. }
  330. }
  331. // deliver a reply belonging to this request
  332. func (r *sentReq) deliver(peer distPeer, msg *Msg) error {
  333. r.lock.Lock()
  334. defer r.lock.Unlock()
  335. s, ok := r.sentTo[peer]
  336. if !ok || s.delivered {
  337. return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID)
  338. }
  339. if s.frozen {
  340. return nil
  341. }
  342. valid := r.validate(peer, msg) == nil
  343. r.sentTo[peer] = sentReqToPeer{delivered: true, frozen: false, event: s.event}
  344. if valid {
  345. s.event <- rpDeliveredValid
  346. } else {
  347. s.event <- rpDeliveredInvalid
  348. }
  349. if !valid {
  350. return errResp(ErrInvalidResponse, "reqID = %v", msg.ReqID)
  351. }
  352. return nil
  353. }
  354. // frozen sends a "not delivered" event to the peer event channel belonging to the
  355. // given peer if the request has been sent there, causing the state machine to not
  356. // expect an answer and potentially even send the request to the same peer again
  357. // when canSend allows it.
  358. func (r *sentReq) frozen(peer distPeer) {
  359. r.lock.Lock()
  360. defer r.lock.Unlock()
  361. s, ok := r.sentTo[peer]
  362. if ok && !s.delivered && !s.frozen {
  363. r.sentTo[peer] = sentReqToPeer{delivered: false, frozen: true, event: s.event}
  364. s.event <- rpNotDelivered
  365. }
  366. }
  367. // stop stops the retrieval process and sets an error code that will be returned
  368. // by getError
  369. func (r *sentReq) stop(err error) {
  370. r.lock.Lock()
  371. if !r.stopped {
  372. r.stopped = true
  373. r.err = err
  374. close(r.stopCh)
  375. }
  376. r.lock.Unlock()
  377. }
  378. // getError returns any retrieval error (either internally generated or set by the
  379. // stop function) after stopCh has been closed
  380. func (r *sentReq) getError() error {
  381. return r.err
  382. }