fetcher.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package les implements the Light Ethereum Subprotocol.
  17. package les
  18. import (
  19. "math/big"
  20. "sync"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common"
  23. "github.com/ethereum/go-ethereum/common/mclock"
  24. "github.com/ethereum/go-ethereum/consensus"
  25. "github.com/ethereum/go-ethereum/core"
  26. "github.com/ethereum/go-ethereum/core/types"
  27. "github.com/ethereum/go-ethereum/light"
  28. "github.com/ethereum/go-ethereum/log"
  29. )
  30. const (
  31. blockDelayTimeout = time.Second * 10 // timeout for a peer to announce a head that has already been confirmed by others
  32. maxNodeCount = 20 // maximum number of fetcherTreeNode entries remembered for each peer
  33. )
  34. // lightFetcher
  35. type lightFetcher struct {
  36. pm *ProtocolManager
  37. odr *LesOdr
  38. chain *light.LightChain
  39. maxConfirmedTd *big.Int
  40. peers map[*peer]*fetcherPeerInfo
  41. lastUpdateStats *updateStatsEntry
  42. lock sync.Mutex // qwerqwerqwe
  43. deliverChn chan fetchResponse
  44. reqMu sync.RWMutex
  45. requested map[uint64]fetchRequest
  46. timeoutChn chan uint64
  47. requestChn chan bool // true if initiated from outside
  48. syncing bool
  49. syncDone chan *peer
  50. }
  51. // fetcherPeerInfo holds fetcher-specific information about each active peer
  52. type fetcherPeerInfo struct {
  53. root, lastAnnounced *fetcherTreeNode
  54. nodeCnt int
  55. confirmedTd *big.Int
  56. bestConfirmed *fetcherTreeNode
  57. nodeByHash map[common.Hash]*fetcherTreeNode
  58. firstUpdateStats *updateStatsEntry
  59. }
  60. // fetcherTreeNode is a node of a tree that holds information about blocks recently
  61. // announced and confirmed by a certain peer. Each new announce message from a peer
  62. // adds nodes to the tree, based on the previous announced head and the reorg depth.
  63. // There are three possible states for a tree node:
  64. // - announced: not downloaded (known) yet, but we know its head, number and td
  65. // - intermediate: not known, hash and td are empty, they are filled out when it becomes known
  66. // - known: both announced by this peer and downloaded (from any peer).
  67. // This structure makes it possible to always know which peer has a certain block,
  68. // which is necessary for selecting a suitable peer for ODR requests and also for
  69. // canonizing new heads. It also helps to always download the minimum necessary
  70. // amount of headers with a single request.
  71. type fetcherTreeNode struct {
  72. hash common.Hash
  73. number uint64
  74. td *big.Int
  75. known, requested bool
  76. parent *fetcherTreeNode
  77. children []*fetcherTreeNode
  78. }
  79. // fetchRequest represents a header download request
  80. type fetchRequest struct {
  81. hash common.Hash
  82. amount uint64
  83. peer *peer
  84. sent mclock.AbsTime
  85. timeout bool
  86. }
  87. // fetchResponse represents a header download response
  88. type fetchResponse struct {
  89. reqID uint64
  90. headers []*types.Header
  91. peer *peer
  92. }
  93. // newLightFetcher creates a new light fetcher
  94. func newLightFetcher(pm *ProtocolManager) *lightFetcher {
  95. f := &lightFetcher{
  96. pm: pm,
  97. chain: pm.blockchain.(*light.LightChain),
  98. odr: pm.odr,
  99. peers: make(map[*peer]*fetcherPeerInfo),
  100. deliverChn: make(chan fetchResponse, 100),
  101. requested: make(map[uint64]fetchRequest),
  102. timeoutChn: make(chan uint64),
  103. requestChn: make(chan bool, 100),
  104. syncDone: make(chan *peer),
  105. maxConfirmedTd: big.NewInt(0),
  106. }
  107. go f.syncLoop()
  108. return f
  109. }
  110. // syncLoop is the main event loop of the light fetcher
  111. func (f *lightFetcher) syncLoop() {
  112. f.pm.wg.Add(1)
  113. defer f.pm.wg.Done()
  114. requesting := false
  115. for {
  116. select {
  117. case <-f.pm.quitSync:
  118. return
  119. // when a new announce is received, request loop keeps running until
  120. // no further requests are necessary or possible
  121. case newAnnounce := <-f.requestChn:
  122. f.lock.Lock()
  123. s := requesting
  124. requesting = false
  125. var (
  126. rq *distReq
  127. reqID uint64
  128. )
  129. if !f.syncing && !(newAnnounce && s) {
  130. rq, reqID = f.nextRequest()
  131. }
  132. syncing := f.syncing
  133. f.lock.Unlock()
  134. if rq != nil {
  135. requesting = true
  136. _, ok := <-f.pm.reqDist.queue(rq)
  137. if !ok {
  138. f.requestChn <- false
  139. }
  140. if !syncing {
  141. go func() {
  142. time.Sleep(softRequestTimeout)
  143. f.reqMu.Lock()
  144. req, ok := f.requested[reqID]
  145. if ok {
  146. req.timeout = true
  147. f.requested[reqID] = req
  148. }
  149. f.reqMu.Unlock()
  150. // keep starting new requests while possible
  151. f.requestChn <- false
  152. }()
  153. }
  154. }
  155. case reqID := <-f.timeoutChn:
  156. f.reqMu.Lock()
  157. req, ok := f.requested[reqID]
  158. if ok {
  159. delete(f.requested, reqID)
  160. }
  161. f.reqMu.Unlock()
  162. if ok {
  163. f.pm.serverPool.adjustResponseTime(req.peer.poolEntry, time.Duration(mclock.Now()-req.sent), true)
  164. req.peer.Log().Debug("Fetching data timed out hard")
  165. go f.pm.removePeer(req.peer.id)
  166. }
  167. case resp := <-f.deliverChn:
  168. f.reqMu.Lock()
  169. req, ok := f.requested[resp.reqID]
  170. if ok && req.peer != resp.peer {
  171. ok = false
  172. }
  173. if ok {
  174. delete(f.requested, resp.reqID)
  175. }
  176. f.reqMu.Unlock()
  177. if ok {
  178. f.pm.serverPool.adjustResponseTime(req.peer.poolEntry, time.Duration(mclock.Now()-req.sent), req.timeout)
  179. }
  180. f.lock.Lock()
  181. if !ok || !(f.syncing || f.processResponse(req, resp)) {
  182. resp.peer.Log().Debug("Failed processing response")
  183. go f.pm.removePeer(resp.peer.id)
  184. }
  185. f.lock.Unlock()
  186. case p := <-f.syncDone:
  187. f.lock.Lock()
  188. p.Log().Debug("Done synchronising with peer")
  189. f.checkSyncedHeaders(p)
  190. f.syncing = false
  191. f.lock.Unlock()
  192. }
  193. }
  194. }
  195. // addPeer adds a new peer to the fetcher's peer set
  196. func (f *lightFetcher) addPeer(p *peer) {
  197. p.lock.Lock()
  198. p.hasBlock = func(hash common.Hash, number uint64) bool {
  199. return f.peerHasBlock(p, hash, number)
  200. }
  201. p.lock.Unlock()
  202. f.lock.Lock()
  203. defer f.lock.Unlock()
  204. f.peers[p] = &fetcherPeerInfo{nodeByHash: make(map[common.Hash]*fetcherTreeNode)}
  205. }
  206. // removePeer removes a new peer from the fetcher's peer set
  207. func (f *lightFetcher) removePeer(p *peer) {
  208. p.lock.Lock()
  209. p.hasBlock = nil
  210. p.lock.Unlock()
  211. f.lock.Lock()
  212. defer f.lock.Unlock()
  213. // check for potential timed out block delay statistics
  214. f.checkUpdateStats(p, nil)
  215. delete(f.peers, p)
  216. }
  217. // announce processes a new announcement message received from a peer, adding new
  218. // nodes to the peer's block tree and removing old nodes if necessary
  219. func (f *lightFetcher) announce(p *peer, head *announceData) {
  220. f.lock.Lock()
  221. defer f.lock.Unlock()
  222. p.Log().Debug("Received new announcement", "number", head.Number, "hash", head.Hash, "reorg", head.ReorgDepth)
  223. fp := f.peers[p]
  224. if fp == nil {
  225. p.Log().Debug("Announcement from unknown peer")
  226. return
  227. }
  228. if fp.lastAnnounced != nil && head.Td.Cmp(fp.lastAnnounced.td) <= 0 {
  229. // announced tds should be strictly monotonic
  230. p.Log().Debug("Received non-monotonic td", "current", head.Td, "previous", fp.lastAnnounced.td)
  231. go f.pm.removePeer(p.id)
  232. return
  233. }
  234. n := fp.lastAnnounced
  235. for i := uint64(0); i < head.ReorgDepth; i++ {
  236. if n == nil {
  237. break
  238. }
  239. n = n.parent
  240. }
  241. if n != nil {
  242. // n is now the reorg common ancestor, add a new branch of nodes
  243. // check if the node count is too high to add new nodes
  244. locked := false
  245. for uint64(fp.nodeCnt)+head.Number-n.number > maxNodeCount && fp.root != nil {
  246. if !locked {
  247. f.chain.LockChain()
  248. defer f.chain.UnlockChain()
  249. locked = true
  250. }
  251. // if one of root's children is canonical, keep it, delete other branches and root itself
  252. var newRoot *fetcherTreeNode
  253. for i, nn := range fp.root.children {
  254. if core.GetCanonicalHash(f.pm.chainDb, nn.number) == nn.hash {
  255. fp.root.children = append(fp.root.children[:i], fp.root.children[i+1:]...)
  256. nn.parent = nil
  257. newRoot = nn
  258. break
  259. }
  260. }
  261. fp.deleteNode(fp.root)
  262. if n == fp.root {
  263. n = newRoot
  264. }
  265. fp.root = newRoot
  266. if newRoot == nil || !f.checkKnownNode(p, newRoot) {
  267. fp.bestConfirmed = nil
  268. fp.confirmedTd = nil
  269. }
  270. if n == nil {
  271. break
  272. }
  273. }
  274. if n != nil {
  275. for n.number < head.Number {
  276. nn := &fetcherTreeNode{number: n.number + 1, parent: n}
  277. n.children = append(n.children, nn)
  278. n = nn
  279. fp.nodeCnt++
  280. }
  281. n.hash = head.Hash
  282. n.td = head.Td
  283. fp.nodeByHash[n.hash] = n
  284. }
  285. }
  286. if n == nil {
  287. // could not find reorg common ancestor or had to delete entire tree, a new root and a resync is needed
  288. if fp.root != nil {
  289. fp.deleteNode(fp.root)
  290. }
  291. n = &fetcherTreeNode{hash: head.Hash, number: head.Number, td: head.Td}
  292. fp.root = n
  293. fp.nodeCnt++
  294. fp.nodeByHash[n.hash] = n
  295. fp.bestConfirmed = nil
  296. fp.confirmedTd = nil
  297. }
  298. f.checkKnownNode(p, n)
  299. p.lock.Lock()
  300. p.headInfo = head
  301. fp.lastAnnounced = n
  302. p.lock.Unlock()
  303. f.checkUpdateStats(p, nil)
  304. f.requestChn <- true
  305. }
  306. // peerHasBlock returns true if we can assume the peer knows the given block
  307. // based on its announcements
  308. func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bool {
  309. f.lock.Lock()
  310. defer f.lock.Unlock()
  311. if f.syncing {
  312. // always return true when syncing
  313. // false positives are acceptable, a more sophisticated condition can be implemented later
  314. return true
  315. }
  316. fp := f.peers[p]
  317. if fp == nil || fp.root == nil {
  318. return false
  319. }
  320. if number >= fp.root.number {
  321. // it is recent enough that if it is known, is should be in the peer's block tree
  322. return fp.nodeByHash[hash] != nil
  323. }
  324. f.chain.LockChain()
  325. defer f.chain.UnlockChain()
  326. // if it's older than the peer's block tree root but it's in the same canonical chain
  327. // as the root, we can still be sure the peer knows it
  328. //
  329. // when syncing, just check if it is part of the known chain, there is nothing better we
  330. // can do since we do not know the most recent block hash yet
  331. return core.GetCanonicalHash(f.pm.chainDb, fp.root.number) == fp.root.hash && core.GetCanonicalHash(f.pm.chainDb, number) == hash
  332. }
  333. // requestAmount calculates the amount of headers to be downloaded starting
  334. // from a certain head backwards
  335. func (f *lightFetcher) requestAmount(p *peer, n *fetcherTreeNode) uint64 {
  336. amount := uint64(0)
  337. nn := n
  338. for nn != nil && !f.checkKnownNode(p, nn) {
  339. nn = nn.parent
  340. amount++
  341. }
  342. if nn == nil {
  343. amount = n.number
  344. }
  345. return amount
  346. }
  347. // requestedID tells if a certain reqID has been requested by the fetcher
  348. func (f *lightFetcher) requestedID(reqID uint64) bool {
  349. f.reqMu.RLock()
  350. _, ok := f.requested[reqID]
  351. f.reqMu.RUnlock()
  352. return ok
  353. }
  354. // nextRequest selects the peer and announced head to be requested next, amount
  355. // to be downloaded starting from the head backwards is also returned
  356. func (f *lightFetcher) nextRequest() (*distReq, uint64) {
  357. var (
  358. bestHash common.Hash
  359. bestAmount uint64
  360. )
  361. bestTd := f.maxConfirmedTd
  362. bestSyncing := false
  363. for p, fp := range f.peers {
  364. for hash, n := range fp.nodeByHash {
  365. if !f.checkKnownNode(p, n) && !n.requested && (bestTd == nil || n.td.Cmp(bestTd) >= 0) {
  366. amount := f.requestAmount(p, n)
  367. if bestTd == nil || n.td.Cmp(bestTd) > 0 || amount < bestAmount {
  368. bestHash = hash
  369. bestAmount = amount
  370. bestTd = n.td
  371. bestSyncing = fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root)
  372. }
  373. }
  374. }
  375. }
  376. if bestTd == f.maxConfirmedTd {
  377. return nil, 0
  378. }
  379. f.syncing = bestSyncing
  380. var rq *distReq
  381. reqID := getNextReqID()
  382. if f.syncing {
  383. rq = &distReq{
  384. getCost: func(dp distPeer) uint64 {
  385. return 0
  386. },
  387. canSend: func(dp distPeer) bool {
  388. p := dp.(*peer)
  389. fp := f.peers[p]
  390. return fp != nil && fp.nodeByHash[bestHash] != nil
  391. },
  392. request: func(dp distPeer) func() {
  393. go func() {
  394. p := dp.(*peer)
  395. p.Log().Debug("Synchronisation started")
  396. f.pm.synchronise(p)
  397. f.syncDone <- p
  398. }()
  399. return nil
  400. },
  401. }
  402. } else {
  403. rq = &distReq{
  404. getCost: func(dp distPeer) uint64 {
  405. p := dp.(*peer)
  406. return p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount))
  407. },
  408. canSend: func(dp distPeer) bool {
  409. p := dp.(*peer)
  410. f.lock.Lock()
  411. defer f.lock.Unlock()
  412. fp := f.peers[p]
  413. if fp == nil {
  414. return false
  415. }
  416. n := fp.nodeByHash[bestHash]
  417. return n != nil && !n.requested
  418. },
  419. request: func(dp distPeer) func() {
  420. p := dp.(*peer)
  421. f.lock.Lock()
  422. fp := f.peers[p]
  423. if fp != nil {
  424. n := fp.nodeByHash[bestHash]
  425. if n != nil {
  426. n.requested = true
  427. }
  428. }
  429. f.lock.Unlock()
  430. cost := p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount))
  431. p.fcServer.QueueRequest(reqID, cost)
  432. f.reqMu.Lock()
  433. f.requested[reqID] = fetchRequest{hash: bestHash, amount: bestAmount, peer: p, sent: mclock.Now()}
  434. f.reqMu.Unlock()
  435. go func() {
  436. time.Sleep(hardRequestTimeout)
  437. f.timeoutChn <- reqID
  438. }()
  439. return func() { p.RequestHeadersByHash(reqID, cost, bestHash, int(bestAmount), 0, true) }
  440. },
  441. }
  442. }
  443. return rq, reqID
  444. }
  445. // deliverHeaders delivers header download request responses for processing
  446. func (f *lightFetcher) deliverHeaders(peer *peer, reqID uint64, headers []*types.Header) {
  447. f.deliverChn <- fetchResponse{reqID: reqID, headers: headers, peer: peer}
  448. }
  449. // processResponse processes header download request responses, returns true if successful
  450. func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) bool {
  451. if uint64(len(resp.headers)) != req.amount || resp.headers[0].Hash() != req.hash {
  452. req.peer.Log().Debug("Response content mismatch", "requested", len(resp.headers), "reqfrom", resp.headers[0], "delivered", req.amount, "delfrom", req.hash)
  453. return false
  454. }
  455. headers := make([]*types.Header, req.amount)
  456. for i, header := range resp.headers {
  457. headers[int(req.amount)-1-i] = header
  458. }
  459. if _, err := f.chain.InsertHeaderChain(headers, 1); err != nil {
  460. if err == consensus.ErrFutureBlock {
  461. return true
  462. }
  463. log.Debug("Failed to insert header chain", "err", err)
  464. return false
  465. }
  466. tds := make([]*big.Int, len(headers))
  467. for i, header := range headers {
  468. td := f.chain.GetTd(header.Hash(), header.Number.Uint64())
  469. if td == nil {
  470. log.Debug("Total difficulty not found for header", "index", i+1, "number", header.Number, "hash", header.Hash())
  471. return false
  472. }
  473. tds[i] = td
  474. }
  475. f.newHeaders(headers, tds)
  476. return true
  477. }
  478. // newHeaders updates the block trees of all active peers according to a newly
  479. // downloaded and validated batch or headers
  480. func (f *lightFetcher) newHeaders(headers []*types.Header, tds []*big.Int) {
  481. var maxTd *big.Int
  482. for p, fp := range f.peers {
  483. if !f.checkAnnouncedHeaders(fp, headers, tds) {
  484. p.Log().Debug("Inconsistent announcement")
  485. go f.pm.removePeer(p.id)
  486. }
  487. if fp.confirmedTd != nil && (maxTd == nil || maxTd.Cmp(fp.confirmedTd) > 0) {
  488. maxTd = fp.confirmedTd
  489. }
  490. }
  491. if maxTd != nil {
  492. f.updateMaxConfirmedTd(maxTd)
  493. }
  494. }
  495. // checkAnnouncedHeaders updates peer's block tree if necessary after validating
  496. // a batch of headers. It searches for the latest header in the batch that has a
  497. // matching tree node (if any), and if it has not been marked as known already,
  498. // sets it and its parents to known (even those which are older than the currently
  499. // validated ones). Return value shows if all hashes, numbers and Tds matched
  500. // correctly to the announced values (otherwise the peer should be dropped).
  501. func (f *lightFetcher) checkAnnouncedHeaders(fp *fetcherPeerInfo, headers []*types.Header, tds []*big.Int) bool {
  502. var (
  503. n *fetcherTreeNode
  504. header *types.Header
  505. td *big.Int
  506. )
  507. for i := len(headers) - 1; ; i-- {
  508. if i < 0 {
  509. if n == nil {
  510. // no more headers and nothing to match
  511. return true
  512. }
  513. // we ran out of recently delivered headers but have not reached a node known by this peer yet, continue matching
  514. td = f.chain.GetTd(header.ParentHash, header.Number.Uint64()-1)
  515. header = f.chain.GetHeader(header.ParentHash, header.Number.Uint64()-1)
  516. } else {
  517. header = headers[i]
  518. td = tds[i]
  519. }
  520. hash := header.Hash()
  521. number := header.Number.Uint64()
  522. if n == nil {
  523. n = fp.nodeByHash[hash]
  524. }
  525. if n != nil {
  526. if n.td == nil {
  527. // node was unannounced
  528. if nn := fp.nodeByHash[hash]; nn != nil {
  529. // if there was already a node with the same hash, continue there and drop this one
  530. nn.children = append(nn.children, n.children...)
  531. n.children = nil
  532. fp.deleteNode(n)
  533. n = nn
  534. } else {
  535. n.hash = hash
  536. n.td = td
  537. fp.nodeByHash[hash] = n
  538. }
  539. }
  540. // check if it matches the header
  541. if n.hash != hash || n.number != number || n.td.Cmp(td) != 0 {
  542. // peer has previously made an invalid announcement
  543. return false
  544. }
  545. if n.known {
  546. // we reached a known node that matched our expectations, return with success
  547. return true
  548. }
  549. n.known = true
  550. if fp.confirmedTd == nil || td.Cmp(fp.confirmedTd) > 0 {
  551. fp.confirmedTd = td
  552. fp.bestConfirmed = n
  553. }
  554. n = n.parent
  555. if n == nil {
  556. return true
  557. }
  558. }
  559. }
  560. }
  561. // checkSyncedHeaders updates peer's block tree after synchronisation by marking
  562. // downloaded headers as known. If none of the announced headers are found after
  563. // syncing, the peer is dropped.
  564. func (f *lightFetcher) checkSyncedHeaders(p *peer) {
  565. fp := f.peers[p]
  566. if fp == nil {
  567. p.Log().Debug("Unknown peer to check sync headers")
  568. return
  569. }
  570. n := fp.lastAnnounced
  571. var td *big.Int
  572. for n != nil {
  573. if td = f.chain.GetTd(n.hash, n.number); td != nil {
  574. break
  575. }
  576. n = n.parent
  577. }
  578. // now n is the latest downloaded header after syncing
  579. if n == nil {
  580. p.Log().Debug("Synchronisation failed")
  581. go f.pm.removePeer(p.id)
  582. } else {
  583. header := f.chain.GetHeader(n.hash, n.number)
  584. f.newHeaders([]*types.Header{header}, []*big.Int{td})
  585. }
  586. }
  587. // checkKnownNode checks if a block tree node is known (downloaded and validated)
  588. // If it was not known previously but found in the database, sets its known flag
  589. func (f *lightFetcher) checkKnownNode(p *peer, n *fetcherTreeNode) bool {
  590. if n.known {
  591. return true
  592. }
  593. td := f.chain.GetTd(n.hash, n.number)
  594. if td == nil {
  595. return false
  596. }
  597. fp := f.peers[p]
  598. if fp == nil {
  599. p.Log().Debug("Unknown peer to check known nodes")
  600. return false
  601. }
  602. header := f.chain.GetHeader(n.hash, n.number)
  603. if !f.checkAnnouncedHeaders(fp, []*types.Header{header}, []*big.Int{td}) {
  604. p.Log().Debug("Inconsistent announcement")
  605. go f.pm.removePeer(p.id)
  606. }
  607. if fp.confirmedTd != nil {
  608. f.updateMaxConfirmedTd(fp.confirmedTd)
  609. }
  610. return n.known
  611. }
  612. // deleteNode deletes a node and its child subtrees from a peer's block tree
  613. func (fp *fetcherPeerInfo) deleteNode(n *fetcherTreeNode) {
  614. if n.parent != nil {
  615. for i, nn := range n.parent.children {
  616. if nn == n {
  617. n.parent.children = append(n.parent.children[:i], n.parent.children[i+1:]...)
  618. break
  619. }
  620. }
  621. }
  622. for {
  623. if n.td != nil {
  624. delete(fp.nodeByHash, n.hash)
  625. }
  626. fp.nodeCnt--
  627. if len(n.children) == 0 {
  628. return
  629. }
  630. for i, nn := range n.children {
  631. if i == 0 {
  632. n = nn
  633. } else {
  634. fp.deleteNode(nn)
  635. }
  636. }
  637. }
  638. }
  639. // updateStatsEntry items form a linked list that is expanded with a new item every time a new head with a higher Td
  640. // than the previous one has been downloaded and validated. The list contains a series of maximum confirmed Td values
  641. // and the time these values have been confirmed, both increasing monotonically. A maximum confirmed Td is calculated
  642. // both globally for all peers and also for each individual peer (meaning that the given peer has announced the head
  643. // and it has also been downloaded from any peer, either before or after the given announcement).
  644. // The linked list has a global tail where new confirmed Td entries are added and a separate head for each peer,
  645. // pointing to the next Td entry that is higher than the peer's max confirmed Td (nil if it has already confirmed
  646. // the current global head).
  647. type updateStatsEntry struct {
  648. time mclock.AbsTime
  649. td *big.Int
  650. next *updateStatsEntry
  651. }
  652. // updateMaxConfirmedTd updates the block delay statistics of active peers. Whenever a new highest Td is confirmed,
  653. // adds it to the end of a linked list together with the time it has been confirmed. Then checks which peers have
  654. // already confirmed a head with the same or higher Td (which counts as zero block delay) and updates their statistics.
  655. // Those who have not confirmed such a head by now will be updated by a subsequent checkUpdateStats call with a
  656. // positive block delay value.
  657. func (f *lightFetcher) updateMaxConfirmedTd(td *big.Int) {
  658. if f.maxConfirmedTd == nil || td.Cmp(f.maxConfirmedTd) > 0 {
  659. f.maxConfirmedTd = td
  660. newEntry := &updateStatsEntry{
  661. time: mclock.Now(),
  662. td: td,
  663. }
  664. if f.lastUpdateStats != nil {
  665. f.lastUpdateStats.next = newEntry
  666. }
  667. f.lastUpdateStats = newEntry
  668. for p := range f.peers {
  669. f.checkUpdateStats(p, newEntry)
  670. }
  671. }
  672. }
  673. // checkUpdateStats checks those peers who have not confirmed a certain highest Td (or a larger one) by the time it
  674. // has been confirmed by another peer. If they have confirmed such a head by now, their stats are updated with the
  675. // block delay which is (this peer's confirmation time)-(first confirmation time). After blockDelayTimeout has passed,
  676. // the stats are updated with blockDelayTimeout value. In either case, the confirmed or timed out updateStatsEntry
  677. // items are removed from the head of the linked list.
  678. // If a new entry has been added to the global tail, it is passed as a parameter here even though this function
  679. // assumes that it has already been added, so that if the peer's list is empty (all heads confirmed, head is nil),
  680. // it can set the new head to newEntry.
  681. func (f *lightFetcher) checkUpdateStats(p *peer, newEntry *updateStatsEntry) {
  682. now := mclock.Now()
  683. fp := f.peers[p]
  684. if fp == nil {
  685. p.Log().Debug("Unknown peer to check update stats")
  686. return
  687. }
  688. if newEntry != nil && fp.firstUpdateStats == nil {
  689. fp.firstUpdateStats = newEntry
  690. }
  691. for fp.firstUpdateStats != nil && fp.firstUpdateStats.time <= now-mclock.AbsTime(blockDelayTimeout) {
  692. f.pm.serverPool.adjustBlockDelay(p.poolEntry, blockDelayTimeout)
  693. fp.firstUpdateStats = fp.firstUpdateStats.next
  694. }
  695. if fp.confirmedTd != nil {
  696. for fp.firstUpdateStats != nil && fp.firstUpdateStats.td.Cmp(fp.confirmedTd) <= 0 {
  697. f.pm.serverPool.adjustBlockDelay(p.poolEntry, time.Duration(now-fp.firstUpdateStats.time))
  698. fp.firstUpdateStats = fp.firstUpdateStats.next
  699. }
  700. }
  701. }