// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package les

import (
    "math/big"
    "sync"
    "time"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/common/mclock"
    "github.com/ethereum/go-ethereum/consensus"
    "github.com/ethereum/go-ethereum/core/rawdb"
    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/light"
    "github.com/ethereum/go-ethereum/log"
)

const (
    blockDelayTimeout    = time.Second * 10 // timeout for a peer to announce a head that has already been confirmed by others
    maxNodeCount         = 20               // maximum number of fetcherTreeNode entries remembered for each peer
    serverStateAvailable = 100              // number of recent blocks where state availability is assumed
)

// lightFetcher implements retrieval of newly announced headers. It also provides a peerHasBlock function for the
// ODR system to ensure that we only request data related to a certain block from peers who have already processed
// and announced that block.
type lightFetcher struct {
    handler *clientHandler
    chain   *light.LightChain

    lock            sync.Mutex // lock protects access to the fetcher's internal state variables except sent requests
    maxConfirmedTd  *big.Int
    peers           map[*serverPeer]*fetcherPeerInfo
    lastUpdateStats *updateStatsEntry
    syncing         bool
    syncDone        chan *serverPeer

    reqMu      sync.RWMutex // reqMu protects access to sent header fetch requests
    requested  map[uint64]fetchRequest
    deliverChn chan fetchResponse
    timeoutChn chan uint64

    requestTriggered  bool
    requestTrigger    chan struct{}
    lastTrustedHeader *types.Header

    closeCh chan struct{}
    wg      sync.WaitGroup
}

// fetcherPeerInfo holds fetcher-specific information about each active peer
type fetcherPeerInfo struct {
    root, lastAnnounced *fetcherTreeNode
    nodeCnt             int
    confirmedTd         *big.Int
    bestConfirmed       *fetcherTreeNode
    nodeByHash          map[common.Hash]*fetcherTreeNode
    firstUpdateStats    *updateStatsEntry
}

// fetcherTreeNode is a node of a tree that holds information about blocks recently
// announced and confirmed by a certain peer. Each new announce message from a peer
// adds nodes to the tree, based on the previously announced head and the reorg depth.
// There are three possible states for a tree node:
// - announced: not downloaded (known) yet, but we know its hash, number and td
// - intermediate: not known, hash and td are empty, they are filled out when it becomes known
// - known: both announced by this peer and downloaded (from any peer).
// This structure makes it possible to always know which peer has a certain block,
// which is necessary for selecting a suitable peer for ODR requests and also for
// canonizing new heads. It also helps to always download the minimum necessary
// amount of headers with a single request.
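//
// Illustrative sketch (not part of the original comment): an announcement of head
// C'(N) with reorgDepth 2 walks two nodes back from the last announced head C(N)
// and attaches a fresh branch of intermediate nodes at the common ancestor A(N-2):
//
//	A(N-2) ── B(N-1) ── C(N)       previously announced branch
//	    └──── B'(N-1) ── C'(N)     branch added by the new announcement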
type fetcherTreeNode struct {
    hash             common.Hash
    number           uint64
    td               *big.Int
    known, requested bool
    parent           *fetcherTreeNode
    children         []*fetcherTreeNode
}

// fetchRequest represents a header download request
type fetchRequest struct {
    hash    common.Hash
    amount  uint64
    peer    *serverPeer
    sent    mclock.AbsTime
    timeout bool
}

// fetchResponse represents a header download response
type fetchResponse struct {
    reqID   uint64
    headers []*types.Header
    peer    *serverPeer
}

// newLightFetcher creates a new light fetcher
func newLightFetcher(h *clientHandler) *lightFetcher {
    f := &lightFetcher{
        handler:        h,
        chain:          h.backend.blockchain,
        peers:          make(map[*serverPeer]*fetcherPeerInfo),
        deliverChn:     make(chan fetchResponse, 100),
        requested:      make(map[uint64]fetchRequest),
        timeoutChn:     make(chan uint64),
        requestTrigger: make(chan struct{}, 1),
        syncDone:       make(chan *serverPeer),
        closeCh:        make(chan struct{}),
        maxConfirmedTd: big.NewInt(0),
    }
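    // Subscribe to peer-set events so that registerPeer/unregisterPeer (defined
    // below) are invoked for every server that connects or drops.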
    h.backend.peers.subscribe(f)
    f.wg.Add(1)
    go f.syncLoop()
    return f
}

func (f *lightFetcher) close() {
    close(f.closeCh)
    f.wg.Wait()
}

// syncLoop is the main event loop of the light fetcher.
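// It multiplexes four event sources: requestTrigger starts new header requests,
// timeoutChn fires on hard request timeouts, deliverChn carries header responses
// and syncDone signals a finished downloader sync.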
func (f *lightFetcher) syncLoop() {
    defer f.wg.Done()
    for {
        select {
        case <-f.closeCh:
            return
        // request loop keeps running until no further requests are necessary or possible
        case <-f.requestTrigger:
            f.lock.Lock()
            var (
                rq      *distReq
                reqID   uint64
                syncing bool
            )
            if !f.syncing {
                rq, reqID, syncing = f.nextRequest()
            }
            f.requestTriggered = rq != nil
            f.lock.Unlock()
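            // If the distributor accepted the request, either switch to syncing mode or
            // arm a soft timeout that marks the request as timed out and re-triggers the
            // loop; if it was rejected, re-trigger immediately so it can be retried.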
            if rq != nil {
                if _, ok := <-f.handler.backend.reqDist.queue(rq); ok {
                    if syncing {
                        f.lock.Lock()
                        f.syncing = true
                        f.lock.Unlock()
                    } else {
                        go func() {
                            time.Sleep(softRequestTimeout)
                            f.reqMu.Lock()
                            req, ok := f.requested[reqID]
                            if ok {
                                req.timeout = true
                                f.requested[reqID] = req
                            }
                            f.reqMu.Unlock()
                            // keep starting new requests while possible
                            f.requestTrigger <- struct{}{}
                        }()
                    }
                } else {
                    f.requestTrigger <- struct{}{}
                }
            }
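        // A request that is still pending after hardRequestTimeout is abandoned: its
        // response time is recorded as a timeout and the serving peer is dropped.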
        case reqID := <-f.timeoutChn:
            f.reqMu.Lock()
            req, ok := f.requested[reqID]
            if ok {
                delete(f.requested, reqID)
            }
            f.reqMu.Unlock()
            if ok {
                f.handler.backend.serverPool.adjustResponseTime(req.peer.poolEntry, time.Duration(mclock.Now()-req.sent), true)
                req.peer.Log().Debug("Fetching data timed out hard")
                go f.handler.removePeer(req.peer.id)
            }
        case resp := <-f.deliverChn:
            f.reqMu.Lock()
            req, ok := f.requested[resp.reqID]
            if ok && req.peer != resp.peer {
                ok = false
            }
            if ok {
                delete(f.requested, resp.reqID)
            }
            f.reqMu.Unlock()
            if ok {
                f.handler.backend.serverPool.adjustResponseTime(req.peer.poolEntry, time.Duration(mclock.Now()-req.sent), req.timeout)
            }
            f.lock.Lock()
            if !ok || !(f.syncing || f.processResponse(req, resp)) {
                resp.peer.Log().Debug("Failed processing response")
                go f.handler.removePeer(resp.peer.id)
            }
            f.lock.Unlock()
        case p := <-f.syncDone:
            f.lock.Lock()
            p.Log().Debug("Done synchronising with peer")
            f.checkSyncedHeaders(p)
            f.syncing = false
            f.lock.Unlock()
            f.requestTrigger <- struct{}{} // f.requestTriggered is always true here
        }
    }
}

// registerPeer adds a new peer to the fetcher's peer set
func (f *lightFetcher) registerPeer(p *serverPeer) {
    p.lock.Lock()
    p.hasBlock = func(hash common.Hash, number uint64, hasState bool) bool {
        return f.peerHasBlock(p, hash, number, hasState)
    }
    p.lock.Unlock()

    f.lock.Lock()
    defer f.lock.Unlock()
    f.peers[p] = &fetcherPeerInfo{nodeByHash: make(map[common.Hash]*fetcherTreeNode)}
}

// unregisterPeer removes a peer from the fetcher's peer set
func (f *lightFetcher) unregisterPeer(p *serverPeer) {
    p.lock.Lock()
    p.hasBlock = nil
    p.lock.Unlock()

    f.lock.Lock()
    defer f.lock.Unlock()
    // check for potential timed out block delay statistics
    f.checkUpdateStats(p, nil)
    delete(f.peers, p)
}

// announce processes a new announcement message received from a peer, adding new
// nodes to the peer's block tree and removing old nodes if necessary
func (f *lightFetcher) announce(p *serverPeer, head *announceData) {
    f.lock.Lock()
    defer f.lock.Unlock()
    p.Log().Debug("Received new announcement", "number", head.Number, "hash", head.Hash, "reorg", head.ReorgDepth)

    fp := f.peers[p]
    if fp == nil {
        p.Log().Debug("Announcement from unknown peer")
        return
    }
    if fp.lastAnnounced != nil && head.Td.Cmp(fp.lastAnnounced.td) <= 0 {
        // announced tds should be strictly monotonic
        p.Log().Debug("Received non-monotonic td", "current", head.Td, "previous", fp.lastAnnounced.td)
        go f.handler.removePeer(p.id)
        return
    }
    n := fp.lastAnnounced
    for i := uint64(0); i < head.ReorgDepth; i++ {
        if n == nil {
            break
        }
        n = n.parent
    }
    // n is now the reorg common ancestor, add a new branch of nodes
    if n != nil && (head.Number >= n.number+maxNodeCount || head.Number <= n.number) {
        // if the announced head's height is lower than or equal to n's, or too far from
        // it to add intermediate nodes, then discard the previous announcement info
        // and trigger a resync
        n = nil
        fp.nodeCnt = 0
        fp.nodeByHash = make(map[common.Hash]*fetcherTreeNode)
    }
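    // When the tree would grow past maxNodeCount, the oldest nodes are pruned by
    // repeatedly promoting the root's canonical child (if any) to be the new root
    // and deleting all other branches, until the new branch fits.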
    if n != nil {
        // check if the node count is too high to add new nodes, discard oldest ones if necessary
        locked := false
        for uint64(fp.nodeCnt)+head.Number-n.number > maxNodeCount && fp.root != nil {
            if !locked {
                f.chain.LockChain()
                defer f.chain.UnlockChain()
                locked = true
            }
            // if one of root's children is canonical, keep it, delete other branches and root itself
            var newRoot *fetcherTreeNode
            for i, nn := range fp.root.children {
                if rawdb.ReadCanonicalHash(f.handler.backend.chainDb, nn.number) == nn.hash {
                    fp.root.children = append(fp.root.children[:i], fp.root.children[i+1:]...)
                    nn.parent = nil
                    newRoot = nn
                    break
                }
            }
            fp.deleteNode(fp.root)
            if n == fp.root {
                n = newRoot
            }
            fp.root = newRoot
            if newRoot == nil || !f.checkKnownNode(p, newRoot) {
                fp.bestConfirmed = nil
                fp.confirmedTd = nil
            }
            if n == nil {
                break
            }
        }
        if n != nil {
            for n.number < head.Number {
                nn := &fetcherTreeNode{number: n.number + 1, parent: n}
                n.children = append(n.children, nn)
                n = nn
                fp.nodeCnt++
            }
            n.hash = head.Hash
            n.td = head.Td
            fp.nodeByHash[n.hash] = n
        }
    }
    if n == nil {
        // could not find reorg common ancestor or had to delete entire tree, a new root and a resync is needed
        if fp.root != nil {
            fp.deleteNode(fp.root)
        }
        n = &fetcherTreeNode{hash: head.Hash, number: head.Number, td: head.Td}
        fp.root = n
        fp.nodeCnt++
        fp.nodeByHash[n.hash] = n
        fp.bestConfirmed = nil
        fp.confirmedTd = nil
    }
    f.checkKnownNode(p, n)

    p.lock.Lock()
    p.headInfo = blockInfo{Number: head.Number, Hash: head.Hash, Td: head.Td}
    fp.lastAnnounced = n
    p.lock.Unlock()

    f.checkUpdateStats(p, nil)
    if !f.requestTriggered {
        f.requestTriggered = true
        f.requestTrigger <- struct{}{}
    }
}

// peerHasBlock returns true if we can assume the peer knows the given block
// based on its announcements
func (f *lightFetcher) peerHasBlock(p *serverPeer, hash common.Hash, number uint64, hasState bool) bool {
    f.lock.Lock()
    defer f.lock.Unlock()

    fp := f.peers[p]
    if fp == nil || fp.root == nil {
        return false
    }
    if hasState {
        if fp.lastAnnounced == nil || fp.lastAnnounced.number > number+serverStateAvailable {
            return false
        }
    }
    if f.syncing {
        // always return true when syncing
        // false positives are acceptable, a more sophisticated condition can be implemented later
        return true
    }
    if number >= fp.root.number {
        // it is recent enough that if it is known, it should be in the peer's block tree
        return fp.nodeByHash[hash] != nil
    }
    f.chain.LockChain()
    defer f.chain.UnlockChain()
    // if it's older than the peer's block tree root but it's in the same canonical chain
    // as the root, we can still be sure the peer knows it
    //
    // when syncing, just check if it is part of the known chain, there is nothing better we
    // can do since we do not know the most recent block hash yet
    return rawdb.ReadCanonicalHash(f.handler.backend.chainDb, fp.root.number) == fp.root.hash && rawdb.ReadCanonicalHash(f.handler.backend.chainDb, number) == hash
}

// requestAmount calculates the amount of headers to be downloaded starting
// from a certain head backwards
func (f *lightFetcher) requestAmount(p *serverPeer, n *fetcherTreeNode) uint64 {
    amount := uint64(0)
    nn := n
    for nn != nil && !f.checkKnownNode(p, nn) {
        nn = nn.parent
        amount++
    }
    if nn == nil {
        amount = n.number
    }
    return amount
}

// requestedID tells if a certain reqID has been requested by the fetcher
func (f *lightFetcher) requestedID(reqID uint64) bool {
    f.reqMu.RLock()
    _, ok := f.requested[reqID]
    f.reqMu.RUnlock()
    return ok
}

// nextRequest selects the peer and announced head to be requested next; the amount
// to be downloaded starting from the head backwards is also returned
func (f *lightFetcher) nextRequest() (*distReq, uint64, bool) {
    bestHash, bestAmount, bestTd, bestSyncing := f.findBestRequest()
    if bestTd == f.maxConfirmedTd {
        return nil, 0, false
    }
    var rq *distReq
    reqID := genReqID()
    if bestSyncing {
        rq = f.newFetcherDistReqForSync(bestHash)
    } else {
        rq = f.newFetcherDistReq(bestHash, reqID, bestAmount)
    }
    return rq, reqID, bestSyncing
}

// findBestRequest finds the best head to request that has been announced by but not yet requested from a known peer.
// It also returns the announced Td (which should be verified after fetching the head),
// the necessary amount to request and whether a downloader sync is necessary instead of a normal header request.
func (f *lightFetcher) findBestRequest() (bestHash common.Hash, bestAmount uint64, bestTd *big.Int, bestSyncing bool) {
    bestTd = f.maxConfirmedTd
    bestSyncing = false

    for p, fp := range f.peers {
        for hash, n := range fp.nodeByHash {
            if f.checkKnownNode(p, n) || n.requested {
                continue
            }
            amount := f.requestAmount(p, n)
            // note: if ulc mode is disabled, isTrustedHash returns true
            if (bestTd == nil || n.td.Cmp(bestTd) > 0 || amount < bestAmount) && (f.isTrustedHash(hash) || f.maxConfirmedTd.Int64() == 0) {
                bestHash = hash
                bestTd = n.td
                bestAmount = amount
                bestSyncing = fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root)
            }
        }
    }
    return
}

// isTrustedHash checks if the block can be trusted by the minimum trusted fraction.
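// For example, with 5 configured trusted servers and fraction = 60, a hash is
// trusted once at least 3 trusted peers have announced it (100*3/5 = 60 >= 60).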
func (f *lightFetcher) isTrustedHash(hash common.Hash) bool {
    // If ultra light client mode is disabled, trust all hashes
    if f.handler.ulc == nil {
        return true
    }
    // Ultra light enabled, only trust after enough confirmations
    var agreed int
    for peer, info := range f.peers {
        if peer.trusted && info.nodeByHash[hash] != nil {
            agreed++
        }
    }
    return 100*agreed/len(f.handler.ulc.keys) >= f.handler.ulc.fraction
}

func (f *lightFetcher) newFetcherDistReqForSync(bestHash common.Hash) *distReq {
    return &distReq{
        getCost: func(dp distPeer) uint64 {
            return 0
        },
        canSend: func(dp distPeer) bool {
            p := dp.(*serverPeer)
            f.lock.Lock()
            defer f.lock.Unlock()

            if p.onlyAnnounce {
                return false
            }
            fp := f.peers[p]
            return fp != nil && fp.nodeByHash[bestHash] != nil
        },
        request: func(dp distPeer) func() {
            if f.handler.ulc != nil {
                // Keep last trusted header before sync
                f.setLastTrustedHeader(f.chain.CurrentHeader())
            }
            go func() {
                p := dp.(*serverPeer)
                p.Log().Debug("Synchronisation started")
                f.handler.synchronise(p)
                f.syncDone <- p
            }()
            return nil
        },
    }
}

// newFetcherDistReq creates a new request for the distributor.
func (f *lightFetcher) newFetcherDistReq(bestHash common.Hash, reqID uint64, bestAmount uint64) *distReq {
    return &distReq{
        getCost: func(dp distPeer) uint64 {
            p := dp.(*serverPeer)
            return p.getRequestCost(GetBlockHeadersMsg, int(bestAmount))
        },
        canSend: func(dp distPeer) bool {
            p := dp.(*serverPeer)
            f.lock.Lock()
            defer f.lock.Unlock()

            if p.onlyAnnounce {
                return false
            }
            fp := f.peers[p]
            if fp == nil {
                return false
            }
            n := fp.nodeByHash[bestHash]
            return n != nil && !n.requested
        },
        request: func(dp distPeer) func() {
            p := dp.(*serverPeer)
            f.lock.Lock()
            fp := f.peers[p]
            if fp != nil {
                n := fp.nodeByHash[bestHash]
                if n != nil {
                    n.requested = true
                }
            }
            f.lock.Unlock()

            cost := p.getRequestCost(GetBlockHeadersMsg, int(bestAmount))
            p.fcServer.QueuedRequest(reqID, cost)
            f.reqMu.Lock()
            f.requested[reqID] = fetchRequest{hash: bestHash, amount: bestAmount, peer: p, sent: mclock.Now()}
            f.reqMu.Unlock()

            go func() {
                time.Sleep(hardRequestTimeout)
                f.timeoutChn <- reqID
            }()
            return func() { p.requestHeadersByHash(reqID, bestHash, int(bestAmount), 0, true) }
        },
    }
}
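
// A queued request thus passes through a fixed lifecycle: canSend selects a peer
// that announced the head, request marks the tree node as requested, charges the
// flow control cost and arms the hard timeout, and the returned closure performs
// the actual reverse header fetch starting at bestHash.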

// deliverHeaders delivers header download request responses for processing
func (f *lightFetcher) deliverHeaders(peer *serverPeer, reqID uint64, headers []*types.Header) {
    f.deliverChn <- fetchResponse{reqID: reqID, headers: headers, peer: peer}
}

// processResponse processes header download request responses, returns true if successful
func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) bool {
    if uint64(len(resp.headers)) != req.amount || resp.headers[0].Hash() != req.hash {
        req.peer.Log().Debug("Response content mismatch", "requested", req.amount, "reqfrom", req.hash, "delivered", len(resp.headers))
        return false
    }
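    // Headers are delivered newest-first (the request is made with reverse=true,
    // starting from the announced hash); flip them into ascending order before
    // inserting them into the header chain.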
    headers := make([]*types.Header, req.amount)
    for i, header := range resp.headers {
        headers[int(req.amount)-1-i] = header
    }
    if _, err := f.chain.InsertHeaderChain(headers, 1); err != nil {
        if err == consensus.ErrFutureBlock {
            return true
        }
        log.Debug("Failed to insert header chain", "err", err)
        return false
    }
    tds := make([]*big.Int, len(headers))
    for i, header := range headers {
        td := f.chain.GetTd(header.Hash(), header.Number.Uint64())
        if td == nil {
            log.Debug("Total difficulty not found for header", "index", i+1, "number", header.Number, "hash", header.Hash())
            return false
        }
        tds[i] = td
    }
    f.newHeaders(headers, tds)
    return true
}

// newHeaders updates the block trees of all active peers according to a newly
// downloaded and validated batch of headers
func (f *lightFetcher) newHeaders(headers []*types.Header, tds []*big.Int) {
    var maxTd *big.Int

    for p, fp := range f.peers {
        if !f.checkAnnouncedHeaders(fp, headers, tds) {
            p.Log().Debug("Inconsistent announcement")
            go f.handler.removePeer(p.id)
        }
        if fp.confirmedTd != nil && (maxTd == nil || maxTd.Cmp(fp.confirmedTd) > 0) {
            maxTd = fp.confirmedTd
        }
    }
    if maxTd != nil {
        f.updateMaxConfirmedTd(maxTd)
    }
}

// checkAnnouncedHeaders updates the peer's block tree if necessary after validating
// a batch of headers. It searches for the latest header in the batch that has a
// matching tree node (if any), and if it has not been marked as known already,
// sets it and its parents to known (even those which are older than the currently
// validated ones). The return value shows whether all hashes, numbers and Tds matched
// correctly to the announced values (otherwise the peer should be dropped).
func (f *lightFetcher) checkAnnouncedHeaders(fp *fetcherPeerInfo, headers []*types.Header, tds []*big.Int) bool {
    var (
        n      *fetcherTreeNode
        header *types.Header
        td     *big.Int
    )
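    // The batch is walked from the newest header backwards; once it is exhausted,
    // the walk continues through parent headers loaded from the chain until a node
    // already known by this peer (or the tree root) is reached.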
    for i := len(headers) - 1; ; i-- {
        if i < 0 {
            if n == nil {
                // no more headers and nothing to match
                return true
            }
            // we ran out of recently delivered headers but have not reached a node known by this peer yet, continue matching
            hash, number := header.ParentHash, header.Number.Uint64()-1
            td = f.chain.GetTd(hash, number)
            header = f.chain.GetHeader(hash, number)
            if header == nil || td == nil {
                log.Error("Missing parent of validated header", "hash", hash, "number", number)
                return false
            }
        } else {
            header = headers[i]
            td = tds[i]
        }
        hash := header.Hash()
        number := header.Number.Uint64()
        if n == nil {
            n = fp.nodeByHash[hash]
        }
        if n != nil {
            if n.td == nil {
                // node was unannounced
                if nn := fp.nodeByHash[hash]; nn != nil {
                    // if there was already a node with the same hash, continue there and drop this one
                    nn.children = append(nn.children, n.children...)
                    n.children = nil
                    fp.deleteNode(n)
                    n = nn
                } else {
                    n.hash = hash
                    n.td = td
                    fp.nodeByHash[hash] = n
                }
            }
            // check if it matches the header
            if n.hash != hash || n.number != number || n.td.Cmp(td) != 0 {
                // peer has previously made an invalid announcement
                return false
            }
            if n.known {
                // we reached a known node that matched our expectations, return with success
                return true
            }
            n.known = true
            if fp.confirmedTd == nil || td.Cmp(fp.confirmedTd) > 0 {
                fp.confirmedTd = td
                fp.bestConfirmed = n
            }
            n = n.parent
            if n == nil {
                return true
            }
        }
    }
}

// checkSyncedHeaders updates the peer's block tree after synchronisation by marking
// downloaded headers as known. If none of the announced headers are found after
// syncing, the peer is dropped.
func (f *lightFetcher) checkSyncedHeaders(p *serverPeer) {
    fp := f.peers[p]
    if fp == nil {
        p.Log().Debug("Unknown peer to check sync headers")
        return
    }
    var (
        node = fp.lastAnnounced
        td   *big.Int
    )
    if f.handler.ulc != nil {
        // Roll back untrusted blocks
        h, unapproved := f.lastTrustedTreeNode(p)
        f.chain.Rollback(unapproved)
        node = fp.nodeByHash[h.Hash()]
    }
    // Find last valid block
    for node != nil {
        if td = f.chain.GetTd(node.hash, node.number); td != nil {
            break
        }
        node = node.parent
    }
    // Now node is the latest downloaded/approved header after syncing
    if node == nil {
        p.Log().Debug("Synchronisation failed")
        go f.handler.removePeer(p.id)
        return
    }
    header := f.chain.GetHeader(node.hash, node.number)
    f.newHeaders([]*types.Header{header}, []*big.Int{td})
}

// lastTrustedTreeNode returns the last approved tree node and a list of unapproved hashes
func (f *lightFetcher) lastTrustedTreeNode(p *serverPeer) (*types.Header, []common.Hash) {
    unapprovedHashes := make([]common.Hash, 0)
    current := f.chain.CurrentHeader()

    if f.lastTrustedHeader == nil {
        return current, unapprovedHashes
    }
    canonical := f.chain.CurrentHeader()
    if canonical.Number.Uint64() > f.lastTrustedHeader.Number.Uint64() {
        canonical = f.chain.GetHeaderByNumber(f.lastTrustedHeader.Number.Uint64())
    }
    commonAncestor := rawdb.FindCommonAncestor(f.handler.backend.chainDb, canonical, f.lastTrustedHeader)
    if commonAncestor == nil {
        log.Error("Common ancestor of last trusted header and canonical header is nil", "canonical hash", canonical.Hash(), "trusted hash", f.lastTrustedHeader.Hash())
        return current, unapprovedHashes
    }
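    // Walk back from the current head towards the common ancestor, collecting the
    // hashes of headers that have not reached the trusted fraction; the walk stops
    // early at the first hash that is already trusted.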
    for current.Hash() != commonAncestor.Hash() {
        if f.isTrustedHash(current.Hash()) {
            break
        }
        unapprovedHashes = append(unapprovedHashes, current.Hash())
        current = f.chain.GetHeader(current.ParentHash, current.Number.Uint64()-1)
    }
    return current, unapprovedHashes
}

func (f *lightFetcher) setLastTrustedHeader(h *types.Header) {
    f.lock.Lock()
    defer f.lock.Unlock()
    f.lastTrustedHeader = h
}

// checkKnownNode checks if a block tree node is known (downloaded and validated).
// If it was not known previously but found in the database, sets its known flag
func (f *lightFetcher) checkKnownNode(p *serverPeer, n *fetcherTreeNode) bool {
    if n.known {
        return true
    }
    td := f.chain.GetTd(n.hash, n.number)
    if td == nil {
        return false
    }
    header := f.chain.GetHeader(n.hash, n.number)
    // check the availability of both header and td because reads are not protected by chain db mutex
    // Note: returning false is always safe here
    if header == nil {
        return false
    }
    fp := f.peers[p]
    if fp == nil {
        p.Log().Debug("Unknown peer to check known nodes")
        return false
    }
    if !f.checkAnnouncedHeaders(fp, []*types.Header{header}, []*big.Int{td}) {
        p.Log().Debug("Inconsistent announcement")
        go f.handler.removePeer(p.id)
    }
    if fp.confirmedTd != nil {
        f.updateMaxConfirmedTd(fp.confirmedTd)
    }
    return n.known
}

// deleteNode deletes a node and its child subtrees from a peer's block tree
func (fp *fetcherPeerInfo) deleteNode(n *fetcherTreeNode) {
    if n.parent != nil {
        for i, nn := range n.parent.children {
            if nn == n {
                n.parent.children = append(n.parent.children[:i], n.parent.children[i+1:]...)
                break
            }
        }
    }
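    // The first child is followed iteratively while the remaining siblings are
    // deleted recursively, so a long chain of single-child nodes does not grow
    // the call stack.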
    for {
        if n.td != nil {
            delete(fp.nodeByHash, n.hash)
        }
        fp.nodeCnt--
        if len(n.children) == 0 {
            return
        }
        for i, nn := range n.children {
            if i == 0 {
                n = nn
            } else {
                fp.deleteNode(nn)
            }
        }
    }
}

// updateStatsEntry items form a linked list that is expanded with a new item every time a new head with a higher Td
// than the previous one has been downloaded and validated. The list contains a series of maximum confirmed Td values
// and the times these values were confirmed, both increasing monotonically. A maximum confirmed Td is calculated
// both globally for all peers and also for each individual peer (meaning that the given peer has announced the head
// and it has also been downloaded from any peer, either before or after the given announcement).
// The linked list has a global tail where new confirmed Td entries are added and a separate head for each peer,
// pointing to the next Td entry that is higher than the peer's max confirmed Td (nil if it has already confirmed
// the current global head).
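//
// Illustrative sketch (not from the original comment):
//
//	global list:  td=100 ── td=110 ── td=120   <- lastUpdateStats (tail)
//	peer A head:             td=110            (A has confirmed td=100 so far)
//	peer B head:  nil                          (B already confirmed td=120)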
type updateStatsEntry struct {
    time mclock.AbsTime
    td   *big.Int
    next *updateStatsEntry
}

// updateMaxConfirmedTd updates the block delay statistics of active peers. Whenever a new highest Td is confirmed,
// adds it to the end of a linked list together with the time it has been confirmed. Then checks which peers have
// already confirmed a head with the same or higher Td (which counts as zero block delay) and updates their statistics.
// Those who have not confirmed such a head by now will be updated by a subsequent checkUpdateStats call with a
// positive block delay value.
func (f *lightFetcher) updateMaxConfirmedTd(td *big.Int) {
    if f.maxConfirmedTd == nil || td.Cmp(f.maxConfirmedTd) > 0 {
        f.maxConfirmedTd = td
        newEntry := &updateStatsEntry{
            time: mclock.Now(),
            td:   td,
        }
        if f.lastUpdateStats != nil {
            f.lastUpdateStats.next = newEntry
        }
        f.lastUpdateStats = newEntry
        for p := range f.peers {
            f.checkUpdateStats(p, newEntry)
        }
    }
}

// checkUpdateStats checks those peers who have not confirmed a certain highest Td (or a larger one) by the time it
// has been confirmed by another peer. If they have confirmed such a head by now, their stats are updated with the
// block delay which is (this peer's confirmation time)-(first confirmation time). After blockDelayTimeout has passed,
// the stats are updated with the blockDelayTimeout value. In either case, the confirmed or timed out updateStatsEntry
// items are removed from the head of the linked list.
// If a new entry has been added to the global tail, it is passed as a parameter here even though this function
// assumes that it has already been added, so that if the peer's list is empty (all heads confirmed, head is nil),
// it can set the new head to newEntry.
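//
// For example, if a peer confirms the head 3 seconds after the first confirmation
// by any peer, its delay statistic is adjusted by 3s; if it still has not confirmed
// the head after blockDelayTimeout (10s), it is charged the full timeout instead.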
func (f *lightFetcher) checkUpdateStats(p *serverPeer, newEntry *updateStatsEntry) {
    now := mclock.Now()
    fp := f.peers[p]
    if fp == nil {
        p.Log().Debug("Unknown peer to check update stats")
        return
    }
    if newEntry != nil && fp.firstUpdateStats == nil {
        fp.firstUpdateStats = newEntry
    }
    for fp.firstUpdateStats != nil && fp.firstUpdateStats.time <= now-mclock.AbsTime(blockDelayTimeout) {
        f.handler.backend.serverPool.adjustBlockDelay(p.poolEntry, blockDelayTimeout)
        fp.firstUpdateStats = fp.firstUpdateStats.next
    }
    if fp.confirmedTd != nil {
        for fp.firstUpdateStats != nil && fp.firstUpdateStats.td.Cmp(fp.confirmedTd) <= 0 {
            f.handler.backend.serverPool.adjustBlockDelay(p.poolEntry, time.Duration(now-fp.firstUpdateStats.time))
            fp.firstUpdateStats = fp.firstUpdateStats.next
        }
    }
}