retrieve.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package les contains the LES protocol code of the Ethereum Light Client; this
// file implements on-demand request retrieval (reply matching, timeouts, resends).
  18. package les
  19. import (
  20. "context"
  21. "crypto/rand"
  22. "encoding/binary"
  23. "fmt"
  24. "sync"
  25. "time"
  26. "github.com/ethereum/go-ethereum/common/mclock"
  27. "github.com/ethereum/go-ethereum/light"
  28. )
  // Timing parameters of the retrieval state machine.
  var (
  	// retryQueue is how long retrieval waits before queueing a new request
  	// after no suitable peer could be found.
  	retryQueue = 100 * time.Millisecond
  	// softRequestTimeout is the delay after which another peer is asked, while
  	// a late reply from the first peer is still accepted.
  	softRequestTimeout = 500 * time.Millisecond
  	// hardRequestTimeout is the additional delay after the soft timeout after
  	// which an unanswered peer is given up on (and unregistered if a peer set
  	// is configured).
  	hardRequestTimeout = 10 * time.Second
  )
  34. // retrieveManager is a layer on top of requestDistributor which takes care of
  35. // matching replies by request ID and handles timeouts and resends if necessary.
  36. type retrieveManager struct {
  37. dist *requestDistributor
  38. peers *peerSet
  39. serverPool peerSelector
  40. lock sync.RWMutex
  41. sentReqs map[uint64]*sentReq
  42. }
  43. // validatorFunc is a function that processes a reply message
  44. type validatorFunc func(distPeer, *Msg) error
  45. // peerSelector receives feedback info about response times and timeouts
  46. type peerSelector interface {
  47. adjustResponseTime(*poolEntry, time.Duration, bool)
  48. }
  49. // sentReq represents a request sent and tracked by retrieveManager
  50. type sentReq struct {
  51. rm *retrieveManager
  52. req *distReq
  53. id uint64
  54. validate validatorFunc
  55. eventsCh chan reqPeerEvent
  56. stopCh chan struct{}
  57. stopped bool
  58. err error
  59. lock sync.RWMutex // protect access to sentTo map
  60. sentTo map[distPeer]sentReqToPeer
  61. lastReqQueued bool // last request has been queued but not sent
  62. lastReqSentTo distPeer // if not nil then last request has been sent to given peer but not timed out
  63. reqSrtoCount int // number of requests that reached soft (but not hard) timeout
  64. }
  // sentReqToPeer is the per-peer delivery state of a sentReq. deliver accepts at
  // most one reply per peer: it sets delivered and passes the validation result on
  // valid (capacity 1), where the tryRequest goroutine for that peer waits for it.
  type sentReqToPeer struct {
  	delivered bool
  	valid     chan bool
  }
  73. // reqPeerEvent is sent by the request-from-peer goroutine (tryRequest) to the
  74. // request state machine (retrieveLoop) through the eventsCh channel.
  75. type reqPeerEvent struct {
  76. event int
  77. peer distPeer
  78. }
  // Event kinds carried by reqPeerEvent.
  const (
  	rpSent             = iota // request sent; peer == nil means not sent (no suitable peers)
  	rpSoftTimeout             // no reply within softRequestTimeout
  	rpHardTimeout             // still no reply hardRequestTimeout after the soft timeout
  	rpDeliveredValid          // a reply arrived and passed validation
  	rpDeliveredInvalid        // a reply arrived and failed validation
  )
  86. // newRetrieveManager creates the retrieve manager
  87. func newRetrieveManager(peers *peerSet, dist *requestDistributor, serverPool peerSelector) *retrieveManager {
  88. return &retrieveManager{
  89. peers: peers,
  90. dist: dist,
  91. serverPool: serverPool,
  92. sentReqs: make(map[uint64]*sentReq),
  93. }
  94. }
  95. // retrieve sends a request (to multiple peers if necessary) and waits for an answer
  96. // that is delivered through the deliver function and successfully validated by the
  97. // validator callback. It returns when a valid answer is delivered or the context is
  98. // cancelled.
  99. func (rm *retrieveManager) retrieve(ctx context.Context, reqID uint64, req *distReq, val validatorFunc, shutdown chan struct{}) error {
  100. sentReq := rm.sendReq(reqID, req, val)
  101. select {
  102. case <-sentReq.stopCh:
  103. case <-ctx.Done():
  104. sentReq.stop(ctx.Err())
  105. case <-shutdown:
  106. sentReq.stop(fmt.Errorf("Client is shutting down"))
  107. }
  108. return sentReq.getError()
  109. }
  110. // sendReq starts a process that keeps trying to retrieve a valid answer for a
  111. // request from any suitable peers until stopped or succeeded.
  112. func (rm *retrieveManager) sendReq(reqID uint64, req *distReq, val validatorFunc) *sentReq {
  113. r := &sentReq{
  114. rm: rm,
  115. req: req,
  116. id: reqID,
  117. sentTo: make(map[distPeer]sentReqToPeer),
  118. stopCh: make(chan struct{}),
  119. eventsCh: make(chan reqPeerEvent, 10),
  120. validate: val,
  121. }
  122. canSend := req.canSend
  123. req.canSend = func(p distPeer) bool {
  124. // add an extra check to canSend: the request has not been sent to the same peer before
  125. r.lock.RLock()
  126. _, sent := r.sentTo[p]
  127. r.lock.RUnlock()
  128. return !sent && canSend(p)
  129. }
  130. request := req.request
  131. req.request = func(p distPeer) func() {
  132. // before actually sending the request, put an entry into the sentTo map
  133. r.lock.Lock()
  134. r.sentTo[p] = sentReqToPeer{false, make(chan bool, 1)}
  135. r.lock.Unlock()
  136. return request(p)
  137. }
  138. rm.lock.Lock()
  139. rm.sentReqs[reqID] = r
  140. rm.lock.Unlock()
  141. go r.retrieveLoop()
  142. return r
  143. }
  144. // deliver is called by the LES protocol manager to deliver reply messages to waiting requests
  145. func (rm *retrieveManager) deliver(peer distPeer, msg *Msg) error {
  146. rm.lock.RLock()
  147. req, ok := rm.sentReqs[msg.ReqID]
  148. rm.lock.RUnlock()
  149. if ok {
  150. return req.deliver(peer, msg)
  151. }
  152. return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID)
  153. }
  // reqStateFn is one state of the retrieveLoop state machine. It runs until the
  // state changes and returns the next state, or nil to end the loop.
  type reqStateFn func() reqStateFn
  156. // retrieveLoop is the retrieval state machine event loop
  157. func (r *sentReq) retrieveLoop() {
  158. go r.tryRequest()
  159. r.lastReqQueued = true
  160. state := r.stateRequesting
  161. for state != nil {
  162. state = state()
  163. }
  164. r.rm.lock.Lock()
  165. delete(r.rm.sentReqs, r.id)
  166. r.rm.lock.Unlock()
  167. }
  168. // stateRequesting: a request has been queued or sent recently; when it reaches soft timeout,
  169. // a new request is sent to a new peer
  170. func (r *sentReq) stateRequesting() reqStateFn {
  171. select {
  172. case ev := <-r.eventsCh:
  173. r.update(ev)
  174. switch ev.event {
  175. case rpSent:
  176. if ev.peer == nil {
  177. // request send failed, no more suitable peers
  178. if r.waiting() {
  179. // we are already waiting for sent requests which may succeed so keep waiting
  180. return r.stateNoMorePeers
  181. }
  182. // nothing to wait for, no more peers to ask, return with error
  183. r.stop(light.ErrNoPeers)
  184. // no need to go to stopped state because waiting() already returned false
  185. return nil
  186. }
  187. case rpSoftTimeout, rpDeliveredInvalid:
  188. // last request timed out, try asking a new peer
  189. go r.tryRequest()
  190. r.lastReqQueued = true
  191. return r.stateRequesting
  192. case rpDeliveredValid:
  193. r.stop(nil)
  194. return r.stateStopped
  195. }
  196. return r.stateRequesting
  197. case <-r.stopCh:
  198. return r.stateStopped
  199. }
  200. }
  201. // stateNoMorePeers: could not send more requests because no suitable peers are available.
  202. // Peers may become suitable for a certain request later or new peers may appear so we
  203. // keep trying.
  204. func (r *sentReq) stateNoMorePeers() reqStateFn {
  205. select {
  206. case <-time.After(retryQueue):
  207. go r.tryRequest()
  208. r.lastReqQueued = true
  209. return r.stateRequesting
  210. case ev := <-r.eventsCh:
  211. r.update(ev)
  212. if ev.event == rpDeliveredValid {
  213. r.stop(nil)
  214. return r.stateStopped
  215. }
  216. if r.waiting() {
  217. return r.stateNoMorePeers
  218. }
  219. r.stop(light.ErrNoPeers)
  220. return nil
  221. case <-r.stopCh:
  222. return r.stateStopped
  223. }
  224. }
  225. // stateStopped: request succeeded or cancelled, just waiting for some peers
  226. // to either answer or time out hard
  227. func (r *sentReq) stateStopped() reqStateFn {
  228. for r.waiting() {
  229. r.update(<-r.eventsCh)
  230. }
  231. return nil
  232. }
  233. // update updates the queued/sent flags and timed out peers counter according to the event
  234. func (r *sentReq) update(ev reqPeerEvent) {
  235. switch ev.event {
  236. case rpSent:
  237. r.lastReqQueued = false
  238. r.lastReqSentTo = ev.peer
  239. case rpSoftTimeout:
  240. r.lastReqSentTo = nil
  241. r.reqSrtoCount++
  242. case rpHardTimeout:
  243. r.reqSrtoCount--
  244. case rpDeliveredValid, rpDeliveredInvalid:
  245. if ev.peer == r.lastReqSentTo {
  246. r.lastReqSentTo = nil
  247. } else {
  248. r.reqSrtoCount--
  249. }
  250. }
  251. }
  252. // waiting returns true if the retrieval mechanism is waiting for an answer from
  253. // any peer
  254. func (r *sentReq) waiting() bool {
  255. return r.lastReqQueued || r.lastReqSentTo != nil || r.reqSrtoCount > 0
  256. }
  257. // tryRequest tries to send the request to a new peer and waits for it to either
  258. // succeed or time out if it has been sent. It also sends the appropriate reqPeerEvent
  259. // messages to the request's event channel.
  260. func (r *sentReq) tryRequest() {
  261. sent := r.rm.dist.queue(r.req)
  262. var p distPeer
  263. select {
  264. case p = <-sent:
  265. case <-r.stopCh:
  266. if r.rm.dist.cancel(r.req) {
  267. p = nil
  268. } else {
  269. p = <-sent
  270. }
  271. }
  272. r.eventsCh <- reqPeerEvent{rpSent, p}
  273. if p == nil {
  274. return
  275. }
  276. reqSent := mclock.Now()
  277. srto, hrto := false, false
  278. r.lock.RLock()
  279. s, ok := r.sentTo[p]
  280. r.lock.RUnlock()
  281. if !ok {
  282. panic(nil)
  283. }
  284. defer func() {
  285. // send feedback to server pool and remove peer if hard timeout happened
  286. pp, ok := p.(*peer)
  287. if ok && r.rm.serverPool != nil {
  288. respTime := time.Duration(mclock.Now() - reqSent)
  289. r.rm.serverPool.adjustResponseTime(pp.poolEntry, respTime, srto)
  290. }
  291. if hrto {
  292. pp.Log().Debug("Request timed out hard")
  293. if r.rm.peers != nil {
  294. r.rm.peers.Unregister(pp.id)
  295. }
  296. }
  297. r.lock.Lock()
  298. delete(r.sentTo, p)
  299. r.lock.Unlock()
  300. }()
  301. select {
  302. case ok := <-s.valid:
  303. if ok {
  304. r.eventsCh <- reqPeerEvent{rpDeliveredValid, p}
  305. } else {
  306. r.eventsCh <- reqPeerEvent{rpDeliveredInvalid, p}
  307. }
  308. return
  309. case <-time.After(softRequestTimeout):
  310. srto = true
  311. r.eventsCh <- reqPeerEvent{rpSoftTimeout, p}
  312. }
  313. select {
  314. case ok := <-s.valid:
  315. if ok {
  316. r.eventsCh <- reqPeerEvent{rpDeliveredValid, p}
  317. } else {
  318. r.eventsCh <- reqPeerEvent{rpDeliveredInvalid, p}
  319. }
  320. case <-time.After(hardRequestTimeout):
  321. hrto = true
  322. r.eventsCh <- reqPeerEvent{rpHardTimeout, p}
  323. }
  324. }
  325. // deliver a reply belonging to this request
  326. func (r *sentReq) deliver(peer distPeer, msg *Msg) error {
  327. r.lock.Lock()
  328. defer r.lock.Unlock()
  329. s, ok := r.sentTo[peer]
  330. if !ok || s.delivered {
  331. return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID)
  332. }
  333. valid := r.validate(peer, msg) == nil
  334. r.sentTo[peer] = sentReqToPeer{true, s.valid}
  335. s.valid <- valid
  336. if !valid {
  337. return errResp(ErrInvalidResponse, "reqID = %v", msg.ReqID)
  338. }
  339. return nil
  340. }
  341. // stop stops the retrieval process and sets an error code that will be returned
  342. // by getError
  343. func (r *sentReq) stop(err error) {
  344. r.lock.Lock()
  345. if !r.stopped {
  346. r.stopped = true
  347. r.err = err
  348. close(r.stopCh)
  349. }
  350. r.lock.Unlock()
  351. }
  352. // getError returns any retrieval error (either internally generated or set by the
  353. // stop function) after stopCh has been closed
  354. func (r *sentReq) getError() error {
  355. return r.err
  356. }
  // genReqID returns a random 64-bit request ID read from crypto/rand.
  func genReqID() uint64 {
  	var buf [8]byte
  	// The error is deliberately ignored, as before. On failure buf may stay
  	// (partly) zeroed. NOTE(review): consider handling it explicitly.
  	_, _ = rand.Read(buf[:])
  	return binary.BigEndian.Uint64(buf[:])
  }