retrieve.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "context"
  19. "crypto/rand"
  20. "encoding/binary"
  21. "fmt"
  22. "sync"
  23. "time"
  24. "github.com/ethereum/go-ethereum/light"
  25. )
var (
	// retryQueue is the delay stateNoMorePeers waits before queueing the
	// request again after no suitable peer could be found.
	retryQueue = time.Millisecond * 100
	// hardRequestTimeout is how long tryRequest keeps waiting for a reply
	// after the soft timeout has already expired. A *serverPeer that reaches
	// it is unregistered from the peer set when one is configured.
	hardRequestTimeout = time.Second * 10
)
// retrieveManager is a layer on top of requestDistributor which takes care of
// matching replies by request ID and handles timeouts and resends if necessary.
type retrieveManager struct {
	// dist is the distributor that tryRequest queues requests on.
	dist *requestDistributor
	// peers may be nil; when set, peers that time out hard are unregistered from it.
	peers *serverPeerSet
	// softRequestTimeout returns the soft timeout used by tryRequest for each attempt.
	softRequestTimeout func() time.Duration

	// lock protects sentReqs.
	lock sync.RWMutex
	// sentReqs holds the requests currently being retrieved, keyed by request ID.
	sentReqs map[uint64]*sentReq
}
// validatorFunc is a function that processes a reply message. sentReq.deliver
// treats a nil result as a valid reply and any non-nil error as an invalid one.
type validatorFunc func(distPeer, *Msg) error
// sentReq represents a request sent and tracked by retrieveManager
type sentReq struct {
	// rm is the manager that created this request and keeps it in sentReqs.
	rm *retrieveManager
	// req is the distributor request; sendReq wraps its canSend and request callbacks.
	req *distReq
	// id is the key under which the request is stored in rm.sentReqs.
	id uint64
	// validate is applied to replies in deliver; a nil result means the reply is valid.
	validate validatorFunc

	// eventsCh carries events from the tryRequest goroutines to the retrieve loop.
	eventsCh chan reqPeerEvent
	// stopCh is closed by the first call to stop.
	stopCh chan struct{}
	// stopped records that stop has already run, so stopCh is closed only once.
	stopped bool
	// err is the error passed to the first stop call and is returned by getError.
	err error

	lock   sync.RWMutex // protects the sentTo map; stop also holds it while setting stopped and err
	sentTo map[distPeer]sentReqToPeer

	// The fields below are only touched by the retrieve loop goroutine
	// (retrieveLoop, the state functions, update and waiting).
	lastReqQueued bool     // last request has been queued but not sent
	lastReqSentTo distPeer // if not nil then last request has been sent to given peer but not timed out
	reqSrtoCount  int      // number of requests that reached soft (but not hard) timeout
}
// sentReqToPeer notifies the request-from-peer goroutine (tryRequest) about a response
// delivered by the given peer. Only one delivery is allowed per request per peer,
// after which delivered is set to true, the outcome (rpDeliveredValid or
// rpDeliveredInvalid) is sent on the event channel and no more responses are accepted.
// When the peer is reported frozen before delivering, frozen is set to true and
// rpNotDelivered is sent on the event channel instead.
type sentReqToPeer struct {
	delivered, frozen bool
	// event is created with a buffer of one by sendReq, so the single
	// notification per entry does not block the sender.
	event chan int
}
// reqPeerEvent is sent by the request-from-peer goroutine (tryRequest) to the
// request state machine (retrieveLoop) through the eventsCh channel.
//
// Values are built with unkeyed literals (reqPeerEvent{event, peer}), so the
// field order must not change.
type reqPeerEvent struct {
	// event is one of the rp* constants.
	event int
	// peer is the peer the event refers to; nil for rpSent means nothing was sent.
	peer distPeer
}
// Event codes carried in reqPeerEvent.event and on sentReqToPeer.event.
const (
	rpSent             = iota // if peer == nil, not sent (no suitable peers)
	rpSoftTimeout             // no reply arrived within softRequestTimeout
	rpHardTimeout             // no reply arrived within hardRequestTimeout after the soft timeout
	rpDeliveredValid          // a reply was delivered and passed validation
	rpDeliveredInvalid        // a reply was delivered but failed validation
	rpNotDelivered            // the peer was reported frozen before delivering a reply
)
  79. // newRetrieveManager creates the retrieve manager
  80. func newRetrieveManager(peers *serverPeerSet, dist *requestDistributor, srto func() time.Duration) *retrieveManager {
  81. return &retrieveManager{
  82. peers: peers,
  83. dist: dist,
  84. sentReqs: make(map[uint64]*sentReq),
  85. softRequestTimeout: srto,
  86. }
  87. }
  88. // retrieve sends a request (to multiple peers if necessary) and waits for an answer
  89. // that is delivered through the deliver function and successfully validated by the
  90. // validator callback. It returns when a valid answer is delivered or the context is
  91. // cancelled.
  92. func (rm *retrieveManager) retrieve(ctx context.Context, reqID uint64, req *distReq, val validatorFunc, shutdown chan struct{}) error {
  93. sentReq := rm.sendReq(reqID, req, val)
  94. select {
  95. case <-sentReq.stopCh:
  96. case <-ctx.Done():
  97. sentReq.stop(ctx.Err())
  98. case <-shutdown:
  99. sentReq.stop(fmt.Errorf("client is shutting down"))
  100. }
  101. return sentReq.getError()
  102. }
  103. // sendReq starts a process that keeps trying to retrieve a valid answer for a
  104. // request from any suitable peers until stopped or succeeded.
  105. func (rm *retrieveManager) sendReq(reqID uint64, req *distReq, val validatorFunc) *sentReq {
  106. r := &sentReq{
  107. rm: rm,
  108. req: req,
  109. id: reqID,
  110. sentTo: make(map[distPeer]sentReqToPeer),
  111. stopCh: make(chan struct{}),
  112. eventsCh: make(chan reqPeerEvent, 10),
  113. validate: val,
  114. }
  115. canSend := req.canSend
  116. req.canSend = func(p distPeer) bool {
  117. // add an extra check to canSend: the request has not been sent to the same peer before
  118. r.lock.RLock()
  119. _, sent := r.sentTo[p]
  120. r.lock.RUnlock()
  121. return !sent && canSend(p)
  122. }
  123. request := req.request
  124. req.request = func(p distPeer) func() {
  125. // before actually sending the request, put an entry into the sentTo map
  126. r.lock.Lock()
  127. r.sentTo[p] = sentReqToPeer{delivered: false, frozen: false, event: make(chan int, 1)}
  128. r.lock.Unlock()
  129. return request(p)
  130. }
  131. rm.lock.Lock()
  132. rm.sentReqs[reqID] = r
  133. rm.lock.Unlock()
  134. go r.retrieveLoop()
  135. return r
  136. }
  137. // deliver is called by the LES protocol manager to deliver reply messages to waiting requests
  138. func (rm *retrieveManager) deliver(peer distPeer, msg *Msg) error {
  139. rm.lock.RLock()
  140. req, ok := rm.sentReqs[msg.ReqID]
  141. rm.lock.RUnlock()
  142. if ok {
  143. return req.deliver(peer, msg)
  144. }
  145. return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID)
  146. }
  147. // frozen is called by the LES protocol manager when a server has suspended its service and we
  148. // should not expect an answer for the requests already sent there
  149. func (rm *retrieveManager) frozen(peer distPeer) {
  150. rm.lock.RLock()
  151. defer rm.lock.RUnlock()
  152. for _, req := range rm.sentReqs {
  153. req.frozen(peer)
  154. }
  155. }
