fetcher.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "math/big"
  19. "math/rand"
  20. "sync"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common"
  23. "github.com/ethereum/go-ethereum/consensus"
  24. "github.com/ethereum/go-ethereum/core"
  25. "github.com/ethereum/go-ethereum/core/rawdb"
  26. "github.com/ethereum/go-ethereum/core/types"
  27. "github.com/ethereum/go-ethereum/ethdb"
  28. "github.com/ethereum/go-ethereum/les/fetcher"
  29. "github.com/ethereum/go-ethereum/light"
  30. "github.com/ethereum/go-ethereum/log"
  31. "github.com/ethereum/go-ethereum/p2p/enode"
  32. )
const (
	// blockDelayTimeout is the time budget granted to a peer for answering a
	// header request. It is the base value the request timer is armed with.
	blockDelayTimeout = 10 * time.Second // Timeout for retrieving the headers from the peer

	// gatherSlack is subtracted from blockDelayTimeout when in-flight requests
	// are checked for expiry, so that requests which are about to expire are
	// handled in the same pass instead of needing another timer round.
	gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired requests

	// cachedAnnosThreshold caps the number of announcements cached per peer;
	// the oldest entries are evicted once the cap is exceeded.
	cachedAnnosThreshold = 64 // The maximum queued announcements
)
// announce represents a new block announcement from the les server.
type announce struct {
	data   *announceData // Announced head; its Hash, Number, Td and ReorgDepth are read by the fetcher
	trust  bool          // Copied from the announcing peer's trusted flag
	peerid enode.ID      // ID of the peer which sent the announcement
}
// request represents a record when the header request is sent. Records are
// kept by the main loop, keyed by reqid, until a response arrives or the
// request times out.
type request struct {
	reqid  uint64      // Identifier of the request, matched against the response's reqid
	peerid enode.ID    // Peer the request was sent to; dropped on timeout or bad response
	sendAt time.Time   // Creation time of the record, used for timeout calculation
	hash   common.Hash // Hash of the requested header, checked against the delivered header
}
// response represents a response packet from network as well as a channel
// to return all un-requested data.
type response struct {
	reqid   uint64               // Identifier of the request this packet answers
	headers []*types.Header      // Headers carried by the packet
	peerid  enode.ID             // Peer which delivered the packet
	remain  chan []*types.Header // Reply channel on which the main loop hands back the headers it did not consume
}
// fetcherPeer holds the fetcher-specific information for each active peer
type fetcherPeer struct {
	latest *announceData // The latest announcement sent from the peer

	// The following two fields track the most recent announces from the
	// peer in a size-limited cache: announces indexes them by block hash
	// while fifo records the insertion order used for eviction. We hold the
	// assumption that all enqueued announces are td-monotonic.
	announces map[common.Hash]*announce // Announcement map
	fifo      []common.Hash             // FIFO announces list
}
  68. // addAnno enqueues an new trusted announcement. If the queued announces overflow,
  69. // evict from the oldest.
  70. func (fp *fetcherPeer) addAnno(anno *announce) {
  71. // Short circuit if the anno already exists. In normal case it should
  72. // never happen since only monotonic anno is accepted. But the adversary
  73. // may feed us fake announces with higher td but same hash. In this case,
  74. // ignore the anno anyway.
  75. hash := anno.data.Hash
  76. if _, exist := fp.announces[hash]; exist {
  77. return
  78. }
  79. fp.announces[hash] = anno
  80. fp.fifo = append(fp.fifo, hash)
  81. // Evict oldest if the announces are oversized.
  82. if len(fp.fifo)-cachedAnnosThreshold > 0 {
  83. for i := 0; i < len(fp.fifo)-cachedAnnosThreshold; i++ {
  84. delete(fp.announces, fp.fifo[i])
  85. }
  86. copy(fp.fifo, fp.fifo[len(fp.fifo)-cachedAnnosThreshold:])
  87. fp.fifo = fp.fifo[:cachedAnnosThreshold]
  88. }
  89. }
  90. // forwardAnno removes all announces from the map with a number lower than
  91. // the provided threshold.
  92. func (fp *fetcherPeer) forwardAnno(td *big.Int) []*announce {
  93. var (
  94. cutset int
  95. evicted []*announce
  96. )
  97. for ; cutset < len(fp.fifo); cutset++ {
  98. anno := fp.announces[fp.fifo[cutset]]
  99. if anno == nil {
  100. continue // In theory it should never ever happen
  101. }
  102. if anno.data.Td.Cmp(td) > 0 {
  103. break
  104. }
  105. evicted = append(evicted, anno)
  106. delete(fp.announces, anno.data.Hash)
  107. }
  108. if cutset > 0 {
  109. copy(fp.fifo, fp.fifo[cutset:])
  110. fp.fifo = fp.fifo[:len(fp.fifo)-cutset]
  111. }
  112. return evicted
  113. }
