beaconsync.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. // Copyright 2022 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package downloader
  17. import (
  18. "fmt"
  19. "sync"
  20. "sync/atomic"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common"
  23. "github.com/ethereum/go-ethereum/core/types"
  24. "github.com/ethereum/go-ethereum/log"
  25. )
// beaconBackfiller is the chain and state backfilling that can be commenced once
// the skeleton syncer has successfully reverse downloaded all the headers up to
// the genesis block or an existing header in the database. Its operation is fully
// directed by the skeleton sync's head/tail events.
type beaconBackfiller struct {
	downloader *Downloader   // Downloader to direct via this callback implementation
	syncMode   SyncMode      // Sync mode to use for backfilling the skeleton chains
	success    func()        // Callback to run on successful sync cycle completion
	filling    bool          // Flag whether the downloader is backfilling or not
	filled     *types.Header // Last header filled by the last terminated sync loop
	started    chan struct{} // Notification channel whether the downloader inited; closed on startup, safe to receive from repeatedly
	lock       sync.Mutex    // Mutex protecting all the fields above from concurrent access
}
  39. // newBeaconBackfiller is a helper method to create the backfiller.
  40. func newBeaconBackfiller(dl *Downloader, success func()) backfiller {
  41. return &beaconBackfiller{
  42. downloader: dl,
  43. success: success,
  44. }
  45. }
  46. // suspend cancels any background downloader threads and returns the last header
  47. // that has been successfully backfilled.
  48. func (b *beaconBackfiller) suspend() *types.Header {
  49. // If no filling is running, don't waste cycles
  50. b.lock.Lock()
  51. filling := b.filling
  52. filled := b.filled
  53. started := b.started
  54. b.lock.Unlock()
  55. if !filling {
  56. return filled // Return the filled header on the previous sync completion
  57. }
  58. // A previous filling should be running, though it may happen that it hasn't
  59. // yet started (being done on a new goroutine). Many concurrent beacon head
  60. // announcements can lead to sync start/stop thrashing. In that case we need
  61. // to wait for initialization before we can safely cancel it. It is safe to
  62. // read this channel multiple times, it gets closed on startup.
  63. <-started
  64. // Now that we're sure the downloader successfully started up, we can cancel
  65. // it safely without running the risk of data races.
  66. b.downloader.Cancel()
  67. // Sync cycle was just terminated, retrieve and return the last filled header.
  68. // Can't use `filled` as that contains a stale value from before cancellation.
  69. return b.downloader.blockchain.CurrentFastBlock().Header()
  70. }
  71. // resume starts the downloader threads for backfilling state and chain data.
  72. func (b *beaconBackfiller) resume() {
  73. b.lock.Lock()
  74. if b.filling {
  75. // If a previous filling cycle is still running, just ignore this start
  76. // request. // TODO(karalabe): We should make this channel driven
  77. b.lock.Unlock()
  78. return
  79. }
  80. b.filling = true
  81. b.filled = nil
  82. b.started = make(chan struct{})
  83. mode := b.syncMode
  84. b.lock.Unlock()
  85. // Start the backfilling on its own thread since the downloader does not have
  86. // its own lifecycle runloop.
  87. go func() {
  88. // Set the backfiller to non-filling when download completes
  89. defer func() {
  90. b.lock.Lock()
  91. b.filling = false
  92. b.filled = b.downloader.blockchain.CurrentFastBlock().Header()
  93. b.lock.Unlock()
  94. }()
  95. // If the downloader fails, report an error as in beacon chain mode there
  96. // should be no errors as long as the chain we're syncing to is valid.
  97. if err := b.downloader.synchronise("", common.Hash{}, nil, nil, mode, true, b.started); err != nil {
  98. log.Error("Beacon backfilling failed", "err", err)
  99. return
  100. }
  101. // Synchronization succeeded. Since this happens async, notify the outer
  102. // context to disable snap syncing and enable transaction propagation.
  103. if b.success != nil {
  104. b.success()
  105. }
  106. }()
  107. }
  108. // setMode updates the sync mode from the current one to the requested one. If
  109. // there's an active sync in progress, it will be cancelled and restarted.
  110. func (b *beaconBackfiller) setMode(mode SyncMode) {
  111. // Update the old sync mode and track if it was changed
  112. b.lock.Lock()
  113. updated := b.syncMode != mode
  114. filling := b.filling
  115. b.syncMode = mode
  116. b.lock.Unlock()
  117. // If the sync mode was changed mid-sync, restart. This should never ever
  118. // really happen, we just handle it to detect programming errors.
  119. if !updated || !filling {
  120. return
  121. }
  122. log.Error("Downloader sync mode changed mid-run", "old", mode.String(), "new", mode.String())
  123. b.suspend()
  124. b.resume()
  125. }
  126. // SetBadBlockCallback sets the callback to run when a bad block is hit by the
  127. // block processor. This method is not thread safe and should be set only once
  128. // on startup before system events are fired.
  129. func (d *Downloader) SetBadBlockCallback(onBadBlock badBlockFn) {
  130. d.badBlock = onBadBlock
  131. }
  132. // BeaconSync is the post-merge version of the chain synchronization, where the
  133. // chain is not downloaded from genesis onward, rather from trusted head announces
  134. // backwards.
  135. //
  136. // Internally backfilling and state sync is done the same way, but the header
  137. // retrieval and scheduling is replaced.
  138. func (d *Downloader) BeaconSync(mode SyncMode, head *types.Header) error {
  139. return d.beaconSync(mode, head, true)
  140. }
  141. // BeaconExtend is an optimistic version of BeaconSync, where an attempt is made
  142. // to extend the current beacon chain with a new header, but in case of a mismatch,
  143. // the old sync will not be terminated and reorged, rather the new head is dropped.
  144. //
  145. // This is useful if a beacon client is feeding us large chunks of payloads to run,
  146. // but is not setting the head after each.
  147. func (d *Downloader) BeaconExtend(mode SyncMode, head *types.Header) error {
  148. return d.beaconSync(mode, head, false)
  149. }
  150. // beaconSync is the post-merge version of the chain synchronization, where the
  151. // chain is not downloaded from genesis onward, rather from trusted head announces
  152. // backwards.
  153. //
  154. // Internally backfilling and state sync is done the same way, but the header
  155. // retrieval and scheduling is replaced.
  156. func (d *Downloader) beaconSync(mode SyncMode, head *types.Header, force bool) error {
  157. // When the downloader starts a sync cycle, it needs to be aware of the sync
  158. // mode to use (full, snap). To keep the skeleton chain oblivious, inject the
  159. // mode into the backfiller directly.
  160. //
  161. // Super crazy dangerous type cast. Should be fine (TM), we're only using a
  162. // different backfiller implementation for skeleton tests.
  163. d.skeleton.filler.(*beaconBackfiller).setMode(mode)
  164. // Signal the skeleton sync to switch to a new head, however it wants
  165. if err := d.skeleton.Sync(head, force); err != nil {
  166. return err
  167. }
  168. return nil
  169. }
