// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package downloader

import (
	"encoding/json"
	"errors"
	"fmt"
	"math/rand"
	"sort"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/eth/protocols/eth"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/log"
)

// scratchHeaders is the number of headers to store in a scratch space to allow
// concurrent downloads. A header is about 0.5KB in size, so there is no worry
// about using too much memory. The only catch is that we can only validate gaps
// after they're linked to the head, so the bigger the scratch space, the larger
// the potential for invalid headers.
//
// The current scratch space of 131072 headers is expected to use 64MB RAM.
const scratchHeaders = 131072

// requestHeaders is the number of headers to request from a remote peer in a single
// network packet. Although the skeleton downloader takes into consideration peer
// capacities when picking idlers, the packet size was decided to remain constant
// since headers are relatively small and it's easier to work with fixed batches
// vs. dynamic interval fillings.
const requestHeaders = 512
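
// Quick arithmetic on the two constants above (illustrative, derived from the
// comments): 131072 headers * ~0.5KB each ≈ 64MB of scratch space, split into
// 131072 / 512 = 256 request-sized chunks that can be assigned to peers
// independently.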

// errSyncLinked is an internal helper error to signal that the current sync
// cycle linked up to the genesis block, thus the skeleton syncer should ping
// the backfiller to resume. Since we already have that logic on sync start,
// piggy-back on that instead of 2 entrypoints.
var errSyncLinked = errors.New("sync linked")

// errSyncMerged is an internal helper error to signal that the current sync
// cycle merged with a previously aborted subchain, thus the skeleton syncer
// should abort and restart with the new state.
var errSyncMerged = errors.New("sync merged")

// errSyncReorged is an internal helper error to signal that the head chain of
// the current sync cycle was (partially) reorged, thus the skeleton syncer
// should abort and restart with the new state.
var errSyncReorged = errors.New("sync reorged")

// errTerminated is returned if the sync mechanism was terminated for this run of
// the process. This is usually the case when Geth is shutting down and some events
// might still be propagating.
var errTerminated = errors.New("terminated")

// errReorgDenied is returned if an attempt is made to extend the beacon chain
// with a new header, but it does not link up to the existing sync.
var errReorgDenied = errors.New("non-forced head reorg denied")

func init() {
	// Tuning parameters is nice, but the scratch space must be assignable in
	// full to peers. It's a useless cornercase to support a dangling half-group.
	if scratchHeaders%requestHeaders != 0 {
		panic("Please make scratchHeaders divisible by requestHeaders")
	}
}

// subchain is a contiguous header chain segment that is backed by the database,
// but may not be linked to the live chain. The skeleton downloader may produce
// a new one of these every time it is restarted until the subchain grows large
// enough to connect with a previous subchain.
//
// The subchains use the exact same database namespace and are not disjoint from
// each other. As such, extending one to overlap the other entails reducing the
// second one first. This combined buffer model is used to avoid having to move
// data on disk when two subchains are joined together.
type subchain struct {
	Head uint64      // Block number of the newest header in the subchain
	Tail uint64      // Block number of the oldest header in the subchain
	Next common.Hash // Block hash of the next oldest header in the subchain
}
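
// For illustration (numbers invented): a subchain {Head: 99, Tail: 50} with
// Next set to the hash of block 49 covers skeleton headers 50..99 and still
// needs header 49 to link up with.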

// skeletonProgress is a database entry to allow suspending and resuming a chain
// sync. As the skeleton header chain is downloaded backwards, restarts can and
// will produce temporarily disjoint subchains. There is no way to restart a
// suspended skeleton sync without prior knowledge of all prior suspension points.
type skeletonProgress struct {
	Subchains []*subchain // Disjoint subchains downloaded until now
}
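
// For illustration, the progress persisted by saveSyncStatus below is plain
// JSON; two disjoint subchains might serialize as (hypothetical numbers,
// truncated hashes):
//
//	{"Subchains":[{"Head":1500000,"Tail":1499000,"Next":"0x12…"},
//	              {"Head":1400000,"Tail":1000000,"Next":"0x34…"}]}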

// headUpdate is a notification that the beacon sync should switch to a new target.
// The update may force the target to be replaced outright, or only try to extend
// it and fail if that's not possible.
type headUpdate struct {
	header *types.Header // Header to update the sync target to
	force  bool          // Whether to force the update or only extend if possible
	errc   chan error    // Channel to signal acceptance of the new head
}

// headerRequest tracks a pending header request to ensure responses are to
// actual requests and to validate any security constraints.
//
// Concurrency note: header requests and responses are handled concurrently from
// the main runloop to allow Keccak256 hash verifications on the peer's thread and
// to drop on invalid response. The request struct must contain all the data to
// construct the response without accessing runloop internals (i.e. subchains).
// That is only included to allow the runloop to match a response to the task being
// synced without having yet another set of maps.
type headerRequest struct {
	peer string // Peer to which this request is assigned
	id   uint64 // Request ID of this request

	deliver chan *headerResponse // Channel to deliver successful response on
	revert  chan *headerRequest  // Channel to deliver request failure on
	cancel  chan struct{}        // Channel to track sync cancellation
	stale   chan struct{}        // Channel to signal the request was dropped

	head uint64 // Head number of the requested batch of headers
}

// headerResponse is an already verified remote response to a header request.
type headerResponse struct {
	peer    *peerConnection // Peer from which this response originates
	reqid   uint64          // Request ID that this response fulfils
	headers []*types.Header // Chain of headers
}

// backfiller is a callback interface through which the skeleton sync can tell
// the downloader that it should suspend or resume backfilling on specific head
// events (e.g. suspend on forks or gaps, resume on successful linkups).
type backfiller interface {
	// suspend requests the backfiller to abort any running full or snap sync
	// based on the skeleton chain as it might be invalid. The backfiller should
	// gracefully handle multiple consecutive suspends without a resume, even
	// on initial startup.
	//
	// The method should return the last block header that has been successfully
	// backfilled, or nil if the backfiller was not resumed.
	suspend() *types.Header

	// resume requests the backfiller to start running fill or snap sync based on
	// the skeleton chain as it has successfully been linked. Appending new heads
	// to the end of the chain will not result in suspend/resume cycles.
	resume()
}
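
// A minimal sketch of an implementation — purely illustrative, not part of
// go-ethereum — to make the contract concrete; a test stub could satisfy the
// interface as:
//
//	type noopFiller struct{}
//
//	func (f *noopFiller) suspend() *types.Header { return nil } // nothing was backfilled
//	func (f *noopFiller) resume()                {}             // nothing to restart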

// skeleton represents a header chain synchronized after the merge where blocks
// aren't validated any more via PoW in a forward fashion, rather are dictated
// and extended at the head via the beacon chain and backfilled on the original
// Ethereum block sync protocol.
//
// Since the skeleton is grown backwards from head to genesis, it is handled as
// a separate entity, not mixed in with the logical sequential transition of the
// blocks. Once the skeleton is connected to an existing, validated chain, the
// headers will be moved into the main downloader for filling and execution.
//
// Opposed to the original Ethereum block synchronization which is trustless (and
// uses a master peer to minimize the attack surface), post-merge block sync starts
// from a trusted head. As such, there is no need for a master peer any more and
// headers can be requested fully concurrently (though some batches might be
// discarded if they don't link up correctly).
//
// Although a skeleton is part of a sync cycle, it is not recreated, rather stays
// alive throughout the lifetime of the downloader. This allows it to be extended
// concurrently with the sync cycle, since extensions arrive from an API surface,
// not from within (vs. legacy Ethereum sync).
//
// Since the skeleton tracks the entire header chain until it is consumed by the
// forward block filling, it needs 0.5KB/block storage. At current mainnet sizes
// this is only possible with a disk backend. Since the skeleton is separate from
// the node's header chain, storing the headers ephemerally until sync finishes
// is wasted disk IO, but it's a price we're going to pay to keep things simple
// for now.
type skeleton struct {
	db     ethdb.Database // Database backing the skeleton
	filler backfiller     // Chain syncer suspended/resumed by head events

	peers *peerSet                   // Set of peers we can sync from
	idles map[string]*peerConnection // Set of idle peers in the current sync cycle
	drop  peerDropFn                 // Drops a peer for misbehaving

	progress *skeletonProgress // Sync progress tracker for resumption and metrics
	started  time.Time         // Timestamp when the skeleton syncer was created
	logged   time.Time         // Timestamp when progress was last logged to the user
	pulled   uint64            // Number of headers downloaded in this run

	scratchSpace  []*types.Header // Scratch space to accumulate headers in (first = recent)
	scratchOwners []string        // Peer IDs owning chunks of the scratch space (pend or delivered)
	scratchHead   uint64          // Block number of the first item in the scratch space

	requests map[uint64]*headerRequest // Header requests currently running

	headEvents chan *headUpdate // Notification channel for new heads
	terminate  chan chan error  // Termination channel to abort sync
	terminated chan struct{}    // Channel to signal that the syncer is dead

	// Callback hooks used during testing
	syncStarting func() // callback triggered after a sync cycle is inited but before started
}

// newSkeleton creates a new sync skeleton that tracks a potentially dangling
// header chain until it's linked into an existing set of blocks.
func newSkeleton(db ethdb.Database, peers *peerSet, drop peerDropFn, filler backfiller) *skeleton {
	sk := &skeleton{
		db:         db,
		filler:     filler,
		peers:      peers,
		drop:       drop,
		requests:   make(map[uint64]*headerRequest),
		headEvents: make(chan *headUpdate),
		terminate:  make(chan chan error),
		terminated: make(chan struct{}),
	}
	go sk.startup()
	return sk
}
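
// Illustrative wiring (assumed caller code, with hypothetical variable names):
// the downloader creates one skeleton per process and drives it purely through
// head announcements:
//
//	sk := newSkeleton(db, peers, dropFn, filler)
//	err := sk.Sync(head, true) // a forced head is required to start the first cycle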

// startup is an initial background loop which waits for an event to start or
// tear the syncer down. This is required to make the skeleton sync loop once
// per process but at the same time not start before the beacon chain announces
// a new (existing) head.
func (s *skeleton) startup() {
	// Close a notification channel so anyone sending us events will know if the
	// sync loop was torn down for good.
	defer close(s.terminated)

	// Wait for startup or teardown. This wait might loop a few times if a beacon
	// client requests sync head extensions, but not forced reorgs (i.e. they are
	// giving us new payloads without setting a starting head initially).
	for {
		select {
		case errc := <-s.terminate:
			// No head was announced but Geth is shutting down
			errc <- nil
			return

		case event := <-s.headEvents:
			// New head announced, start syncing to it, looping every time a current
			// cycle is terminated due to a chain event (head reorg, old chain merge).
			if !event.force {
				event.errc <- errors.New("forced head needed for startup")
				continue
			}
			event.errc <- nil // forced head accepted for startup
			head := event.header
			s.started = time.Now()

			for {
				// If the sync cycle terminated or was terminated, propagate up when
				// higher layers request termination. There's no fancy explicit error
				// signalling as the sync loop should never terminate (TM).
				newhead, err := s.sync(head)
				switch {
				case err == errSyncLinked:
					// Sync cycle linked up to the genesis block. Tear down the loop
					// and restart it so it can properly notify the backfiller. Don't
					// account a new head.
					head = nil

				case err == errSyncMerged:
					// Subchains were merged, we just need to reinit the internal
					// state to continue on the tail of the merged chain. Don't
					// announce a new head.
					head = nil

				case err == errSyncReorged:
					// The subchain being synced got modified at the head in a
					// way that requires resyncing it. Restart sync with the new
					// head to force a cleanup.
					head = newhead

				case err == errTerminated:
					// Sync was requested to be terminated from within, stop and
					// return (no need to pass a message, was already done internally)
					return

				default:
					// Sync either successfully terminated or failed with an unhandled
					// error. Abort and wait until Geth requests a termination.
					errc := <-s.terminate
					errc <- err
					return
				}
			}
		}
	}
}

// Terminate tears down the syncer indefinitely.
func (s *skeleton) Terminate() error {
	// Request termination and fetch any errors
	errc := make(chan error)
	s.terminate <- errc
	err := <-errc

	// Wait for full shutdown (not necessary, but cleaner)
	<-s.terminated
	return err
}

// Sync starts or resumes a previous sync cycle to download and maintain a reverse
// header chain starting at the head and leading towards genesis to an available
// ancestor.
//
// This method does not block, rather it just waits until the syncer receives the
// fed header. What the syncer does with it is the syncer's problem.
func (s *skeleton) Sync(head *types.Header, force bool) error {
	log.Trace("New skeleton head announced", "number", head.Number, "hash", head.Hash(), "force", force)
	errc := make(chan error)

	select {
	case s.headEvents <- &headUpdate{header: head, force: force, errc: errc}:
		return <-errc
	case <-s.terminated:
		return errTerminated
	}
}

// sync is the internal version of Sync that executes a single sync cycle, either
// until some termination condition is reached, or until the current cycle merges
// with a previously aborted run.
func (s *skeleton) sync(head *types.Header) (*types.Header, error) {
	// If we're continuing a previous merge interrupt, just access the existing
	// old state without initing from disk.
	if head == nil {
		head = rawdb.ReadSkeletonHeader(s.db, s.progress.Subchains[0].Head)
	} else {
		// Otherwise, initialize the sync, trimming any previous leftovers until
		// we're consistent with the newly requested chain head
		s.initSync(head)
	}
	// Create the scratch space to fill with concurrently downloaded headers
	s.scratchSpace = make([]*types.Header, scratchHeaders)
	defer func() { s.scratchSpace = nil }() // don't hold on to references after sync

	s.scratchOwners = make([]string, scratchHeaders/requestHeaders)
	defer func() { s.scratchOwners = nil }() // don't hold on to references after sync

	s.scratchHead = s.progress.Subchains[0].Tail - 1 // tail must not be 0!

	// If the sync is already done, resume the backfiller. When the loop stops,
	// terminate the backfiller too.
	linked := len(s.progress.Subchains) == 1 &&
		rawdb.HasHeader(s.db, s.progress.Subchains[0].Next, s.scratchHead) &&
		rawdb.HasBody(s.db, s.progress.Subchains[0].Next, s.scratchHead) &&
		rawdb.HasReceipts(s.db, s.progress.Subchains[0].Next, s.scratchHead)
	if linked {
		s.filler.resume()
	}
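
	// Note on the linked check above: Subchains[0].Next is the parent hash of
	// the oldest skeleton header and s.scratchHead is that parent's number, so
	// requiring header, body and receipts to all exist locally is a cheap proxy
	// for "the chain below the skeleton tail has been fully processed".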

	defer func() {
		if filled := s.filler.suspend(); filled != nil {
			// If something was filled, try to delete stale sync helpers. If
			// unsuccessful, warn the user, but not much else we can do (it's
			// a programming error, just let users report an issue and don't
			// choke in the meantime).
			if err := s.cleanStales(filled); err != nil {
				log.Error("Failed to clean stale beacon headers", "err", err)
			}
		}
	}()
	// Create a set of unique channels for this sync cycle. We need these to be
	// ephemeral so a data race doesn't accidentally deliver something stale on
	// a persistent channel across syncs (yup, this happened)
	var (
		requestFails = make(chan *headerRequest)
		responses    = make(chan *headerResponse)
	)
	cancel := make(chan struct{})
	defer close(cancel)

	log.Debug("Starting reverse header sync cycle", "head", head.Number, "hash", head.Hash(), "cont", s.scratchHead)

	// Whether sync completed or not, disregard any future packets
	defer func() {
		log.Debug("Terminating reverse header sync cycle", "head", head.Number, "hash", head.Hash(), "cont", s.scratchHead)
		s.requests = make(map[uint64]*headerRequest)
	}()
	// Start tracking idle peers for task assignments
	peering := make(chan *peeringEvent, 64) // arbitrary buffer, just some burst protection

	peeringSub := s.peers.SubscribeEvents(peering)
	defer peeringSub.Unsubscribe()

	s.idles = make(map[string]*peerConnection)
	for _, peer := range s.peers.AllPeers() {
		s.idles[peer.id] = peer
	}
	// Notify any tester listening for startup events
	if s.syncStarting != nil {
		s.syncStarting()
	}
	for {
		// Something happened, try to assign new tasks to any idle peers
		if !linked {
			s.assignTasks(responses, requestFails, cancel)
		}
		// Wait for something to happen
		select {
		case event := <-peering:
			// A peer joined or left, the tasks queue and allocations need to be
			// checked for potential assignment or reassignment
			peerid := event.peer.id
			if event.join {
				log.Debug("Joining skeleton peer", "id", peerid)
				s.idles[peerid] = event.peer
			} else {
				log.Debug("Leaving skeleton peer", "id", peerid)
				s.revertRequests(peerid)
				delete(s.idles, peerid)
			}

		case errc := <-s.terminate:
			errc <- nil
			return nil, errTerminated

		case event := <-s.headEvents:
			// New head was announced, try to integrate it. If successful, nothing
			// needs to be done as the head simply extended the last range. For now
			// we don't seamlessly integrate reorgs to keep things simple. If the
			// network starts doing many mini reorgs, it might be worthwhile handling
			// a limited depth without an error.
			if reorged := s.processNewHead(event.header, event.force); reorged {
				// If a reorg is needed, and we're forcing the new head, signal
				// the syncer to tear down and start over. Otherwise, drop the
				// non-force reorg.
				if event.force {
					event.errc <- nil // forced head reorg accepted
					return event.header, errSyncReorged
				}
				event.errc <- errReorgDenied
				continue
			}
			event.errc <- nil // head extension accepted

			// New head was integrated into the skeleton chain. If the backfiller
			// is still running, it will pick it up. If it already terminated,
			// a new cycle needs to be spun up.
			if linked {
				s.filler.resume()
			}

		case req := <-requestFails:
			s.revertRequest(req)

		case res := <-responses:
			// Process the batch of headers. If through processing we managed to
			// link the current subchain to a previously downloaded one, abort the
			// sync and restart with the merged subchains.
			//
			// If we managed to link to the existing local chain or genesis block,
			// abort sync altogether.
			linked, merged := s.processResponse(res)
			if linked {
				log.Debug("Beacon sync linked to local chain")
				return nil, errSyncLinked
			}
			if merged {
				log.Debug("Beacon sync merged subchains")
				return nil, errSyncMerged
			}
			// We still have work to do, loop and repeat
		}
	}
}

// initSync attempts to get the skeleton sync into a consistent state wrt any
// past state on disk and the newly requested head to sync to. If the new head
// is nil, the method will return and continue from the previous head.
func (s *skeleton) initSync(head *types.Header) {
	// Extract the head number, we'll need it all over
	number := head.Number.Uint64()

	// Retrieve the previously saved sync progress
	if status := rawdb.ReadSkeletonSyncStatus(s.db); len(status) > 0 {
		s.progress = new(skeletonProgress)
		if err := json.Unmarshal(status, s.progress); err != nil {
			log.Error("Failed to decode skeleton sync status", "err", err)
		} else {
			// Previous sync was available, print some continuation logs
			for _, subchain := range s.progress.Subchains {
				log.Debug("Restarting skeleton subchain", "head", subchain.Head, "tail", subchain.Tail)
			}
			// Create a new subchain for the head (unless the last can be extended),
			// trimming anything it would overwrite
			headchain := &subchain{
				Head: number,
				Tail: number,
				Next: head.ParentHash,
			}
			for len(s.progress.Subchains) > 0 {
				// If the last chain is above the new head, delete altogether
				lastchain := s.progress.Subchains[0]
				if lastchain.Tail >= headchain.Tail {
					log.Debug("Dropping skeleton subchain", "head", lastchain.Head, "tail", lastchain.Tail)
					s.progress.Subchains = s.progress.Subchains[1:]
					continue
				}
				// Otherwise truncate the last chain if needed and abort trimming
				if lastchain.Head >= headchain.Tail {
					log.Debug("Trimming skeleton subchain", "oldhead", lastchain.Head, "newhead", headchain.Tail-1, "tail", lastchain.Tail)
					lastchain.Head = headchain.Tail - 1
				}
				break
			}
			// If the last subchain can be extended, we're lucky. Otherwise create
			// a new subchain sync task.
			var extended bool
			if n := len(s.progress.Subchains); n > 0 {
				lastchain := s.progress.Subchains[0]
				if lastchain.Head == headchain.Tail-1 {
					lasthead := rawdb.ReadSkeletonHeader(s.db, lastchain.Head)
					if lasthead.Hash() == head.ParentHash {
						log.Debug("Extended skeleton subchain with new head", "head", headchain.Tail, "tail", lastchain.Tail)
						lastchain.Head = headchain.Tail
						extended = true
					}
				}
			}
			if !extended {
				log.Debug("Created new skeleton subchain", "head", number, "tail", number)
				s.progress.Subchains = append([]*subchain{headchain}, s.progress.Subchains...)
			}
			// Update the database with the new sync stats and insert the new
			// head header. We won't delete any trimmed skeleton headers since
			// those will be outside the index space of the many subchains and
			// the database space will be reclaimed eventually when processing
			// blocks above the current head (TODO(karalabe): don't forget).
			batch := s.db.NewBatch()

			rawdb.WriteSkeletonHeader(batch, head)
			s.saveSyncStatus(batch)

			if err := batch.Write(); err != nil {
				log.Crit("Failed to write skeleton sync status", "err", err)
			}
			return
		}
	}
	// Either we've failed to decode the previous state, or there was none. Start
	// a fresh sync with a single subchain represented by the currently sent
	// chain head.
	s.progress = &skeletonProgress{
		Subchains: []*subchain{
			{
				Head: number,
				Tail: number,
				Next: head.ParentHash,
			},
		},
	}
	batch := s.db.NewBatch()

	rawdb.WriteSkeletonHeader(batch, head)
	s.saveSyncStatus(batch)

	if err := batch.Write(); err != nil {
		log.Crit("Failed to write initial skeleton sync status", "err", err)
	}
	log.Debug("Created initial skeleton subchain", "head", number, "tail", number)
}
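
// To make initSync's trimming rules concrete (illustrative numbers): given a
// stored subchain {Head: 1500, Tail: 1000} and a newly announced head at 1200,
// the subchain is first trimmed to {Head: 1199, Tail: 1000}; if header 1199 is
// the parent of the new head, the subchain is extended to {Head: 1200, Tail:
// 1000}, otherwise a fresh one-block subchain {Head: 1200, Tail: 1200} is
// prepended and synced backwards until it merges.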

// saveSyncStatus marshals the remaining sync tasks into leveldb.
func (s *skeleton) saveSyncStatus(db ethdb.KeyValueWriter) {
	status, err := json.Marshal(s.progress)
	if err != nil {
		panic(err) // This can only fail during implementation
	}
	rawdb.WriteSkeletonSyncStatus(db, status)
}

// processNewHead does the internal shuffling for a new head marker and either
// accepts and integrates it into the skeleton or requests a reorg. Upon reorg,
// the syncer will tear itself down and restart with a fresh head. It is simpler
// to reconstruct the sync state than to mutate it and hope for the best.
func (s *skeleton) processNewHead(head *types.Header, force bool) bool {
	// If the header cannot be inserted without interruption, return an error for
	// the outer loop to tear down the skeleton sync and restart it
	number := head.Number.Uint64()

	lastchain := s.progress.Subchains[0]
	if lastchain.Tail >= number {
		// If the chain is down to a single beacon header, and it is re-announced
		// once more, ignore it instead of tearing down sync for a noop.
		if lastchain.Head == lastchain.Tail {
			if current := rawdb.ReadSkeletonHeader(s.db, number); current.Hash() == head.Hash() {
				return false
			}
		}
		// Not a noop / double head announce, abort with a reorg
		if force {
			log.Warn("Beacon chain reorged", "tail", lastchain.Tail, "head", lastchain.Head, "newHead", number)
		}
		return true
	}
	if lastchain.Head+1 < number {
		if force {
			log.Warn("Beacon chain gapped", "head", lastchain.Head, "newHead", number)
		}
		return true
	}
	if parent := rawdb.ReadSkeletonHeader(s.db, number-1); parent.Hash() != head.ParentHash {
		if force {
			log.Warn("Beacon chain forked", "ancestor", parent.Number, "hash", parent.Hash(), "want", head.ParentHash)
		}
		return true
	}
	// New header seems to be in the last subchain range. Unwind any extra headers
	// from the chain tip and insert the new head. We won't delete any trimmed
	// skeleton headers since those will be outside the index space of the many
	// subchains and the database space will be reclaimed eventually when processing
	// blocks above the current head (TODO(karalabe): don't forget).
	batch := s.db.NewBatch()

	rawdb.WriteSkeletonHeader(batch, head)
	lastchain.Head = number
	s.saveSyncStatus(batch)

	if err := batch.Write(); err != nil {
		log.Crit("Failed to write skeleton sync status", "err", err)
	}
	return false
}
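
// The three rejection branches above, with illustrative numbers for a subchain
// {Head: 100, Tail: 90}: a new head at 90 or below falls under the tail (reorg),
// a head at 102 would leave 101 missing (gap), and a head at, say, 95 or 101
// whose ParentHash doesn't match the stored header below it is a fork. A head
// at 95 that does link onto header 94 is accepted, unwinding headers 96..100.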

// assignTasks attempts to match idle peers to pending header retrievals.
func (s *skeleton) assignTasks(success chan *headerResponse, fail chan *headerRequest, cancel chan struct{}) {
	// Sort the peers by download capacity to use faster ones if many available
	idlers := &peerCapacitySort{
		peers: make([]*peerConnection, 0, len(s.idles)),
		caps:  make([]int, 0, len(s.idles)),
	}
	targetTTL := s.peers.rates.TargetTimeout()
	for _, peer := range s.idles {
		idlers.peers = append(idlers.peers, peer)
		idlers.caps = append(idlers.caps, s.peers.rates.Capacity(peer.id, eth.BlockHeadersMsg, targetTTL))
	}
	if len(idlers.peers) == 0 {
		return
	}
	sort.Sort(idlers)

	// Find header regions not yet downloading and fill them
	for task, owner := range s.scratchOwners {
		// If we're out of idle peers, stop assigning tasks
		if len(idlers.peers) == 0 {
			return
		}
		// Skip any tasks already filling
		if owner != "" {
			continue
		}
		// If we've reached the genesis, stop assigning tasks
		if uint64(task*requestHeaders) >= s.scratchHead {
			return
		}
		// Found a task and have peers available, assign it
		idle := idlers.peers[0]

		idlers.peers = idlers.peers[1:]
		idlers.caps = idlers.caps[1:]

		// Matched a pending task to an idle peer, allocate a unique request id
		var reqid uint64
		for {
			reqid = uint64(rand.Int63())
			if reqid == 0 {
				continue
			}
			if _, ok := s.requests[reqid]; ok {
				continue
			}
			break
		}
		// Generate the network query and send it to the peer
		req := &headerRequest{
			peer:    idle.id,
			id:      reqid,
			deliver: success,
			revert:  fail,
			cancel:  cancel,
			stale:   make(chan struct{}),
			head:    s.scratchHead - uint64(task*requestHeaders),
		}
		s.requests[reqid] = req
		delete(s.idles, idle.id)

		// Fetch the batch of headers on a separate goroutine, delivering back
		// on this cycle's channels
		go s.executeTask(idle, req)

		// Inject the request into the task to block further assignments
		s.scratchOwners[task] = idle.id
	}
}
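
// The scratch indexing used above, with illustrative numbers: if s.scratchHead
// is 1_000_000, task 0 requests the 512 headers anchored at (and descending
// from) 1_000_000, task 1 the batch anchored at 999_488, and so on — each
// task's anchor is scratchHead - task*requestHeaders, fetched in reverse order.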

// executeTask executes a single fetch request, blocking until either a result
// arrives or a timeout / cancellation is triggered. The method should be run
// on its own goroutine and will deliver on the requested channels.
func (s *skeleton) executeTask(peer *peerConnection, req *headerRequest) {
	start := time.Now()
	resCh := make(chan *eth.Response)

	// Figure out how many headers to fetch. Usually this will be a full batch,
	// but for the very tail of the chain, trim the request to the number left.
	// Since nodes may or may not return the genesis header for a batch request,
	// don't even request it. The parent hash of block #1 is enough to link.
	requestCount := requestHeaders
	if req.head < requestHeaders {
		requestCount = int(req.head)
	}
	peer.log.Trace("Fetching skeleton headers", "from", req.head, "count", requestCount)
	netreq, err := peer.peer.RequestHeadersByNumber(req.head, requestCount, 0, true, resCh)
	if err != nil {
		peer.log.Trace("Failed to request headers", "err", err)
		s.scheduleRevertRequest(req)
		return
	}
	defer netreq.Close()

	// Wait until the response arrives, the request is cancelled or times out
	ttl := s.peers.rates.TargetTimeout()

	timeoutTimer := time.NewTimer(ttl)
	defer timeoutTimer.Stop()

	select {
	case <-req.cancel:
		peer.log.Debug("Header request cancelled")
		s.scheduleRevertRequest(req)

	case <-timeoutTimer.C:
		// Header retrieval timed out, update the metrics
		peer.log.Warn("Header request timed out, dropping peer", "elapsed", ttl)

		headerTimeoutMeter.Mark(1)
		s.peers.rates.Update(peer.id, eth.BlockHeadersMsg, 0, 0)
		s.scheduleRevertRequest(req)

		// At this point we either need to drop the offending peer, or we need a
		// mechanism to allow waiting for the response and not cancel it. For now
		// let's go with dropping since the header sizes are deterministic and the
		// beacon sync runs exclusive (downloader is idle) so there should be no
		// other load to make timeouts probable. If we notice that timeouts happen
		// more often than we'd like, we can introduce a tracker for the requests
		// gone stale and monitor them. However, in that case too, we need a way
		// to protect against malicious peers never responding, so it would need
		// a second, hard-timeout mechanism.
		s.drop(peer.id)

	case res := <-resCh:
		// Headers successfully retrieved, update the metrics
		headers := *res.Res.(*eth.BlockHeadersPacket)

		headerReqTimer.Update(time.Since(start))
		s.peers.rates.Update(peer.id, eth.BlockHeadersMsg, res.Time, len(headers))

		// Cross validate the headers with the requests
		switch {
		case len(headers) == 0:
			// No headers were delivered, reject the response and reschedule
			peer.log.Debug("No headers delivered")
			res.Done <- errors.New("no headers delivered")
			s.scheduleRevertRequest(req)

		case headers[0].Number.Uint64() != req.head:
			// Header batch anchored at non-requested number
			peer.log.Debug("Invalid header response head", "have", headers[0].Number, "want", req.head)
			res.Done <- errors.New("invalid header batch anchor")
			s.scheduleRevertRequest(req)

		case req.head >= requestHeaders && len(headers) != requestHeaders:
			// Invalid number of non-genesis headers delivered, reject the response and reschedule
			peer.log.Debug("Invalid non-genesis header count", "have", len(headers), "want", requestHeaders)
			res.Done <- errors.New("not enough non-genesis headers delivered")
			s.scheduleRevertRequest(req)

		case req.head < requestHeaders && uint64(len(headers)) != req.head:
			// Invalid number of genesis headers delivered, reject the response and reschedule
			peer.log.Debug("Invalid genesis header count", "have", len(headers), "want", headers[0].Number.Uint64())
			res.Done <- errors.New("not enough genesis headers delivered")
			s.scheduleRevertRequest(req)

		default:
			// Packet seems structurally valid, check hash progression and if it
			// is correct too, deliver for storage
			for i := 0; i < len(headers)-1; i++ {
				if headers[i].ParentHash != headers[i+1].Hash() {
					peer.log.Debug("Invalid hash progression", "index", i, "wantparenthash", headers[i].ParentHash, "haveparenthash", headers[i+1].Hash())
					res.Done <- errors.New("invalid hash progression")
					s.scheduleRevertRequest(req)
					return
				}
			}
			// Hash chain is valid. The delivery might still be junk as we're
			// downloading batches concurrently (so no way to link the headers
			// until gaps are filled); in that case, we'll nuke the peer when
			// we detect the fault.
			res.Done <- nil

			select {
			case req.deliver <- &headerResponse{
				peer:    peer,
				reqid:   req.id,
				headers: headers,
			}:
			case <-req.cancel:
			}
		}
	}
}

// revertRequests locates all the currently pending requests from a particular
// peer and reverts them, rescheduling for others to fulfill.
func (s *skeleton) revertRequests(peer string) {
	// Gather the requests first, revertals need the lock too
	var requests []*headerRequest
	for _, req := range s.requests {
		if req.peer == peer {
			requests = append(requests, req)
		}
	}
	// Revert all the requests matching the peer
	for _, req := range requests {
		s.revertRequest(req)
	}
}

// scheduleRevertRequest asks the event loop to clean up a request and return
// all failed retrieval tasks to the scheduler for reassignment.
func (s *skeleton) scheduleRevertRequest(req *headerRequest) {
	select {
	case req.revert <- req:
		// Sync event loop notified
	case <-req.cancel:
		// Sync cycle got cancelled
	case <-req.stale:
		// Request already reverted
	}
}

// revertRequest cleans up a request and returns all failed retrieval tasks to
// the scheduler for reassignment.
//
// Note, this needs to run on the event runloop thread to reschedule to idle peers.
// On peer threads, use scheduleRevertRequest.
func (s *skeleton) revertRequest(req *headerRequest) {
	log.Trace("Reverting header request", "peer", req.peer, "reqid", req.id)
	select {
	case <-req.stale:
		log.Trace("Header request already reverted", "peer", req.peer, "reqid", req.id)
		return
	default:
	}
	close(req.stale)

	// Remove the request from the tracked set
	delete(s.requests, req.id)

	// Mark the task as not-pending, ready for rescheduling
	s.scratchOwners[(s.scratchHead-req.head)/requestHeaders] = ""
}
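
// The owner index computed above is the inverse of the anchor formula used in
// assignTasks: a request anchored at head = scratchHead - task*requestHeaders
// gives back task = (scratchHead - head) / requestHeaders, so exactly the
// failed chunk is freed up for reassignment.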

// processResponse integrates a batch of retrieved headers into the scratch
// space and the tracked subchains, reporting whether the sync managed to link
// up to the local chain and whether two previously disjoint subchains merged.
func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged bool) {
	res.peer.log.Trace("Processing header response", "head", res.headers[0].Number, "hash", res.headers[0].Hash(), "count", len(res.headers))

	// Whether the response is valid, we can mark the peer as idle and notify
	// the scheduler to assign a new task. If the response is invalid, we'll
	// drop the peer in a bit.
	s.idles[res.peer.id] = res.peer

	// Ensure the response is for a valid request
	if _, ok := s.requests[res.reqid]; !ok {
		// Some internal accounting is broken. A request either times out or it
		// gets fulfilled successfully. It should not be possible to deliver a
		// response to a non-existing request.
		res.peer.log.Error("Unexpected header packet")
		return false, false
	}
	delete(s.requests, res.reqid)

	// Insert the delivered headers into the scratch space independent of the
	// content or continuation; those will be validated in a moment
	head := res.headers[0].Number.Uint64()
	copy(s.scratchSpace[s.scratchHead-head:], res.headers)

	// If there's still a gap in the head of the scratch space, abort
	if s.scratchSpace[0] == nil {
		return false, false
	}
	// Try to consume any head headers, validating the boundary conditions
	batch := s.db.NewBatch()
	for s.scratchSpace[0] != nil {
		// Next batch of headers available, cross-reference with the subchain
		// we are extending and either accept or discard
		if s.progress.Subchains[0].Next != s.scratchSpace[0].Hash() {
			// Print a log message to track what's going on
			tail := s.progress.Subchains[0].Tail
			want := s.progress.Subchains[0].Next
			have := s.scratchSpace[0].Hash()

			log.Warn("Invalid skeleton headers", "peer", s.scratchOwners[0], "number", tail-1, "want", want, "have", have)

			// The peer delivered junk, or at least not the subchain we are
			// syncing to. Free up the scratch space and assignment, reassign
			// and drop the original peer.
			for i := 0; i < requestHeaders; i++ {
				s.scratchSpace[i] = nil
			}
			s.drop(s.scratchOwners[0])
			s.scratchOwners[0] = ""
			break
		}
		// Scratch delivery matches required subchain, deliver the batch of
		// headers and push the subchain forward
		var consumed int
		for _, header := range s.scratchSpace[:requestHeaders] {
			if header != nil { // nil when the genesis is reached
				consumed++

				rawdb.WriteSkeletonHeader(batch, header)
				s.pulled++

				s.progress.Subchains[0].Tail--
				s.progress.Subchains[0].Next = header.ParentHash

				// If we've reached an existing block in the chain, stop retrieving
				// headers. Note, if we want to support light clients with the same
				// code we'd need to switch here based on the downloader mode. That
				// said, there's no such functionality for now, so don't complicate.
				//
				// In the case of full sync it would be enough to check for the body,
				// but even a full syncing node will generate a receipt once block
				// processing is done, so it's just one more "needless" check.
				//
				// The weird cascading checks are done to minimize the database reads.
				linked = rawdb.HasHeader(s.db, header.ParentHash, header.Number.Uint64()-1) &&
					rawdb.HasBody(s.db, header.ParentHash, header.Number.Uint64()-1) &&
					rawdb.HasReceipts(s.db, header.ParentHash, header.Number.Uint64()-1)
				if linked {
					break
				}
			}
		}
		head := s.progress.Subchains[0].Head
		tail := s.progress.Subchains[0].Tail
		next := s.progress.Subchains[0].Next

		log.Trace("Primary subchain extended", "head", head, "tail", tail, "next", next)

		// If the beacon chain was linked to the local chain, completely swap out
		// all internal progress and abort header synchronization.
		if linked {
			// Linking into the local chain should also mean that there are no
			// leftover subchains, but in the case of importing the blocks via
			// the engine API, we will not push the subchains forward. This will
			// lead to a gap between an old sync cycle and a future one.
			if subchains := len(s.progress.Subchains); subchains > 1 {
				switch {
				// If there are only 2 subchains - the current one and an older
				// one - and the old one consists of a single block, then it's
				// the expected new sync cycle after some propagated blocks. Log
				// it for debugging purposes, explicitly clean and don't escalate.
				case subchains == 2 && s.progress.Subchains[1].Head == s.progress.Subchains[1].Tail:
					log.Debug("Cleaning previous beacon sync state", "head", s.progress.Subchains[1].Head)
					rawdb.DeleteSkeletonHeader(batch, s.progress.Subchains[1].Head)
					s.progress.Subchains = s.progress.Subchains[:1]

				// If we have more than one header or more than one leftover chain,
				// the syncer's internal state is corrupted. Do try to fix it, but
				// be very vocal about the fault.
				default:
					var context []interface{}

					for i := range s.progress.Subchains[1:] {
						context = append(context, fmt.Sprintf("stale_head_%d", i+1))
						context = append(context, s.progress.Subchains[i+1].Head)
						context = append(context, fmt.Sprintf("stale_tail_%d", i+1))
						context = append(context, s.progress.Subchains[i+1].Tail)
						context = append(context, fmt.Sprintf("stale_next_%d", i+1))
						context = append(context, s.progress.Subchains[i+1].Next)
					}
					log.Error("Cleaning spurious beacon sync leftovers", context...)
					s.progress.Subchains = s.progress.Subchains[:1]

					// Note, here we didn't actually delete the headers at all,
					// just the metadata. We could implement a cleanup mechanism,
					// but further modifying corrupted state is kind of asking
					// for it. Unless there's a good enough reason to risk it,
					// better to live with the small database junk.
				}
			}
			break
		}
		// Batch of headers consumed, shift the download window forward
		copy(s.scratchSpace, s.scratchSpace[requestHeaders:])
		for i := 0; i < requestHeaders; i++ {
			s.scratchSpace[scratchHeaders-i-1] = nil
		}
		copy(s.scratchOwners, s.scratchOwners[1:])
		s.scratchOwners[scratchHeaders/requestHeaders-1] = ""

		s.scratchHead -= uint64(consumed)

		// If the subchain extended into the next subchain, we need to handle
		// the overlap. Since there could be many overlaps (come on), do this
		// in a loop.
		for len(s.progress.Subchains) > 1 && s.progress.Subchains[1].Head >= s.progress.Subchains[0].Tail {
			// Extract some stats from the second subchain
			head := s.progress.Subchains[1].Head
			tail := s.progress.Subchains[1].Tail
			next := s.progress.Subchains[1].Next

			// Since we just overwrote part of the next subchain, we need to trim
			// its head independent of matching or mismatching content
			if s.progress.Subchains[1].Tail >= s.progress.Subchains[0].Tail {
				// Fully overwritten, get rid of the subchain as a whole
				log.Debug("Previous subchain fully overwritten", "head", head, "tail", tail, "next", next)
				s.progress.Subchains = append(s.progress.Subchains[:1], s.progress.Subchains[2:]...)
				continue
			} else {
				// Partially overwritten, trim the head to the overwritten size
				log.Debug("Previous subchain partially overwritten", "head", head, "tail", tail, "next", next)
				s.progress.Subchains[1].Head = s.progress.Subchains[0].Tail - 1
			}
			// If the old subchain is an extension of the new one, merge the two
			// and let the skeleton syncer restart (to clean internal state)
			if rawdb.ReadSkeletonHeader(s.db, s.progress.Subchains[1].Head).Hash() == s.progress.Subchains[0].Next {
				log.Debug("Previous subchain merged", "head", head, "tail", tail, "next", next)

				s.progress.Subchains[0].Tail = s.progress.Subchains[1].Tail
				s.progress.Subchains[0].Next = s.progress.Subchains[1].Next

				s.progress.Subchains = append(s.progress.Subchains[:1], s.progress.Subchains[2:]...)
				merged = true
			}
		}
		// If subchains were merged, all further available headers in the scratch
		// space are invalid since we skipped ahead. Stop processing the scratch
		// space to avoid dropping peers thinking they delivered invalid data.
		if merged {
			break
		}
	}
	s.saveSyncStatus(batch)
	if err := batch.Write(); err != nil {
		log.Crit("Failed to write skeleton headers and progress", "err", err)
	}
	// Print a progress report making the UX a bit nicer
	left := s.progress.Subchains[0].Tail - 1
	if linked {
		left = 0
	}
	if time.Since(s.logged) > 8*time.Second || left == 0 {
		s.logged = time.Now()

		if s.pulled == 0 {
			log.Info("Beacon sync starting", "left", left)
		} else {
			eta := float64(time.Since(s.started)) / float64(s.pulled) * float64(left)
			log.Info("Syncing beacon headers", "downloaded", s.pulled, "left", left, "eta", common.PrettyDuration(eta))
		}
	}
	return linked, merged
}
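
// The ETA above is a plain linear extrapolation — illustrative arithmetic: if
// 1_000_000 headers were pulled in 100s and 500_000 are left, the estimate is
// 100s / 1_000_000 * 500_000 = 50s.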

// cleanStales removes previously synced beacon headers that have become stale
// due to the downloader backfilling past the tracked tail.
func (s *skeleton) cleanStales(filled *types.Header) error {
	number := filled.Number.Uint64()
	log.Trace("Cleaning stale beacon headers", "filled", number, "hash", filled.Hash())

	// If the filled header is below the linked subchain, something's
	// corrupted internally. Report an error and refuse to do anything.
	if number < s.progress.Subchains[0].Tail {
		return fmt.Errorf("filled header below beacon header tail: %d < %d", number, s.progress.Subchains[0].Tail)
	}
	// Subchain seems trimmable, push the tail forward up to the last
	// filled header and delete everything before it - if available. In
	// case we filled past the head, recreate the subchain with a new
	// head to keep it consistent with the data on disk.
	var (
		start = s.progress.Subchains[0].Tail // start deleting from the first known header
		end   = number                       // delete until the requested threshold
	)
	s.progress.Subchains[0].Tail = number
	s.progress.Subchains[0].Next = filled.ParentHash

	if s.progress.Subchains[0].Head < number {
		// If more headers were filled than available, push the entire
		// subchain forward to keep tracking the node's block imports
		end = s.progress.Subchains[0].Head + 1 // delete the entire original range, including the head
		s.progress.Subchains[0].Head = number  // assign a new head (tail is already assigned to this)
	}
	// Execute the trimming and the potential rewiring of the progress
	batch := s.db.NewBatch()

	if end != number {
		// The entire original skeleton chain was deleted and a new one
		// defined. Make sure the new single-header chain gets pushed to
		// disk to keep internal state consistent.
		rawdb.WriteSkeletonHeader(batch, filled)
	}
	s.saveSyncStatus(batch)
	for n := start; n < end; n++ {
		// If the batch grew too big, flush it and continue with a new batch.
		// The catch is that the sync metadata needs to reflect the actually
		// flushed state, so temporarily change the subchain progress and
		// revert after the flush.
		if batch.ValueSize() >= ethdb.IdealBatchSize {
			tmpTail := s.progress.Subchains[0].Tail
			tmpNext := s.progress.Subchains[0].Next

			s.progress.Subchains[0].Tail = n
			s.progress.Subchains[0].Next = rawdb.ReadSkeletonHeader(s.db, n).ParentHash
			s.saveSyncStatus(batch)

			if err := batch.Write(); err != nil {
				log.Crit("Failed to write beacon trim data", "err", err)
			}
			batch.Reset()

			s.progress.Subchains[0].Tail = tmpTail
			s.progress.Subchains[0].Next = tmpNext
			s.saveSyncStatus(batch)
		}
		rawdb.DeleteSkeletonHeader(batch, n)
	}
	if err := batch.Write(); err != nil {
		log.Crit("Failed to write beacon trim data", "err", err)
	}
	return nil
}

// Bounds retrieves the current head and tail tracked by the skeleton syncer.
// This method is used by the backfiller, whose life cycle is controlled by the
// skeleton syncer.
//
// Note, the method will not use the internal state of the skeleton, but will
// rather blindly pull stuff from the database. This is fine, because the back-
// filler will only run when the skeleton chain is fully downloaded and stable.
// There might be new heads appended, but those are atomic from the perspective
// of this method. Any head reorg will first tear down the backfiller and only
// then make the modification.
func (s *skeleton) Bounds() (head *types.Header, tail *types.Header, err error) {
	// Read the current sync progress from disk and figure out the current head.
	// Although there's a lot of error handling here, these are mostly as sanity
	// checks to avoid crashing if a programming error happens. These should not
	// happen in live code.
	status := rawdb.ReadSkeletonSyncStatus(s.db)
	if len(status) == 0 {
		return nil, nil, errors.New("beacon sync not yet started")
	}
	progress := new(skeletonProgress)
	if err := json.Unmarshal(status, progress); err != nil {
		return nil, nil, err
	}
	head = rawdb.ReadSkeletonHeader(s.db, progress.Subchains[0].Head)
	tail = rawdb.ReadSkeletonHeader(s.db, progress.Subchains[0].Tail)

	return head, tail, nil
}

// Header retrieves a specific header tracked by the skeleton syncer. This method
// is meant to be used by the backfiller, whose life cycle is controlled by the
// skeleton syncer.
//
// Note, outside the permitted runtimes, this method might return nil results and
// subsequent calls might return headers from different chains.
func (s *skeleton) Header(number uint64) *types.Header {
	return rawdb.ReadSkeletonHeader(s.db, number)
}