// lightFetcher implements retrieval of newly announced headers. It reuses
// the eth.BlockFetcher as the underlying fetcher but adds additional
// rules: e.g. evict "timeout" peers.
type lightFetcher struct {
	// Various handlers
	ulc     *ulc                  // Ultra light client settings; nil when not running in ulc mode
	chaindb ethdb.Database        // Database used to look up the common ancestor after a sync
	reqDist *requestDistributor   // Distributor the header requests are queued on
	peerset *serverPeerSet        // The global peerset of light client which shared by all components
	chain   *light.LightChain     // The local light chain which maintains the canonical header chain.
	fetcher *fetcher.BlockFetcher // The underlying fetcher which takes care block header retrieval.

	// Peerset maintained by fetcher
	plock sync.RWMutex              // Guards the peers map
	peers map[enode.ID]*fetcherPeer // Fetcher-specific state of each registered peer

	// Various channels
	announceCh chan *announce      // Announcements handed to the main loop
	requestCh  chan *request       // Records of sent header requests
	deliverCh  chan *response      // Header responses delivered by peers
	syncDone   chan *types.Header  // Signals a finished sync, carrying the head captured when it started
	closeCh    chan struct{}       // Closed by stop to terminate the main loop
	wg         sync.WaitGroup      // Tracks the main loop goroutine

	// Callback
	synchronise func(peer *serverPeer) // Invoked to synchronise with the given peer

	// Test fields or hooks
	newHeadHook func(*types.Header) // If set, called with the local head after it has been updated
}
  140. // newLightFetcher creates a light fetcher instance.
  141. func newLightFetcher(chain *light.LightChain, engine consensus.Engine, peers *serverPeerSet, ulc *ulc, chaindb ethdb.Database, reqDist *requestDistributor, syncFn func(p *serverPeer)) *lightFetcher {
  142. // Construct the fetcher by offering all necessary APIs
  143. validator := func(header *types.Header) error {
  144. // Disable seal verification explicitly if we are running in ulc mode.
  145. return engine.VerifyHeader(chain, header, ulc == nil)
  146. }
  147. heighter := func() uint64 { return chain.CurrentHeader().Number.Uint64() }
  148. dropper := func(id string) { peers.unregister(id) }
  149. inserter := func(headers []*types.Header) (int, error) {
  150. // Disable PoW checking explicitly if we are running in ulc mode.
  151. checkFreq := 1
  152. if ulc != nil {
  153. checkFreq = 0
  154. }
  155. return chain.InsertHeaderChain(headers, checkFreq)
  156. }
  157. f := &lightFetcher{
  158. ulc: ulc,
  159. peerset: peers,
  160. chaindb: chaindb,
  161. chain: chain,
  162. reqDist: reqDist,
  163. fetcher: fetcher.NewBlockFetcher(true, chain.GetHeaderByHash, nil, validator, nil, heighter, inserter, nil, dropper),
  164. peers: make(map[enode.ID]*fetcherPeer),
  165. synchronise: syncFn,
  166. announceCh: make(chan *announce),
  167. requestCh: make(chan *request),
  168. deliverCh: make(chan *response),
  169. syncDone: make(chan *types.Header),
  170. closeCh: make(chan struct{}),
  171. }
  172. peers.subscribe(f)
  173. return f
  174. }