// reqStateFn represents a state of the retrieve loop state machine. Calling it
// runs the state and yields the next one; retrieveLoop stops once nil is returned.
type reqStateFn func() reqStateFn
  158. // retrieveLoop is the retrieval state machine event loop
  159. func (r *sentReq) retrieveLoop() {
  160. go r.tryRequest()
  161. r.lastReqQueued = true
  162. state := r.stateRequesting
  163. for state != nil {
  164. state = state()
  165. }
  166. r.rm.lock.Lock()
  167. delete(r.rm.sentReqs, r.id)
  168. r.rm.lock.Unlock()
  169. }
// stateRequesting: a request has been queued or sent recently; when it reaches soft timeout,
// a new request is sent to a new peer
//
// It blocks until an event arrives on eventsCh or stopCh is closed and returns
// the next state, or nil when the retrieve loop should end.
func (r *sentReq) stateRequesting() reqStateFn {
	select {
	case ev := <-r.eventsCh:
		// bring the queued/sent bookkeeping up to date before choosing the next state
		r.update(ev)
		switch ev.event {
		case rpSent:
			if ev.peer == nil {
				// request send failed, no more suitable peers
				if r.waiting() {
					// we are already waiting for sent requests which may succeed so keep waiting
					return r.stateNoMorePeers
				}
				// nothing to wait for, no more peers to ask, return with error
				r.stop(light.ErrNoPeers)
				// no need to go to stopped state because waiting() already returned false
				return nil
			}
		case rpSoftTimeout:
			// last request timed out, try asking a new peer
			go r.tryRequest()
			r.lastReqQueued = true
			return r.stateRequesting
		case rpDeliveredInvalid, rpNotDelivered:
			// if it was the last sent request (set to nil by update) then start a new one
			if !r.lastReqQueued && r.lastReqSentTo == nil {
				go r.tryRequest()
				r.lastReqQueued = true
			}
			return r.stateRequesting
		case rpDeliveredValid:
			// success: record a nil error, then wait for the remaining peers in stateStopped
			r.stop(nil)
			return r.stateStopped
		}
		// rpSent with a non-nil peer and rpHardTimeout end up here: keep requesting
		return r.stateRequesting
	case <-r.stopCh:
		return r.stateStopped
	}
}
// stateNoMorePeers: could not send more requests because no suitable peers are available.
// Peers may become suitable for a certain request later or new peers may appear so we
// keep trying.
func (r *sentReq) stateNoMorePeers() reqStateFn {
	select {
	case <-time.After(retryQueue):
		// retry interval elapsed: queue the request again and go back to requesting
		go r.tryRequest()
		r.lastReqQueued = true
		return r.stateRequesting
	case ev := <-r.eventsCh:
		r.update(ev)
		if ev.event == rpDeliveredValid {
			r.stop(nil)
			return r.stateStopped
		}
		// any other event: stay here while some earlier attempt may still be answered
		if r.waiting() {
			return r.stateNoMorePeers
		}
		// nothing outstanding and nobody to ask: give up with ErrNoPeers
		r.stop(light.ErrNoPeers)
		return nil
	case <-r.stopCh:
		return r.stateStopped
	}
}
  234. // stateStopped: request succeeded or cancelled, just waiting for some peers
  235. // to either answer or time out hard
  236. func (r *sentReq) stateStopped() reqStateFn {
  237. for r.waiting() {
  238. r.update(<-r.eventsCh)
  239. }
  240. return nil
  241. }
  242. // update updates the queued/sent flags and timed out peers counter according to the event
  243. func (r *sentReq) update(ev reqPeerEvent) {
  244. switch ev.event {
  245. case rpSent:
  246. r.lastReqQueued = false
  247. r.lastReqSentTo = ev.peer
  248. case rpSoftTimeout:
  249. r.lastReqSentTo = nil
  250. r.reqSrtoCount++
  251. case rpHardTimeout:
  252. r.reqSrtoCount--
  253. case rpDeliveredValid, rpDeliveredInvalid, rpNotDelivered:
  254. if ev.peer == r.lastReqSentTo {
  255. r.lastReqSentTo = nil
  256. } else {
  257. r.reqSrtoCount--
  258. }
  259. }
  260. }
  261. // waiting returns true if the retrieval mechanism is waiting for an answer from
  262. // any peer
  263. func (r *sentReq) waiting() bool {
  264. return r.lastReqQueued || r.lastReqSentTo != nil || r.reqSrtoCount > 0
  265. }
  266. // tryRequest tries to send the request to a new peer and waits for it to either
  267. // succeed or time out if it has been sent. It also sends the appropriate reqPeerEvent
  268. // messages to the request's event channel.
  269. func (r *sentReq) tryRequest() {
  270. sent := r.rm.dist.queue(r.req)
  271. var p distPeer
  272. select {
  273. case p = <-sent:
  274. case <-r.stopCh:
  275. if r.rm.dist.cancel(r.req) {
  276. p = nil
  277. } else {
  278. p = <-sent
  279. }
  280. }
  281. r.eventsCh <- reqPeerEvent{rpSent, p}
  282. if p == nil {
  283. return
  284. }
  285. hrto := false
  286. r.lock.RLock()
  287. s, ok := r.sentTo[p]
  288. r.lock.RUnlock()
  289. if !ok {
  290. panic(nil)
  291. }
  292. defer func() {
  293. // send feedback to server pool and remove peer if hard timeout happened
  294. pp, ok := p.(*serverPeer)
  295. if hrto && ok {
  296. pp.Log().Debug("Request timed out hard")
  297. if r.rm.peers != nil {
  298. r.rm.peers.unregister(pp.id)
  299. }
  300. }
  301. r.lock.Lock()
  302. delete(r.sentTo, p)
  303. r.lock.Unlock()
  304. }()
  305. select {
  306. case event := <-s.event:
  307. if event == rpNotDelivered {
  308. r.lock.Lock()
  309. delete(r.sentTo, p)
  310. r.lock.Unlock()
  311. }
  312. r.eventsCh <- reqPeerEvent{event, p}
  313. return
  314. case <-time.After(r.rm.softRequestTimeout()):
  315. r.eventsCh <- reqPeerEvent{rpSoftTimeout, p}
  316. }
  317. select {
  318. case event := <-s.event:
  319. if event == rpNotDelivered {
  320. r.lock.Lock()
  321. delete(r.sentTo, p)
  322. r.lock.Unlock()
  323. }
  324. r.eventsCh <- reqPeerEvent{event, p}
  325. case <-time.After(hardRequestTimeout):
  326. hrto = true
  327. r.eventsCh <- reqPeerEvent{rpHardTimeout, p}
  328. }
  329. }
  330. // deliver a reply belonging to this request
  331. func (r *sentReq) deliver(peer distPeer, msg *Msg) error {
  332. r.lock.Lock()
  333. defer r.lock.Unlock()
  334. s, ok := r.sentTo[peer]
  335. if !ok || s.delivered {
  336. return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID)
  337. }
  338. if s.frozen {
  339. return nil
  340. }
  341. valid := r.validate(peer, msg) == nil
  342. r.sentTo[peer] = sentReqToPeer{delivered: true, frozen: false, event: s.event}
  343. if valid {
  344. s.event <- rpDeliveredValid
  345. } else {
  346. s.event <- rpDeliveredInvalid
  347. }
  348. if !valid {
  349. return errResp(ErrInvalidResponse, "reqID = %v", msg.ReqID)
  350. }
  351. return nil
  352. }
  353. // frozen sends a "not delivered" event to the peer event channel belonging to the
  354. // given peer if the request has been sent there, causing the state machine to not
  355. // expect an answer and potentially even send the request to the same peer again
  356. // when canSend allows it.
  357. func (r *sentReq) frozen(peer distPeer) {
  358. r.lock.Lock()
  359. defer r.lock.Unlock()
  360. s, ok := r.sentTo[peer]
  361. if ok && !s.delivered && !s.frozen {
  362. r.sentTo[peer] = sentReqToPeer{delivered: false, frozen: true, event: s.event}
  363. s.event <- rpNotDelivered
  364. }
  365. }
  366. // stop stops the retrieval process and sets an error code that will be returned
  367. // by getError
  368. func (r *sentReq) stop(err error) {
  369. r.lock.Lock()
  370. if !r.stopped {
  371. r.stopped = true
  372. r.err = err
  373. close(r.stopCh)
  374. }
  375. r.lock.Unlock()
  376. }
// getError returns any retrieval error (either internally generated or set by the
// stop function) after stopCh has been closed
//
// The field is read without taking the lock, so callers must only call this
// once they have observed stopCh being closed or have called stop themselves.
func (r *sentReq) getError() error {
	return r.err
}
  382. // genReqID generates a new random request ID
  383. func genReqID() uint64 {
  384. var rnd [8]byte
  385. rand.Read(rnd[:])
  386. return binary.BigEndian.Uint64(rnd[:])
  387. }