// findBeaconAncestor tries to locate the common ancestor link of the local chain
// and the beacon chain just requested. In the general case when our node was in
// sync and on the correct chain, checking the top N links should already get us
// a match. In the rare scenario when we ended up on a long reorganisation (i.e.
// none of the head links match), we do a binary search to find the ancestor.
//
// Returns the block number of the highest locally-known header that is also on
// the beacon (skeleton) chain, or an error if the skeleton bounds are missing
// or not linked into the local chain.
func (d *Downloader) findBeaconAncestor() (uint64, error) {
	// Figure out the current local head position. The notion of "head" depends
	// on the sync mode: full blocks, fast/snap blocks or bare headers.
	var chainHead *types.Header

	switch d.getMode() {
	case FullSync:
		chainHead = d.blockchain.CurrentBlock().Header()
	case SnapSync:
		chainHead = d.blockchain.CurrentFastBlock().Header()
	default:
		chainHead = d.lightchain.CurrentHeader()
	}
	number := chainHead.Number.Uint64()

	// Retrieve the skeleton bounds and ensure they are linked to the local chain
	beaconHead, beaconTail, err := d.skeleton.Bounds()
	if err != nil {
		// This is a programming error. The chain backfiller was called with an
		// invalid beacon sync state. Ideally we would panic here, but erroring
		// gives us at least a remote chance to recover. It's still a big fault!
		log.Error("Failed to retrieve beacon bounds", "err", err)
		return 0, err
	}
	// Check that the parent of the skeleton tail is known locally, using the
	// presence-check matching the current sync mode.
	var linked bool
	switch d.getMode() {
	case FullSync:
		linked = d.blockchain.HasBlock(beaconTail.ParentHash, beaconTail.Number.Uint64()-1)
	case SnapSync:
		linked = d.blockchain.HasFastBlock(beaconTail.ParentHash, beaconTail.Number.Uint64()-1)
	default:
		linked = d.blockchain.HasHeader(beaconTail.ParentHash, beaconTail.Number.Uint64()-1)
	}
	if !linked {
		// This is a programming error. The chain backfiller was called with a
		// tail that's not linked to the local chain. Whilst this should never
		// happen, there might be some weirdnesses if beacon sync backfilling
		// races with the user (or beacon client) calling setHead. Whilst panic
		// would be the ideal thing to do, it is safer long term to attempt a
		// recovery and fix any noticed issue after the fact.
		log.Error("Beacon sync linkup unavailable", "number", beaconTail.Number.Uint64()-1, "hash", beaconTail.ParentHash)
		return 0, fmt.Errorf("beacon linkup unavailable locally: %d [%x]", beaconTail.Number.Uint64()-1, beaconTail.ParentHash)
	}
	// Binary search to find the ancestor. Invariant: `start` is always known
	// locally (the tail's parent was just proven), `end` may not be.
	start, end := beaconTail.Number.Uint64()-1, number
	if number := beaconHead.Number.Uint64(); end > number {
		// This shouldn't really happen in a healthy network, but if the consensus
		// clients feeds us a shorter chain as the canonical, we should not attempt
		// to access non-existent skeleton items.
		log.Warn("Beacon head lower than local chain", "beacon", number, "local", end)
		end = number
	}
	for start+1 < end {
		// Split our chain interval in two, and request the hash to cross check
		check := (start + end) / 2

		// NOTE(review): the skeleton header for `check` is assumed non-nil for
		// any number within [tail, head] — a nil here would panic; confirm the
		// skeleton guarantees full coverage of its bounds.
		h := d.skeleton.Header(check)
		n := h.Number.Uint64()

		var known bool
		switch d.getMode() {
		case FullSync:
			known = d.blockchain.HasBlock(h.Hash(), n)
		case SnapSync:
			known = d.blockchain.HasFastBlock(h.Hash(), n)
		default:
			known = d.lightchain.HasHeader(h.Hash(), n)
		}
		if !known {
			end = check
			continue
		}
		start = check
	}
	return start, nil
}
// fetchBeaconHeaders feeds skeleton headers to the downloader queue for scheduling
// until sync errors or is finished.
//
// Headers are pushed in batches of at most maxHeadersProcess onto headerProcCh,
// starting at `from` and walking up to the skeleton head. Once all headers up
// to the head are scheduled, the loop waits for the pivot block to be committed
// before signalling header-sync termination (a nil headerTask).
func (d *Downloader) fetchBeaconHeaders(from uint64) error {
	head, tail, err := d.skeleton.Bounds()
	if err != nil {
		return err
	}
	// A part of headers are not in the skeleton space, try to resolve
	// them from the local chain. Note the range should be very short
	// and it should only happen when there are less than 64 post-merge
	// blocks in the network.
	var localHeaders []*types.Header
	if from < tail.Number.Uint64() {
		count := tail.Number.Uint64() - from
		if count > uint64(fsMinFullBlocks) {
			// The gap below the skeleton tail is too large to bridge locally,
			// the sync origin must be bogus.
			return fmt.Errorf("invalid origin (%d) of beacon sync (%d)", from, tail.Number)
		}
		// localHeaders[0] is the header just below the tail, walking downward.
		localHeaders = d.readHeaderRange(tail, int(count))
		log.Warn("Retrieved beacon headers from local", "from", from, "count", count)
	}
	for {
		// Retrieve a batch of headers and feed it to the header processor
		var (
			headers = make([]*types.Header, 0, maxHeadersProcess)
			hashes  = make([]common.Hash, 0, maxHeadersProcess)
		)
		for i := 0; i < maxHeadersProcess && from <= head.Number.Uint64(); i++ {
			header := d.skeleton.Header(from)

			// The header is not found in skeleton space, try to find it in local chain.
			if header == nil && from < tail.Number.Uint64() {
				// dist counts down as `from` rises; localHeaders is ordered
				// from the tail downwards, hence the dist-1 index.
				dist := tail.Number.Uint64() - from
				if len(localHeaders) >= int(dist) {
					header = localHeaders[dist-1]
				}
			}
			// The header is still missing, the beacon sync is corrupted and bail out
			// the error here.
			if header == nil {
				return fmt.Errorf("missing beacon header %d", from)
			}
			// headers[i] is the header appended just above, so this hashes the
			// same entry.
			headers = append(headers, header)
			hashes = append(hashes, headers[i].Hash())
			from++
		}
		if len(headers) > 0 {
			log.Trace("Scheduling new beacon headers", "count", len(headers), "from", from-uint64(len(headers)))
			select {
			case d.headerProcCh <- &headerTask{
				headers: headers,
				hashes:  hashes,
			}:
			case <-d.cancelCh:
				return errCanceled
			}
		}
		// If we still have headers to import, loop and keep pushing them
		if from <= head.Number.Uint64() {
			continue
		}
		// If the pivot block is committed, signal header sync termination
		if atomic.LoadInt32(&d.committed) == 1 {
			select {
			case d.headerProcCh <- nil:
				return nil
			case <-d.cancelCh:
				return errCanceled
			}
		}
		// State sync still going, wait a bit for new headers and retry
		log.Trace("Pivot not yet committed, waiting...")
		select {
		case <-time.After(fsHeaderContCheck):
		case <-d.cancelCh:
			return errCanceled
		}
		// The skeleton head may have moved while we waited, refresh it before
		// resuming the scheduling loop.
		head, _, err = d.skeleton.Bounds()
		if err != nil {
			return err
		}
	}
}