// start launches the underlying block fetcher and runs the main loop in its
// own goroutine. The wait group is incremented before the goroutine is
// spawned so that stop can wait for the loop to exit.
func (f *lightFetcher) start() {
	f.wg.Add(1)
	f.fetcher.Start()
	go f.mainloop()
}
// stop signals the main loop to terminate by closing closeCh, stops the
// underlying block fetcher and blocks until the main loop has returned.
// It must not be called more than once, since closing closeCh a second
// time would panic.
func (f *lightFetcher) stop() {
	close(f.closeCh)
	f.fetcher.Stop()
	f.wg.Wait()
}
  185. // registerPeer adds an new peer to the fetcher's peer set
  186. func (f *lightFetcher) registerPeer(p *serverPeer) {
  187. f.plock.Lock()
  188. defer f.plock.Unlock()
  189. f.peers[p.ID()] = &fetcherPeer{announces: make(map[common.Hash]*announce)}
  190. }
  191. // unregisterPeer removes the specified peer from the fetcher's peer set
  192. func (f *lightFetcher) unregisterPeer(p *serverPeer) {
  193. f.plock.Lock()
  194. defer f.plock.Unlock()
  195. delete(f.peers, p.ID())
  196. }
  197. // peer returns the peer from the fetcher peerset.
  198. func (f *lightFetcher) peer(id enode.ID) *fetcherPeer {
  199. f.plock.RLock()
  200. defer f.plock.RUnlock()
  201. return f.peers[id]
  202. }
  203. // forEachPeer iterates the fetcher peerset, abort the iteration if the
  204. // callback returns false.
  205. func (f *lightFetcher) forEachPeer(check func(id enode.ID, p *fetcherPeer) bool) {
  206. f.plock.RLock()
  207. defer f.plock.RUnlock()
  208. for id, peer := range f.peers {
  209. if !check(id, peer) {
  210. return
  211. }
  212. }
  213. }
  214. // mainloop is the main event loop of the light fetcher, which is responsible for
  215. // - announcement maintenance(ulc)
  216. // If we are running in ultra light client mode, then all announcements from
  217. // the trusted servers are maintained. If the same announcements from trusted
  218. // servers reach the threshold, then the relevant header is requested for retrieval.
  219. //
  220. // - block header retrieval
  221. // Whenever we receive announce with higher td compared with local chain, the
  222. // request will be made for header retrieval.
  223. //
  224. // - re-sync trigger
  225. // If the local chain lags too much, then the fetcher will enter "synchronise"
  226. // mode to retrieve missing headers in batch.
  227. func (f *lightFetcher) mainloop() {
  228. defer f.wg.Done()
  229. var (
  230. syncInterval = uint64(1) // Interval used to trigger a light resync.
  231. syncing bool // Indicator whether the client is syncing
  232. ulc = f.ulc != nil
  233. headCh = make(chan core.ChainHeadEvent, 100)
  234. fetching = make(map[uint64]*request)
  235. requestTimer = time.NewTimer(0)
  236. // Local status
  237. localHead = f.chain.CurrentHeader()
  238. localTd = f.chain.GetTd(localHead.Hash(), localHead.Number.Uint64())
  239. )
  240. sub := f.chain.SubscribeChainHeadEvent(headCh)
  241. defer sub.Unsubscribe()
  242. // reset updates the local status with given header.
  243. reset := func(header *types.Header) {
  244. localHead = header
  245. localTd = f.chain.GetTd(header.Hash(), header.Number.Uint64())
  246. }
  247. // trustedHeader returns an indicator whether the header is regarded as
  248. // trusted. If we are running in the ulc mode, only when we receive enough
  249. // same announcement from trusted server, the header will be trusted.
  250. trustedHeader := func(hash common.Hash, number uint64) (bool, []enode.ID) {
  251. var (
  252. agreed []enode.ID
  253. trusted bool
  254. )
  255. f.forEachPeer(func(id enode.ID, p *fetcherPeer) bool {
  256. if anno := p.announces[hash]; anno != nil && anno.trust && anno.data.Number == number {
  257. agreed = append(agreed, id)
  258. if 100*len(agreed)/len(f.ulc.keys) >= f.ulc.fraction {
  259. trusted = true
  260. return false // abort iteration
  261. }
  262. }
  263. return true
  264. })
  265. return trusted, agreed
  266. }
  267. for {
  268. select {
  269. case anno := <-f.announceCh:
  270. peerid, data := anno.peerid, anno.data
  271. log.Debug("Received new announce", "peer", peerid, "number", data.Number, "hash", data.Hash, "reorg", data.ReorgDepth)
  272. peer := f.peer(peerid)
  273. if peer == nil {
  274. log.Debug("Receive announce from unknown peer", "peer", peerid)
  275. continue
  276. }
  277. // Announced tds should be strictly monotonic, drop the peer if
  278. // the announce is out-of-order.
  279. if peer.latest != nil && data.Td.Cmp(peer.latest.Td) <= 0 {
  280. f.peerset.unregister(peerid.String())
  281. log.Debug("Non-monotonic td", "peer", peerid, "current", data.Td, "previous", peer.latest.Td)
  282. continue
  283. }
  284. peer.latest = data
  285. // Filter out any stale announce, the local chain is ahead of announce
  286. if localTd != nil && data.Td.Cmp(localTd) <= 0 {
  287. continue
  288. }
  289. peer.addAnno(anno)
  290. // If we are not syncing, try to trigger a single retrieval or re-sync
  291. if !ulc && !syncing {
  292. // Two scenarios lead to re-sync:
  293. // - reorg happens
  294. // - local chain lags
  295. // We can't retrieve the parent of the announce by single retrieval
  296. // in both cases, so resync is necessary.
  297. if data.Number > localHead.Number.Uint64()+syncInterval || data.ReorgDepth > 0 {
  298. syncing = true
  299. go f.startSync(peerid)
  300. log.Debug("Trigger light sync", "peer", peerid, "local", localHead.Number, "localhash", localHead.Hash(), "remote", data.Number, "remotehash", data.Hash)
  301. continue
  302. }
  303. f.fetcher.Notify(peerid.String(), data.Hash, data.Number, time.Now(), f.requestHeaderByHash(peerid), nil)
  304. log.Debug("Trigger header retrieval", "peer", peerid, "number", data.Number, "hash", data.Hash)
  305. }
  306. // Keep collecting announces from trusted server even we are syncing.
  307. if ulc && anno.trust {
  308. // Notify underlying fetcher to retrieve header or trigger a resync if
  309. // we have receive enough announcements from trusted server.
  310. trusted, agreed := trustedHeader(data.Hash, data.Number)
  311. if trusted && !syncing {
  312. if data.Number > localHead.Number.Uint64()+syncInterval || data.ReorgDepth > 0 {
  313. syncing = true
  314. go f.startSync(peerid)
  315. log.Debug("Trigger trusted light sync", "local", localHead.Number, "localhash", localHead.Hash(), "remote", data.Number, "remotehash", data.Hash)
  316. continue
  317. }
  318. p := agreed[rand.Intn(len(agreed))]
  319. f.fetcher.Notify(p.String(), data.Hash, data.Number, time.Now(), f.requestHeaderByHash(p), nil)
  320. log.Debug("Trigger trusted header retrieval", "number", data.Number, "hash", data.Hash)
  321. }
  322. }
  323. case req := <-f.requestCh:
  324. fetching[req.reqid] = req // Tracking all in-flight requests for response latency statistic.
  325. if len(fetching) == 1 {
  326. f.rescheduleTimer(fetching, requestTimer)
  327. }
  328. case <-requestTimer.C:
  329. for reqid, request := range fetching {
  330. if time.Since(request.sendAt) > blockDelayTimeout-gatherSlack {
  331. delete(fetching, reqid)
  332. f.peerset.unregister(request.peerid.String())
  333. log.Debug("Request timeout", "peer", request.peerid, "reqid", reqid)
  334. }
  335. }
  336. f.rescheduleTimer(fetching, requestTimer)
  337. case resp := <-f.deliverCh:
  338. if req := fetching[resp.reqid]; req != nil {
  339. delete(fetching, resp.reqid)
  340. f.rescheduleTimer(fetching, requestTimer)
  341. // The underlying fetcher does not check the consistency of request and response.
  342. // The adversary can send the fake announces with invalid hash and number but always
  343. // delivery some mismatched header. So it can't be punished by the underlying fetcher.
  344. // We have to add two more rules here to detect.
  345. if len(resp.headers) != 1 {
  346. f.peerset.unregister(req.peerid.String())
  347. log.Debug("Deliver more than requested", "peer", req.peerid, "reqid", req.reqid)
  348. continue
  349. }
  350. if resp.headers[0].Hash() != req.hash {
  351. f.peerset.unregister(req.peerid.String())
  352. log.Debug("Deliver invalid header", "peer", req.peerid, "reqid", req.reqid)
  353. continue
  354. }
  355. resp.remain <- f.fetcher.FilterHeaders(resp.peerid.String(), resp.headers, time.Now())
  356. } else {
  357. // Discard the entire packet no matter it's a timeout response or unexpected one.
  358. resp.remain <- resp.headers
  359. }
  360. case ev := <-headCh:
  361. // Short circuit if we are still syncing.
  362. if syncing {
  363. continue
  364. }
  365. reset(ev.Block.Header())
  366. // Clean stale announcements from les-servers.
  367. var droplist []enode.ID
  368. f.forEachPeer(func(id enode.ID, p *fetcherPeer) bool {
  369. removed := p.forwardAnno(localTd)
  370. for _, anno := range removed {
  371. if header := f.chain.GetHeaderByHash(anno.data.Hash); header != nil {
  372. if header.Number.Uint64() != anno.data.Number {
  373. droplist = append(droplist, id)
  374. break
  375. }
  376. // In theory td should exists.
  377. td := f.chain.GetTd(anno.data.Hash, anno.data.Number)
  378. if td != nil && td.Cmp(anno.data.Td) != 0 {
  379. droplist = append(droplist, id)
  380. break
  381. }
  382. }
  383. }
  384. return true
  385. })
  386. for _, id := range droplist {
  387. f.peerset.unregister(id.String())
  388. log.Debug("Kicked out peer for invalid announcement")
  389. }
  390. if f.newHeadHook != nil {
  391. f.newHeadHook(localHead)
  392. }
  393. case origin := <-f.syncDone:
  394. syncing = false // Reset the status
  395. // Rewind all untrusted headers for ulc mode.
  396. if ulc {
  397. head := f.chain.CurrentHeader()
  398. ancestor := rawdb.FindCommonAncestor(f.chaindb, origin, head)
  399. // Recap the ancestor with genesis header in case the ancestor
  400. // is not found. It can happen the original head is before the
  401. // checkpoint while the synced headers are after it. In this
  402. // case there is no ancestor between them.
  403. if ancestor == nil {
  404. ancestor = f.chain.Genesis().Header()
  405. }
  406. var untrusted []common.Hash
  407. for head.Number.Cmp(ancestor.Number) > 0 {
  408. hash, number := head.Hash(), head.Number.Uint64()
  409. if trusted, _ := trustedHeader(hash, number); trusted {
  410. break
  411. }
  412. untrusted = append(untrusted, hash)
  413. head = f.chain.GetHeader(head.ParentHash, number-1)
  414. if head == nil {
  415. break // all the synced headers will be dropped
  416. }
  417. }
  418. if len(untrusted) > 0 {
  419. for i, j := 0, len(untrusted)-1; i < j; i, j = i+1, j-1 {
  420. untrusted[i], untrusted[j] = untrusted[j], untrusted[i]
  421. }
  422. f.chain.Rollback(untrusted)
  423. }
  424. }
  425. // Reset local status.
  426. reset(f.chain.CurrentHeader())
  427. if f.newHeadHook != nil {
  428. f.newHeadHook(localHead)
  429. }
  430. log.Debug("light sync finished", "number", localHead.Number, "hash", localHead.Hash())
  431. case <-f.closeCh:
  432. return
  433. }
  434. }
  435. }
  436. // announce processes a new announcement message received from a peer.
  437. func (f *lightFetcher) announce(p *serverPeer, head *announceData) {
  438. select {
  439. case f.announceCh <- &announce{peerid: p.ID(), trust: p.trusted, data: head}:
  440. case <-f.closeCh:
  441. return
  442. }
  443. }
  444. // trackRequest sends a reqID to main loop for in-flight request tracking.
  445. func (f *lightFetcher) trackRequest(peerid enode.ID, reqid uint64, hash common.Hash) {
  446. select {
  447. case f.requestCh <- &request{reqid: reqid, peerid: peerid, sendAt: time.Now(), hash: hash}:
  448. case <-f.closeCh:
  449. }
  450. }
  451. // requestHeaderByHash constructs a header retrieval request and sends it to
  452. // local request distributor.
  453. //
  454. // Note, we rely on the underlying eth/fetcher to retrieve and validate the
  455. // response, so that we have to obey the rule of eth/fetcher which only accepts
  456. // the response from given peer.
  457. func (f *lightFetcher) requestHeaderByHash(peerid enode.ID) func(common.Hash) error {
  458. return func(hash common.Hash) error {
  459. req := &distReq{
  460. getCost: func(dp distPeer) uint64 { return dp.(*serverPeer).getRequestCost(GetBlockHeadersMsg, 1) },
  461. canSend: func(dp distPeer) bool { return dp.(*serverPeer).ID() == peerid },
  462. request: func(dp distPeer) func() {
  463. peer, id := dp.(*serverPeer), rand.Uint64()
  464. cost := peer.getRequestCost(GetBlockHeadersMsg, 1)
  465. peer.fcServer.QueuedRequest(id, cost)
  466. return func() {
  467. f.trackRequest(peer.ID(), id, hash)
  468. peer.requestHeadersByHash(id, hash, 1, 0, false)
  469. }
  470. },
  471. }
  472. f.reqDist.queue(req)
  473. return nil
  474. }
  475. }
  476. // startSync invokes synchronisation callback to start syncing.
  477. func (f *lightFetcher) startSync(id enode.ID) {
  478. defer func(header *types.Header) {
  479. f.syncDone <- header
  480. }(f.chain.CurrentHeader())
  481. peer := f.peerset.peer(id.String())
  482. if peer == nil || peer.onlyAnnounce {
  483. return
  484. }
  485. f.synchronise(peer)
  486. }
  487. // deliverHeaders delivers header download request responses for processing
  488. func (f *lightFetcher) deliverHeaders(peer *serverPeer, reqid uint64, headers []*types.Header) []*types.Header {
  489. remain := make(chan []*types.Header, 1)
  490. select {
  491. case f.deliverCh <- &response{reqid: reqid, headers: headers, peerid: peer.ID(), remain: remain}:
  492. case <-f.closeCh:
  493. return nil
  494. }
  495. return <-remain
  496. }
  497. // rescheduleTimer resets the specified timeout timer to the next request timeout.
  498. func (f *lightFetcher) rescheduleTimer(requests map[uint64]*request, timer *time.Timer) {
  499. // Short circuit if no inflight requests
  500. if len(requests) == 0 {
  501. timer.Stop()
  502. return
  503. }
  504. // Otherwise find the earliest expiring request
  505. earliest := time.Now()
  506. for _, req := range requests {
  507. if earliest.After(req.sendAt) {
  508. earliest = req.sendAt
  509. }
  510. }
  511. timer.Reset(blockDelayTimeout - time.Since(earliest))
  512. }