fetcher.go 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package les implements the Light Ethereum Subprotocol.
  17. package les
  18. import (
  19. "math/big"
  20. "sync"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common"
  23. "github.com/ethereum/go-ethereum/common/mclock"
  24. "github.com/ethereum/go-ethereum/core"
  25. "github.com/ethereum/go-ethereum/core/types"
  26. "github.com/ethereum/go-ethereum/light"
  27. "github.com/ethereum/go-ethereum/logger"
  28. "github.com/ethereum/go-ethereum/logger/glog"
  29. )
  const (
  	// blockDelayTimeout is the timeout for a peer to announce a head that
  	// has already been confirmed by others.
  	blockDelayTimeout = 10 * time.Second

  	// maxNodeCount is the maximum number of fetcherTreeNode entries
  	// remembered for each peer.
  	maxNodeCount = 20
  )
  34. // lightFetcher
  35. type lightFetcher struct {
  36. pm *ProtocolManager
  37. odr *LesOdr
  38. chain *light.LightChain
  39. maxConfirmedTd *big.Int
  40. peers map[*peer]*fetcherPeerInfo
  41. lastUpdateStats *updateStatsEntry
  42. lock sync.Mutex // qwerqwerqwe
  43. deliverChn chan fetchResponse
  44. reqMu sync.RWMutex
  45. requested map[uint64]fetchRequest
  46. timeoutChn chan uint64
  47. requestChn chan bool // true if initiated from outside
  48. syncing bool
  49. syncDone chan *peer
  50. }
  51. // fetcherPeerInfo holds fetcher-specific information about each active peer
  52. type fetcherPeerInfo struct {
  53. root, lastAnnounced *fetcherTreeNode
  54. nodeCnt int
  55. confirmedTd *big.Int
  56. bestConfirmed *fetcherTreeNode
  57. nodeByHash map[common.Hash]*fetcherTreeNode
  58. firstUpdateStats *updateStatsEntry
  59. }
  60. // fetcherTreeNode is a node of a tree that holds information about blocks recently
  61. // announced and confirmed by a certain peer. Each new announce message from a peer
  62. // adds nodes to the tree, based on the previous announced head and the reorg depth.
  63. // There are three possible states for a tree node:
  64. // - announced: not downloaded (known) yet, but we know its head, number and td
  65. // - intermediate: not known, hash and td are empty, they are filled out when it becomes known
  66. // - known: both announced by this peer and downloaded (from any peer).
  67. // This structure makes it possible to always know which peer has a certain block,
  68. // which is necessary for selecting a suitable peer for ODR requests and also for
  69. // canonizing new heads. It also helps to always download the minimum necessary
  70. // amount of headers with a single request.
  71. type fetcherTreeNode struct {
  72. hash common.Hash
  73. number uint64
  74. td *big.Int
  75. known, requested bool
  76. parent *fetcherTreeNode
  77. children []*fetcherTreeNode
  78. }
  79. // fetchRequest represents a header download request
  80. type fetchRequest struct {
  81. hash common.Hash
  82. amount uint64
  83. peer *peer
  84. sent mclock.AbsTime
  85. timeout bool
  86. }
  87. // fetchResponse represents a header download response
  88. type fetchResponse struct {
  89. reqID uint64
  90. headers []*types.Header
  91. peer *peer
  92. }
  93. // newLightFetcher creates a new light fetcher
  94. func newLightFetcher(pm *ProtocolManager) *lightFetcher {
  95. f := &lightFetcher{
  96. pm: pm,
  97. chain: pm.blockchain.(*light.LightChain),
  98. odr: pm.odr,
  99. peers: make(map[*peer]*fetcherPeerInfo),
  100. deliverChn: make(chan fetchResponse, 100),
  101. requested: make(map[uint64]fetchRequest),
  102. timeoutChn: make(chan uint64),
  103. requestChn: make(chan bool, 100),
  104. syncDone: make(chan *peer),
  105. maxConfirmedTd: big.NewInt(0),
  106. }
  107. go f.syncLoop()
  108. return f
  109. }
  110. // syncLoop is the main event loop of the light fetcher
  111. func (f *lightFetcher) syncLoop() {
  112. f.pm.wg.Add(1)
  113. defer f.pm.wg.Done()
  114. requesting := false
  115. for {
  116. select {
  117. case <-f.pm.quitSync:
  118. return
  119. // when a new announce is received, request loop keeps running until
  120. // no further requests are necessary or possible
  121. case newAnnounce := <-f.requestChn:
  122. f.lock.Lock()
  123. s := requesting
  124. requesting = false
  125. if !f.syncing && !(newAnnounce && s) {
  126. reqID := getNextReqID()
  127. if peer, node, amount, retry := f.nextRequest(reqID); node != nil {
  128. requesting = true
  129. if reqID, ok := f.request(peer, reqID, node, amount); ok {
  130. go func() {
  131. time.Sleep(softRequestTimeout)
  132. f.reqMu.Lock()
  133. req, ok := f.requested[reqID]
  134. if ok {
  135. req.timeout = true
  136. f.requested[reqID] = req
  137. }
  138. f.reqMu.Unlock()
  139. // keep starting new requests while possible
  140. f.requestChn <- false
  141. }()
  142. }
  143. } else {
  144. if retry {
  145. requesting = true
  146. go func() {
  147. time.Sleep(time.Millisecond * 100)
  148. f.requestChn <- false
  149. }()
  150. }
  151. }
  152. }
  153. f.lock.Unlock()
  154. case reqID := <-f.timeoutChn:
  155. f.reqMu.Lock()
  156. req, ok := f.requested[reqID]
  157. if ok {
  158. delete(f.requested, reqID)
  159. }
  160. f.reqMu.Unlock()
  161. if ok {
  162. f.pm.serverPool.adjustResponseTime(req.peer.poolEntry, time.Duration(mclock.Now()-req.sent), true)
  163. glog.V(logger.Debug).Infof("hard timeout by peer %v", req.peer.id)
  164. go f.pm.removePeer(req.peer.id)
  165. }
  166. case resp := <-f.deliverChn:
  167. f.reqMu.Lock()
  168. req, ok := f.requested[resp.reqID]
  169. if ok && req.peer != resp.peer {
  170. ok = false
  171. }
  172. if ok {
  173. delete(f.requested, resp.reqID)
  174. }
  175. f.reqMu.Unlock()
  176. if ok {
  177. f.pm.serverPool.adjustResponseTime(req.peer.poolEntry, time.Duration(mclock.Now()-req.sent), req.timeout)
  178. }
  179. f.lock.Lock()
  180. if !ok || !(f.syncing || f.processResponse(req, resp)) {
  181. glog.V(logger.Debug).Infof("failed processing response by peer %v", resp.peer.id)
  182. go f.pm.removePeer(resp.peer.id)
  183. }
  184. f.lock.Unlock()
  185. case p := <-f.syncDone:
  186. f.lock.Lock()
  187. glog.V(logger.Debug).Infof("done synchronising with peer %v", p.id)
  188. f.checkSyncedHeaders(p)
  189. f.syncing = false
  190. f.lock.Unlock()
  191. }
  192. }
  193. }
  194. // addPeer adds a new peer to the fetcher's peer set
  195. func (f *lightFetcher) addPeer(p *peer) {
  196. p.lock.Lock()
  197. p.hasBlock = func(hash common.Hash, number uint64) bool {
  198. return f.peerHasBlock(p, hash, number)
  199. }
  200. p.lock.Unlock()
  201. f.lock.Lock()
  202. defer f.lock.Unlock()
  203. f.peers[p] = &fetcherPeerInfo{nodeByHash: make(map[common.Hash]*fetcherTreeNode)}
  204. }
  205. // removePeer removes a new peer from the fetcher's peer set
  206. func (f *lightFetcher) removePeer(p *peer) {
  207. p.lock.Lock()
  208. p.hasBlock = nil
  209. p.lock.Unlock()
  210. f.lock.Lock()
  211. defer f.lock.Unlock()
  212. // check for potential timed out block delay statistics
  213. f.checkUpdateStats(p, nil)
  214. delete(f.peers, p)
  215. }
  216. // announce processes a new announcement message received from a peer, adding new
  217. // nodes to the peer's block tree and removing old nodes if necessary
  218. func (f *lightFetcher) announce(p *peer, head *announceData) {
  219. f.lock.Lock()
  220. defer f.lock.Unlock()
  221. glog.V(logger.Debug).Infof("received announce from peer %v #%d %016x reorg: %d", p.id, head.Number, head.Hash[:8], head.ReorgDepth)
  222. fp := f.peers[p]
  223. if fp == nil {
  224. glog.V(logger.Debug).Infof("announce: unknown peer")
  225. return
  226. }
  227. if fp.lastAnnounced != nil && head.Td.Cmp(fp.lastAnnounced.td) <= 0 {
  228. // announced tds should be strictly monotonic
  229. glog.V(logger.Debug).Infof("non-monotonic Td from peer %v", p.id)
  230. go f.pm.removePeer(p.id)
  231. return
  232. }
  233. n := fp.lastAnnounced
  234. for i := uint64(0); i < head.ReorgDepth; i++ {
  235. if n == nil {
  236. break
  237. }
  238. n = n.parent
  239. }
  240. if n != nil {
  241. // n is now the reorg common ancestor, add a new branch of nodes
  242. // check if the node count is too high to add new nodes
  243. locked := false
  244. for uint64(fp.nodeCnt)+head.Number-n.number > maxNodeCount && fp.root != nil {
  245. if !locked {
  246. f.chain.LockChain()
  247. defer f.chain.UnlockChain()
  248. locked = true
  249. }
  250. // if one of root's children is canonical, keep it, delete other branches and root itself
  251. var newRoot *fetcherTreeNode
  252. for i, nn := range fp.root.children {
  253. if core.GetCanonicalHash(f.pm.chainDb, nn.number) == nn.hash {
  254. fp.root.children = append(fp.root.children[:i], fp.root.children[i+1:]...)
  255. nn.parent = nil
  256. newRoot = nn
  257. break
  258. }
  259. }
  260. fp.deleteNode(fp.root)
  261. if n == fp.root {
  262. n = newRoot
  263. }
  264. fp.root = newRoot
  265. if newRoot == nil || !f.checkKnownNode(p, newRoot) {
  266. fp.bestConfirmed = nil
  267. fp.confirmedTd = nil
  268. }
  269. if n == nil {
  270. break
  271. }
  272. }
  273. if n != nil {
  274. for n.number < head.Number {
  275. nn := &fetcherTreeNode{number: n.number + 1, parent: n}
  276. n.children = append(n.children, nn)
  277. n = nn
  278. fp.nodeCnt++
  279. }
  280. n.hash = head.Hash
  281. n.td = head.Td
  282. fp.nodeByHash[n.hash] = n
  283. }
  284. }
  285. if n == nil {
  286. // could not find reorg common ancestor or had to delete entire tree, a new root and a resync is needed
  287. if fp.root != nil {
  288. fp.deleteNode(fp.root)
  289. }
  290. n = &fetcherTreeNode{hash: head.Hash, number: head.Number, td: head.Td}
  291. fp.root = n
  292. fp.nodeCnt++
  293. fp.nodeByHash[n.hash] = n
  294. fp.bestConfirmed = nil
  295. fp.confirmedTd = nil
  296. }
  297. f.checkKnownNode(p, n)
  298. p.lock.Lock()
  299. p.headInfo = head
  300. fp.lastAnnounced = n
  301. p.lock.Unlock()
  302. f.checkUpdateStats(p, nil)
  303. f.requestChn <- true
  304. }
  305. // peerHasBlock returns true if we can assume the peer knows the given block
  306. // based on its announcements
  307. func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bool {
  308. f.lock.Lock()
  309. defer f.lock.Unlock()
  310. fp := f.peers[p]
  311. if fp == nil || fp.root == nil {
  312. return false
  313. }
  314. if number >= fp.root.number {
  315. // it is recent enough that if it is known, is should be in the peer's block tree
  316. return fp.nodeByHash[hash] != nil
  317. }
  318. f.chain.LockChain()
  319. defer f.chain.UnlockChain()
  320. // if it's older than the peer's block tree root but it's in the same canonical chain
  321. // than the root, we can still be sure the peer knows it
  322. return core.GetCanonicalHash(f.pm.chainDb, fp.root.number) == fp.root.hash && core.GetCanonicalHash(f.pm.chainDb, number) == hash
  323. }
  324. // request initiates a header download request from a certain peer
  325. func (f *lightFetcher) request(p *peer, reqID uint64, n *fetcherTreeNode, amount uint64) (uint64, bool) {
  326. fp := f.peers[p]
  327. if fp == nil {
  328. glog.V(logger.Debug).Infof("request: unknown peer")
  329. p.fcServer.DeassignRequest(reqID)
  330. return 0, false
  331. }
  332. if fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) {
  333. f.syncing = true
  334. go func() {
  335. glog.V(logger.Debug).Infof("synchronising with peer %v", p.id)
  336. f.pm.synchronise(p)
  337. f.syncDone <- p
  338. }()
  339. p.fcServer.DeassignRequest(reqID)
  340. return 0, false
  341. }
  342. n.requested = true
  343. cost := p.GetRequestCost(GetBlockHeadersMsg, int(amount))
  344. p.fcServer.SendRequest(reqID, cost)
  345. f.reqMu.Lock()
  346. f.requested[reqID] = fetchRequest{hash: n.hash, amount: amount, peer: p, sent: mclock.Now()}
  347. f.reqMu.Unlock()
  348. go p.RequestHeadersByHash(reqID, cost, n.hash, int(amount), 0, true)
  349. go func() {
  350. time.Sleep(hardRequestTimeout)
  351. f.timeoutChn <- reqID
  352. }()
  353. return reqID, true
  354. }
  355. // requestAmount calculates the amount of headers to be downloaded starting
  356. // from a certain head backwards
  357. func (f *lightFetcher) requestAmount(p *peer, n *fetcherTreeNode) uint64 {
  358. amount := uint64(0)
  359. nn := n
  360. for nn != nil && !f.checkKnownNode(p, nn) {
  361. nn = nn.parent
  362. amount++
  363. }
  364. if nn == nil {
  365. amount = n.number
  366. }
  367. return amount
  368. }
  369. // requestedID tells if a certain reqID has been requested by the fetcher
  370. func (f *lightFetcher) requestedID(reqID uint64) bool {
  371. f.reqMu.RLock()
  372. _, ok := f.requested[reqID]
  373. f.reqMu.RUnlock()
  374. return ok
  375. }
  376. // nextRequest selects the peer and announced head to be requested next, amount
  377. // to be downloaded starting from the head backwards is also returned
  378. func (f *lightFetcher) nextRequest(reqID uint64) (*peer, *fetcherTreeNode, uint64, bool) {
  379. var (
  380. bestHash common.Hash
  381. bestAmount uint64
  382. )
  383. bestTd := f.maxConfirmedTd
  384. for p, fp := range f.peers {
  385. for hash, n := range fp.nodeByHash {
  386. if !f.checkKnownNode(p, n) && !n.requested && (bestTd == nil || n.td.Cmp(bestTd) >= 0) {
  387. amount := f.requestAmount(p, n)
  388. if bestTd == nil || n.td.Cmp(bestTd) > 0 || amount < bestAmount {
  389. bestHash = hash
  390. bestAmount = amount
  391. bestTd = n.td
  392. }
  393. }
  394. }
  395. }
  396. if bestTd == f.maxConfirmedTd {
  397. return nil, nil, 0, false
  398. }
  399. peer, _, locked := f.pm.serverPool.selectPeer(reqID, func(p *peer) (bool, time.Duration) {
  400. fp := f.peers[p]
  401. if fp == nil || fp.nodeByHash[bestHash] == nil {
  402. return false, 0
  403. }
  404. return true, p.fcServer.CanSend(p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)))
  405. })
  406. if !locked {
  407. return nil, nil, 0, true
  408. }
  409. var node *fetcherTreeNode
  410. if peer != nil {
  411. node = f.peers[peer].nodeByHash[bestHash]
  412. }
  413. return peer, node, bestAmount, false
  414. }
  415. // deliverHeaders delivers header download request responses for processing
  416. func (f *lightFetcher) deliverHeaders(peer *peer, reqID uint64, headers []*types.Header) {
  417. f.deliverChn <- fetchResponse{reqID: reqID, headers: headers, peer: peer}
  418. }
  419. // processResponse processes header download request responses, returns true if successful
  420. func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) bool {
  421. if uint64(len(resp.headers)) != req.amount || resp.headers[0].Hash() != req.hash {
  422. glog.V(logger.Debug).Infof("response mismatch %v %016x != %v %016x", len(resp.headers), resp.headers[0].Hash().Bytes()[:8], req.amount, req.hash[:8])
  423. return false
  424. }
  425. headers := make([]*types.Header, req.amount)
  426. for i, header := range resp.headers {
  427. headers[int(req.amount)-1-i] = header
  428. }
  429. if _, err := f.chain.InsertHeaderChain(headers, 1); err != nil {
  430. if err == core.BlockFutureErr {
  431. return true
  432. }
  433. glog.V(logger.Debug).Infof("InsertHeaderChain error: %v", err)
  434. return false
  435. }
  436. tds := make([]*big.Int, len(headers))
  437. for i, header := range headers {
  438. td := f.chain.GetTd(header.Hash(), header.Number.Uint64())
  439. if td == nil {
  440. glog.V(logger.Debug).Infof("TD not found for header %v of %v", i+1, len(headers))
  441. return false
  442. }
  443. tds[i] = td
  444. }
  445. f.newHeaders(headers, tds)
  446. return true
  447. }
  448. // newHeaders updates the block trees of all active peers according to a newly
  449. // downloaded and validated batch or headers
  450. func (f *lightFetcher) newHeaders(headers []*types.Header, tds []*big.Int) {
  451. var maxTd *big.Int
  452. for p, fp := range f.peers {
  453. if !f.checkAnnouncedHeaders(fp, headers, tds) {
  454. glog.V(logger.Debug).Infof("announce inconsistency by peer %v", p.id)
  455. go f.pm.removePeer(p.id)
  456. }
  457. if fp.confirmedTd != nil && (maxTd == nil || maxTd.Cmp(fp.confirmedTd) > 0) {
  458. maxTd = fp.confirmedTd
  459. }
  460. }
  461. if maxTd != nil {
  462. f.updateMaxConfirmedTd(maxTd)
  463. }
  464. }
  465. // checkAnnouncedHeaders updates peer's block tree if necessary after validating
  466. // a batch of headers. It searches for the latest header in the batch that has a
  467. // matching tree node (if any), and if it has not been marked as known already,
  468. // sets it and its parents to known (even those which are older than the currently
  469. // validated ones). Return value shows if all hashes, numbers and Tds matched
  470. // correctly to the announced values (otherwise the peer should be dropped).
  471. func (f *lightFetcher) checkAnnouncedHeaders(fp *fetcherPeerInfo, headers []*types.Header, tds []*big.Int) bool {
  472. var (
  473. n *fetcherTreeNode
  474. header *types.Header
  475. td *big.Int
  476. )
  477. for i := len(headers) - 1; ; i-- {
  478. if i < 0 {
  479. if n == nil {
  480. // no more headers and nothing to match
  481. return true
  482. }
  483. // we ran out of recently delivered headers but have not reached a node known by this peer yet, continue matching
  484. td = f.chain.GetTd(header.ParentHash, header.Number.Uint64()-1)
  485. header = f.chain.GetHeader(header.ParentHash, header.Number.Uint64()-1)
  486. } else {
  487. header = headers[i]
  488. td = tds[i]
  489. }
  490. hash := header.Hash()
  491. number := header.Number.Uint64()
  492. if n == nil {
  493. n = fp.nodeByHash[hash]
  494. }
  495. if n != nil {
  496. if n.td == nil {
  497. // node was unannounced
  498. if nn := fp.nodeByHash[hash]; nn != nil {
  499. // if there was already a node with the same hash, continue there and drop this one
  500. nn.children = append(nn.children, n.children...)
  501. n.children = nil
  502. fp.deleteNode(n)
  503. n = nn
  504. } else {
  505. n.hash = hash
  506. n.td = td
  507. fp.nodeByHash[hash] = n
  508. }
  509. }
  510. // check if it matches the header
  511. if n.hash != hash || n.number != number || n.td.Cmp(td) != 0 {
  512. // peer has previously made an invalid announcement
  513. return false
  514. }
  515. if n.known {
  516. // we reached a known node that matched our expectations, return with success
  517. return true
  518. }
  519. n.known = true
  520. if fp.confirmedTd == nil || td.Cmp(fp.confirmedTd) > 0 {
  521. fp.confirmedTd = td
  522. fp.bestConfirmed = n
  523. }
  524. n = n.parent
  525. if n == nil {
  526. return true
  527. }
  528. }
  529. }
  530. }
  531. // checkSyncedHeaders updates peer's block tree after synchronisation by marking
  532. // downloaded headers as known. If none of the announced headers are found after
  533. // syncing, the peer is dropped.
  534. func (f *lightFetcher) checkSyncedHeaders(p *peer) {
  535. fp := f.peers[p]
  536. if fp == nil {
  537. glog.V(logger.Debug).Infof("checkSyncedHeaders: unknown peer")
  538. return
  539. }
  540. n := fp.lastAnnounced
  541. var td *big.Int
  542. for n != nil {
  543. if td = f.chain.GetTd(n.hash, n.number); td != nil {
  544. break
  545. }
  546. n = n.parent
  547. }
  548. // now n is the latest downloaded header after syncing
  549. if n == nil {
  550. glog.V(logger.Debug).Infof("synchronisation failed with peer %v", p.id)
  551. go f.pm.removePeer(p.id)
  552. } else {
  553. header := f.chain.GetHeader(n.hash, n.number)
  554. f.newHeaders([]*types.Header{header}, []*big.Int{td})
  555. }
  556. }
  557. // checkKnownNode checks if a block tree node is known (downloaded and validated)
  558. // If it was not known previously but found in the database, sets its known flag
  559. func (f *lightFetcher) checkKnownNode(p *peer, n *fetcherTreeNode) bool {
  560. if n.known {
  561. return true
  562. }
  563. td := f.chain.GetTd(n.hash, n.number)
  564. if td == nil {
  565. return false
  566. }
  567. fp := f.peers[p]
  568. if fp == nil {
  569. glog.V(logger.Debug).Infof("checkKnownNode: unknown peer")
  570. return false
  571. }
  572. header := f.chain.GetHeader(n.hash, n.number)
  573. if !f.checkAnnouncedHeaders(fp, []*types.Header{header}, []*big.Int{td}) {
  574. glog.V(logger.Debug).Infof("announce inconsistency by peer %v", p.id)
  575. go f.pm.removePeer(p.id)
  576. }
  577. if fp.confirmedTd != nil {
  578. f.updateMaxConfirmedTd(fp.confirmedTd)
  579. }
  580. return n.known
  581. }
  582. // deleteNode deletes a node and its child subtrees from a peer's block tree
  583. func (fp *fetcherPeerInfo) deleteNode(n *fetcherTreeNode) {
  584. if n.parent != nil {
  585. for i, nn := range n.parent.children {
  586. if nn == n {
  587. n.parent.children = append(n.parent.children[:i], n.parent.children[i+1:]...)
  588. break
  589. }
  590. }
  591. }
  592. for {
  593. if n.td != nil {
  594. delete(fp.nodeByHash, n.hash)
  595. }
  596. fp.nodeCnt--
  597. if len(n.children) == 0 {
  598. return
  599. }
  600. for i, nn := range n.children {
  601. if i == 0 {
  602. n = nn
  603. } else {
  604. fp.deleteNode(nn)
  605. }
  606. }
  607. }
  608. }
  609. // updateStatsEntry items form a linked list that is expanded with a new item every time a new head with a higher Td
  610. // than the previous one has been downloaded and validated. The list contains a series of maximum confirmed Td values
  611. // and the time these values have been confirmed, both increasing monotonically. A maximum confirmed Td is calculated
  612. // both globally for all peers and also for each individual peer (meaning that the given peer has announced the head
  613. // and it has also been downloaded from any peer, either before or after the given announcement).
  614. // The linked list has a global tail where new confirmed Td entries are added and a separate head for each peer,
  615. // pointing to the next Td entry that is higher than the peer's max confirmed Td (nil if it has already confirmed
  616. // the current global head).
  617. type updateStatsEntry struct {
  618. time mclock.AbsTime
  619. td *big.Int
  620. next *updateStatsEntry
  621. }
  622. // updateMaxConfirmedTd updates the block delay statistics of active peers. Whenever a new highest Td is confirmed,
  623. // adds it to the end of a linked list together with the time it has been confirmed. Then checks which peers have
  624. // already confirmed a head with the same or higher Td (which counts as zero block delay) and updates their statistics.
  625. // Those who have not confirmed such a head by now will be updated by a subsequent checkUpdateStats call with a
  626. // positive block delay value.
  627. func (f *lightFetcher) updateMaxConfirmedTd(td *big.Int) {
  628. if f.maxConfirmedTd == nil || td.Cmp(f.maxConfirmedTd) > 0 {
  629. f.maxConfirmedTd = td
  630. newEntry := &updateStatsEntry{
  631. time: mclock.Now(),
  632. td: td,
  633. }
  634. if f.lastUpdateStats != nil {
  635. f.lastUpdateStats.next = newEntry
  636. }
  637. f.lastUpdateStats = newEntry
  638. for p := range f.peers {
  639. f.checkUpdateStats(p, newEntry)
  640. }
  641. }
  642. }
  643. // checkUpdateStats checks those peers who have not confirmed a certain highest Td (or a larger one) by the time it
  644. // has been confirmed by another peer. If they have confirmed such a head by now, their stats are updated with the
  645. // block delay which is (this peer's confirmation time)-(first confirmation time). After blockDelayTimeout has passed,
  646. // the stats are updated with blockDelayTimeout value. In either case, the confirmed or timed out updateStatsEntry
  647. // items are removed from the head of the linked list.
  648. // If a new entry has been added to the global tail, it is passed as a parameter here even though this function
  649. // assumes that it has already been added, so that if the peer's list is empty (all heads confirmed, head is nil),
  650. // it can set the new head to newEntry.
  651. func (f *lightFetcher) checkUpdateStats(p *peer, newEntry *updateStatsEntry) {
  652. now := mclock.Now()
  653. fp := f.peers[p]
  654. if fp == nil {
  655. glog.V(logger.Debug).Infof("checkUpdateStats: unknown peer")
  656. return
  657. }
  658. if newEntry != nil && fp.firstUpdateStats == nil {
  659. fp.firstUpdateStats = newEntry
  660. }
  661. for fp.firstUpdateStats != nil && fp.firstUpdateStats.time <= now-mclock.AbsTime(blockDelayTimeout) {
  662. f.pm.serverPool.adjustBlockDelay(p.poolEntry, blockDelayTimeout)
  663. fp.firstUpdateStats = fp.firstUpdateStats.next
  664. }
  665. if fp.confirmedTd != nil {
  666. for fp.firstUpdateStats != nil && fp.firstUpdateStats.td.Cmp(fp.confirmedTd) <= 0 {
  667. f.pm.serverPool.adjustBlockDelay(p.poolEntry, time.Duration(now-fp.firstUpdateStats.time))
  668. fp.firstUpdateStats = fp.firstUpdateStats.next
  669. }
  670. }
  671. }