generate.go

// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package snapshot
import (
	"bytes"
	"errors"
	"fmt"
	"math/big"
	"time"

	"github.com/VictoriaMetrics/fastcache"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/ethdb/memorydb"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/rlp"
	"github.com/ethereum/go-ethereum/trie"
)
var (
	// emptyRoot is the known root hash of an empty trie.
	emptyRoot = common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")

	// emptyCode is the known hash of the empty EVM bytecode.
	emptyCode = crypto.Keccak256Hash(nil)

	// accountCheckRange is the upper limit of the number of accounts involved in
	// each range check. This is a value estimated based on experience. If this
	// range is too large, the failure rate of the range proof will increase;
	// if it is too small, the efficiency of state recovery will decrease.
	accountCheckRange = 128

	// storageCheckRange is the upper limit of the number of storage slots involved
	// in each range check. This is a value estimated based on experience. If this
	// range is too large, the failure rate of the range proof will increase;
	// if it is too small, the efficiency of state recovery will decrease.
	storageCheckRange = 1024

	// errMissingTrie is returned if the target trie is missing while the generation
	// is running. In this case the generation is aborted and waits for a new signal.
	errMissingTrie = errors.New("missing trie")
)
// generateSnapshot regenerates a brand new snapshot based on an existing state
// database and head block asynchronously. The snapshot is returned immediately
// and generation is continued in the background until done.
func generateSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash) *diskLayer {
	// Create a new disk layer with an initialized state marker at zero
	var (
		stats     = &generatorStats{start: time.Now()}
		batch     = diskdb.NewBatch()
		genMarker = []byte{} // Initialized but empty!
	)
	rawdb.WriteSnapshotRoot(batch, root)
	journalProgress(batch, genMarker, stats)
	if err := batch.Write(); err != nil {
		log.Crit("Failed to write initialized state marker", "err", err)
	}
	base := &diskLayer{
		diskdb:     diskdb,
		triedb:     triedb,
		root:       root,
		cache:      fastcache.New(cache * 1024 * 1024),
		genMarker:  genMarker,
		genPending: make(chan struct{}),
		genAbort:   make(chan chan *generatorStats),
	}
	go base.generate(stats)
	log.Debug("Start snapshot generation", "root", root)
	return base
}

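// Usage sketch (hedged, not part of the original file): the snapshot tree is
// assumed to call generateSnapshot when no valid journal can be loaded.
//
//	base := generateSnapshot(diskdb, triedb, 256, headRoot) // 256 MB cache
//	// base serves reads immediately; keys at or above base.genMarker are
//	// not covered yet, so callers fall back to the trie until generation
//	// completes and genMarker becomes nil.
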
// journalProgress persists the generator stats into the database to resume later.
func journalProgress(db ethdb.KeyValueWriter, marker []byte, stats *generatorStats) {
	// Write out the generator marker. Note it's a standalone disk layer generator
	// which is not mixed with the journal. It's ok if the generator is persisted
	// while the journal is not.
	entry := journalGenerator{
		Done:   marker == nil,
		Marker: marker,
	}
	if stats != nil {
		entry.Accounts = stats.accounts
		entry.Slots = stats.slots
		entry.Storage = uint64(stats.storage)
	}
	blob, err := rlp.EncodeToBytes(entry)
	if err != nil {
		panic(err) // Cannot happen, here to catch dev errors
	}
	var logstr string
	switch {
	case marker == nil:
		logstr = "done"
	case bytes.Equal(marker, []byte{}):
		logstr = "empty"
	case len(marker) == common.HashLength:
		logstr = fmt.Sprintf("%#x", marker)
	default:
		logstr = fmt.Sprintf("%#x:%#x", marker[:common.HashLength], marker[common.HashLength:])
	}
	log.Debug("Journalled generator progress", "progress", logstr)
	rawdb.WriteSnapshotGenerator(db, blob)
}

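// For illustration, the four marker shapes the switch above distinguishes
// (a hedged sketch, not part of the original code):
//
//	journalProgress(db, nil, stats)      // "done": snapshot fully generated
//	journalProgress(db, []byte{}, stats) // "empty": generation not started yet
//	journalProgress(db, accHash[:], stats)                         // 32 bytes: mid account range
//	journalProgress(db, append(accHash[:], slotHash[:]...), stats) // 64 bytes: mid storage range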
// proofResult contains the output of range proving which can be used
// for further processing regardless of whether it succeeded or not.
type proofResult struct {
	keys     [][]byte   // The key set of all elements being iterated, even if the proving failed
	vals     [][]byte   // The val set of all elements being iterated, even if the proving failed
	diskMore bool       // Set when the database has extra snapshot states since the last iteration
	trieMore bool       // Set when the trie has extra snapshot states (only meaningful for successful proving)
	proofErr error      // Indicator of whether the given state range is valid or not
	tr       *trie.Trie // The trie, in case the trie was resolved by the prover (may be nil)
}

// valid returns whether the range proof was successful.
func (result *proofResult) valid() bool {
	return result.proofErr == nil
}

// last returns the key of the last verified element, regardless of whether the
// range proof succeeded. Nil is returned if nothing was involved in the proving.
func (result *proofResult) last() []byte {
	var last []byte
	if len(result.keys) > 0 {
		last = result.keys[len(result.keys)-1]
	}
	return last
}

// forEach iterates over all the visited elements and applies the given callback
// on them. The iteration is aborted if the callback returns a non-nil error.
func (result *proofResult) forEach(callback func(key []byte, val []byte) error) error {
	for i := 0; i < len(result.keys); i++ {
		key, val := result.keys[i], result.vals[i]
		if err := callback(key, val); err != nil {
			return err
		}
	}
	return nil
}

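// Illustrative use of the helpers above (hedged, not original code):
//
//	if res.valid() {
//		_ = res.forEach(func(k, v []byte) error { return onState(k, v, false, false) })
//	} else {
//		resume := res.last() // nil if nothing was iterated in the failed range
//		_ = resume
//	}
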
// proveRange proves that the snapshot segment with a particular prefix is
// "valid". The iteration start point will be assigned if the iterator is
// restored from the last interruption. Max will be assigned in order to limit
// the maximum amount of data involved in each iteration.
//
// The proof result will be returned if the range proving is finished, otherwise
// an error will be returned to abort the entire procedure.
func (dl *diskLayer) proveRange(ctx *generatorContext, owner common.Hash, root common.Hash, prefix []byte, kind string, origin []byte, max int, valueConvertFn func([]byte) ([]byte, error)) (*proofResult, error) {
	var (
		keys     [][]byte
		vals     [][]byte
		proof    = rawdb.NewMemoryDatabase()
		diskMore = false
		iter     = ctx.iterator(kind)
		start    = time.Now()
		min      = append(prefix, origin...)
	)
	for iter.Next() {
		// Ensure the iterated item is always equal or larger than the given origin.
		key := iter.Key()
		if bytes.Compare(key, min) < 0 {
			return nil, errors.New("invalid iteration position")
		}
		// Ensure the iterated item still falls in the specified prefix. If
		// not, all the items in the specified area have been visited. Move
		// the iterator a step back since we iterated one extra element out.
		if !bytes.Equal(key[:len(prefix)], prefix) {
			iter.Hold()
			break
		}
		// Break if we've reached the max size, and signal that we're not
		// done yet. Move the iterator a step back since we iterated one
		// extra element out.
		if len(keys) == max {
			iter.Hold()
			diskMore = true
			break
		}
		keys = append(keys, common.CopyBytes(key[len(prefix):]))

		if valueConvertFn == nil {
			vals = append(vals, common.CopyBytes(iter.Value()))
		} else {
			val, err := valueConvertFn(iter.Value())
			if err != nil {
				// Special case: the state data is corrupted (invalid slim-format
				// account). Don't abort the entire procedure directly; instead,
				// let the fallback generation heal the invalid data.
				//
				// Append the original value here to keep the numbers of keys
				// and values aligned.
				vals = append(vals, common.CopyBytes(iter.Value()))
				log.Error("Failed to convert account state data", "err", err)
			} else {
				vals = append(vals, val)
			}
		}
	}
	// Update metrics for database iteration and merkle proving
	if kind == snapStorage {
		snapStorageSnapReadCounter.Inc(time.Since(start).Nanoseconds())
	} else {
		snapAccountSnapReadCounter.Inc(time.Since(start).Nanoseconds())
	}
	defer func(start time.Time) {
		if kind == snapStorage {
			snapStorageProveCounter.Inc(time.Since(start).Nanoseconds())
		} else {
			snapAccountProveCounter.Inc(time.Since(start).Nanoseconds())
		}
	}(time.Now())

	// The snap state is exhausted, pass the entire key/val set for verification
	if origin == nil && !diskMore {
		stackTr := trie.NewStackTrieWithOwner(nil, owner)
		for i, key := range keys {
			stackTr.TryUpdate(key, vals[i])
		}
		if gotRoot := stackTr.Hash(); gotRoot != root {
			return &proofResult{
				keys:     keys,
				vals:     vals,
				proofErr: fmt.Errorf("wrong root: have %#x want %#x", gotRoot, root),
			}, nil
		}
		return &proofResult{keys: keys, vals: vals}, nil
	}
	// Snap state is chunked, generate edge proofs for verification.
	tr, err := trie.New(owner, root, dl.triedb)
	if err != nil {
		ctx.stats.Log("Trie missing, state snapshotting paused", dl.root, dl.genMarker)
		return nil, errMissingTrie
	}
	// First find out the key of the last iterated element.
	var last []byte
	if len(keys) > 0 {
		last = keys[len(keys)-1]
	}
	// Generate the Merkle proofs for the first and last element
	if origin == nil {
		origin = common.Hash{}.Bytes()
	}
	if err := tr.Prove(origin, 0, proof); err != nil {
		log.Debug("Failed to prove range", "kind", kind, "origin", origin, "err", err)
		return &proofResult{
			keys:     keys,
			vals:     vals,
			diskMore: diskMore,
			proofErr: err,
			tr:       tr,
		}, nil
	}
	if last != nil {
		if err := tr.Prove(last, 0, proof); err != nil {
			log.Debug("Failed to prove range", "kind", kind, "last", last, "err", err)
			return &proofResult{
				keys:     keys,
				vals:     vals,
				diskMore: diskMore,
				proofErr: err,
				tr:       tr,
			}, nil
		}
	}
	// Verify the snapshot segment with the range prover, ensuring that all
	// flat states in this range correspond to the merkle trie.
	cont, err := trie.VerifyRangeProof(root, origin, last, keys, vals, proof)
	return &proofResult{
		keys:     keys,
		vals:     vals,
		diskMore: diskMore,
		trieMore: cont,
		proofErr: err,
		tr:       tr,
	}, nil
}

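// A hedged recap of the two proving paths above (illustrative only):
//
//	// Path 1: origin == nil && !diskMore -- the flat data covers the whole
//	// trie, so rebuilding the root with a stack trie is proof enough.
//	// Path 2: otherwise -- prove the two edges (origin, last) against the
//	// real trie and let trie.VerifyRangeProof check everything in between;
//	// 'cont' reports whether the trie has more elements after 'last'
//	// (surfaced to callers as trieMore).
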
// onStateCallback is a function that is called by generateRange when processing
// a range of accounts or storage slots. It is invoked once per element.
//
//   - If 'delete' is true, then this element (and potential slots) needs to be
//     deleted from the snapshot.
//   - If 'write' is true, then this element needs to be updated with the 'val'.
//   - If 'write' is false, then this element is already correct and needs no update.
//
// The 'val' is the canonical encoding of the value (not the slim format for accounts).
//
// However, for accounts, the storage trie of the account needs to be checked. Also,
// dangling storages (a storage exists but the corresponding account is missing)
// need to be cleaned up.
type onStateCallback func(key []byte, val []byte, write bool, delete bool) error

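// A minimal illustrative callback (hypothetical, for exposition only),
// mirroring the write/delete contract without touching the database:
//
//	var noop onStateCallback = func(key, val []byte, write, delete bool) error {
//		switch {
//		case delete: // remove the element from the snapshot
//		case write:  // (re)write the element with the canonical val
//		default:     // element already correct, nothing to do
//		}
//		return nil
//	}
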
// generateRange generates the state segment with a particular prefix. Generation
// can either verify the correctness of the existing state through a range proof
// and skip generation, or iterate the trie to regenerate the state on demand.
func (dl *diskLayer) generateRange(ctx *generatorContext, owner common.Hash, root common.Hash, prefix []byte, kind string, origin []byte, max int, onState onStateCallback, valueConvertFn func([]byte) ([]byte, error)) (bool, []byte, error) {
	// Use the range prover to check the validity of the flat state in the range
	result, err := dl.proveRange(ctx, owner, root, prefix, kind, origin, max, valueConvertFn)
	if err != nil {
		return false, nil, err
	}
	last := result.last()

	// Construct contextual logger
	logCtx := []interface{}{"kind", kind, "prefix", hexutil.Encode(prefix)}
	if len(origin) > 0 {
		logCtx = append(logCtx, "origin", hexutil.Encode(origin))
	}
	logger := log.New(logCtx...)

	// The range prover says the range is correct, skip trie iteration
	if result.valid() {
		snapSuccessfulRangeProofMeter.Mark(1)
		logger.Trace("Proved state range", "last", hexutil.Encode(last))

		// The verification has passed, process each state with the given
		// callback function. If this state represents a contract, the
		// corresponding storage check will be performed in the callback.
		if err := result.forEach(func(key []byte, val []byte) error { return onState(key, val, false, false) }); err != nil {
			return false, nil, err
		}
		// Only abort the iteration when both database and trie are exhausted
		return !result.diskMore && !result.trieMore, last, nil
	}
	logger.Trace("Detected outdated state range", "last", hexutil.Encode(last), "err", result.proofErr)
	snapFailedRangeProofMeter.Mark(1)

	// Special case: the entire trie is missing. In the original trie scheme,
	// all the duplicated subtries are filtered out (only one copy of the data
	// is stored), while in the snapshot model, the storage tries belonging to
	// different contracts are all kept even if they are duplicated. Track this
	// separately to remove the noise from the statistics.
	if origin == nil && last == nil {
		meter := snapMissallAccountMeter
		if kind == snapStorage {
			meter = snapMissallStorageMeter
		}
		meter.Mark(1)
	}
	// We use the snap data to build up a cache which can be used by the
	// main account trie as a primary lookup when resolving hashes
	var snapNodeCache ethdb.KeyValueStore
	if len(result.keys) > 0 {
		snapNodeCache = memorydb.New()
		snapTrieDb := trie.NewDatabase(snapNodeCache)
		snapTrie, _ := trie.New(owner, common.Hash{}, snapTrieDb)
		for i, key := range result.keys {
			snapTrie.Update(key, result.vals[i])
		}
		root, nodes, _ := snapTrie.Commit(false)
		if nodes != nil {
			snapTrieDb.Update(trie.NewWithNodeSet(nodes))
		}
		snapTrieDb.Commit(root, false, nil)
	}
	// Construct the trie for state iteration, reusing the trie if it's
	// already opened with some nodes resolved.
	tr := result.tr
	if tr == nil {
		tr, err = trie.New(owner, root, dl.triedb)
		if err != nil {
			ctx.stats.Log("Trie missing, state snapshotting paused", dl.root, dl.genMarker)
			return false, nil, errMissingTrie
		}
	}
	var (
		trieMore       bool
		nodeIt         = tr.NodeIterator(origin)
		iter           = trie.NewIterator(nodeIt)
		kvkeys, kvvals = result.keys, result.vals

		// counters
		count     = 0 // number of states delivered by iterator
		created   = 0 // states created from the trie
		updated   = 0 // states updated from the trie
		deleted   = 0 // states not in trie, but were in snapshot
		untouched = 0 // states already correct

		// timers
		start    = time.Now()
		internal time.Duration
	)
	nodeIt.AddResolver(snapNodeCache)

	for iter.Next() {
		if last != nil && bytes.Compare(iter.Key, last) > 0 {
			trieMore = true
			break
		}
		count++
		write := true
		created++
		for len(kvkeys) > 0 {
			if cmp := bytes.Compare(kvkeys[0], iter.Key); cmp < 0 {
				// delete the key
				istart := time.Now()
				if err := onState(kvkeys[0], nil, false, true); err != nil {
					return false, nil, err
				}
				kvkeys = kvkeys[1:]
				kvvals = kvvals[1:]
				deleted++
				internal += time.Since(istart)
				continue
			} else if cmp == 0 {
				// the snapshot key can be overwritten
				created--
				if write = !bytes.Equal(kvvals[0], iter.Value); write {
					updated++
				} else {
					untouched++
				}
				kvkeys = kvkeys[1:]
				kvvals = kvvals[1:]
			}
			break
		}
		istart := time.Now()
		if err := onState(iter.Key, iter.Value, write, false); err != nil {
			return false, nil, err
		}
		internal += time.Since(istart)
	}
	if iter.Err != nil {
		return false, nil, iter.Err
	}
	// Delete all stale snapshot states remaining
	istart := time.Now()
	for _, key := range kvkeys {
		if err := onState(key, nil, false, true); err != nil {
			return false, nil, err
		}
		deleted++
	}
	internal += time.Since(istart)

	// Update metrics for counting trie iteration
	if kind == snapStorage {
		snapStorageTrieReadCounter.Inc((time.Since(start) - internal).Nanoseconds())
	} else {
		snapAccountTrieReadCounter.Inc((time.Since(start) - internal).Nanoseconds())
	}
	logger.Debug("Regenerated state range", "root", root, "last", hexutil.Encode(last),
		"count", count, "created", created, "updated", updated, "untouched", untouched, "deleted", deleted)

	// If there are either more trie items, or more snap items (in the next
	// segment), then we need to keep working
	return !trieMore && !result.diskMore, last, nil
}

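// The loop above is effectively a sorted merge of the flat snapshot keys
// (kvkeys) and the trie iterator. A hedged illustration with toy keys:
//
//	snapshot: [a, b, d]    trie: [a, c, d]
//	a == a -> untouched (or updated if the values differ)
//	b < c  -> b deleted from the snapshot (stale)
//	c      -> c created from the trie
//	d == d -> untouched/updated
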
// checkAndFlush checks whether an interruption signal has been received or the
// batch size has exceeded the allowance.
func (dl *diskLayer) checkAndFlush(ctx *generatorContext, current []byte) error {
	var abort chan *generatorStats
	select {
	case abort = <-dl.genAbort:
	default:
	}
	if ctx.batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
		if bytes.Compare(current, dl.genMarker) < 0 {
			log.Error("Snapshot generator went backwards", "current", fmt.Sprintf("%x", current), "genMarker", fmt.Sprintf("%x", dl.genMarker))
		}
		// Flush out the batch whether it's empty or not. It's possible that
		// all the states were recovered and the generation has indeed made
		// progress.
		journalProgress(ctx.batch, current, ctx.stats)

		if err := ctx.batch.Write(); err != nil {
			return err
		}
		ctx.batch.Reset()

		dl.lock.Lock()
		dl.genMarker = current
		dl.lock.Unlock()

		if abort != nil {
			ctx.stats.Log("Aborting state snapshot generation", dl.root, current)
			return newAbortErr(abort) // bubble up an error for interruption
		}
		// Don't hold the iterators for too long, release them to let the
		// compactor work.
		ctx.reopenIterator(snapAccount)
		ctx.reopenIterator(snapStorage)
	}
	if time.Since(ctx.logged) > 8*time.Second {
		ctx.stats.Log("Generating state snapshot", dl.root, current)
		ctx.logged = time.Now()
	}
	return nil
}

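// Invariant worth noting (a hedged observation, not original code): the
// marker handed to checkAndFlush should never sort before dl.genMarker,
// since everything below the journalled marker is assumed final on disk:
//
//	bytes.Compare(current, dl.genMarker) >= 0 // expected to hold on every flush
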
// generateStorages generates the missing storage slots of the specific contract.
// It's supposed to restart the generation from the given origin position.
func generateStorages(ctx *generatorContext, dl *diskLayer, account common.Hash, storageRoot common.Hash, storeMarker []byte) error {
	onStorage := func(key []byte, val []byte, write bool, delete bool) error {
		defer func(start time.Time) {
			snapStorageWriteCounter.Inc(time.Since(start).Nanoseconds())
		}(time.Now())

		if delete {
			rawdb.DeleteStorageSnapshot(ctx.batch, account, common.BytesToHash(key))
			snapWipedStorageMeter.Mark(1)
			return nil
		}
		if write {
			rawdb.WriteStorageSnapshot(ctx.batch, account, common.BytesToHash(key), val)
			snapGeneratedStorageMeter.Mark(1)
		} else {
			snapRecoveredStorageMeter.Mark(1)
		}
		ctx.stats.storage += common.StorageSize(1 + 2*common.HashLength + len(val))
		ctx.stats.slots++

		// If we've exceeded our batch allowance or termination was requested, flush to disk
		if err := dl.checkAndFlush(ctx, append(account[:], key...)); err != nil {
			return err
		}
		return nil
	}
	// Loop for regenerating the missing storage slots.
	var origin = common.CopyBytes(storeMarker)
	for {
		exhausted, last, err := dl.generateRange(ctx, account, storageRoot, append(rawdb.SnapshotStoragePrefix, account.Bytes()...), snapStorage, origin, storageCheckRange, onStorage, nil)
		if err != nil {
			return err // The procedure was aborted, either by an external signal or an internal error.
		}
		// Abort the procedure if the entire contract storage has been generated
		if exhausted {
			break
		}
		if origin = increaseKey(last); origin == nil {
			break // special case, the last is 0xffffffff...fff
		}
	}
	return nil
}

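// A hedged sketch of the chunk loop above: each pass covers at most
// storageCheckRange slots, then resumes just past the last verified key.
//
//	origin == nil              // first pass starts at the beginning of the range
//	origin = increaseKey(last) // next pass resumes right after 'last'
//	origin == nil (again)      // increaseKey overflowed: last was 0xff...ff, done
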
// generateAccounts generates the missing snapshot accounts as well as their
// storage slots in the main trie. It's supposed to restart the generation
// from the given origin position.
func generateAccounts(ctx *generatorContext, dl *diskLayer, accMarker []byte) error {
	onAccount := func(key []byte, val []byte, write bool, delete bool) error {
		// Make sure to clear all dangling storages before this account
		account := common.BytesToHash(key)
		ctx.removeStorageBefore(account)

		start := time.Now()
		if delete {
			rawdb.DeleteAccountSnapshot(ctx.batch, account)
			snapWipedAccountMeter.Mark(1)
			snapAccountWriteCounter.Inc(time.Since(start).Nanoseconds())

			ctx.removeStorageAt(account)
			return nil
		}
		// Retrieve the current account and flatten it into the internal format
		var acc struct {
			Nonce    uint64
			Balance  *big.Int
			Root     common.Hash
			CodeHash []byte
		}
		if err := rlp.DecodeBytes(val, &acc); err != nil {
			log.Crit("Invalid account encountered during snapshot creation", "err", err)
		}
		// If the account is not yet in-progress, write it out
		if accMarker == nil || !bytes.Equal(account[:], accMarker) {
			dataLen := len(val) // Approximate size, saves us a round of RLP-encoding
			if !write {
				if bytes.Equal(acc.CodeHash, emptyCode[:]) {
					dataLen -= 32
				}
				if acc.Root == emptyRoot {
					dataLen -= 32
				}
				snapRecoveredAccountMeter.Mark(1)
			} else {
				data := SlimAccountRLP(acc.Nonce, acc.Balance, acc.Root, acc.CodeHash)
				dataLen = len(data)
				rawdb.WriteAccountSnapshot(ctx.batch, account, data)
				snapGeneratedAccountMeter.Mark(1)
			}
			ctx.stats.storage += common.StorageSize(1 + common.HashLength + dataLen)
			ctx.stats.accounts++
		}
		// If the snapshot generation arrives here after an interruption, the
		// genMarker may go backwards when the last genMarker consists of both
		// an accountHash and a storageHash.
		marker := account[:]
		if accMarker != nil && bytes.Equal(marker, accMarker) && len(dl.genMarker) > common.HashLength {
			marker = dl.genMarker[:]
		}
		// If we've exceeded our batch allowance or termination was requested, flush to disk
		if err := dl.checkAndFlush(ctx, marker); err != nil {
			return err
		}
		snapAccountWriteCounter.Inc(time.Since(start).Nanoseconds()) // let's count flush time as well

		// If the iterated account is a contract, create a further loop to
		// verify or regenerate the contract storage.
		if acc.Root == emptyRoot {
			ctx.removeStorageAt(account)
		} else {
			var storeMarker []byte
			if accMarker != nil && bytes.Equal(account[:], accMarker) && len(dl.genMarker) > common.HashLength {
				storeMarker = dl.genMarker[common.HashLength:]
			}
			if err := generateStorages(ctx, dl, account, acc.Root, storeMarker); err != nil {
				return err
			}
		}
		// Some account was processed, unmark the marker
		accMarker = nil
		return nil
	}
	// Always reset the initial account range to 1 whenever recovering from an
	// interruption. TODO(rjl493456442) can we remove it?
	var accountRange = accountCheckRange
	if len(accMarker) > 0 {
		accountRange = 1
	}
	origin := common.CopyBytes(accMarker)
	for {
		exhausted, last, err := dl.generateRange(ctx, common.Hash{}, dl.root, rawdb.SnapshotAccountPrefix, snapAccount, origin, accountRange, onAccount, FullAccountRLP)
		if err != nil {
			return err // The procedure was aborted, either by an external signal or an internal error.
		}
		origin = increaseKey(last)

		// Last step, cleanup the storages after the last account.
		// All the leftover storages should be treated as dangling.
		if origin == nil || exhausted {
			ctx.removeStorageLeft()
			break
		}
		accountRange = accountCheckRange
	}
	return nil
}

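// Background note (hedged): snapshot accounts are stored in "slim" RLP, where
// an empty storage root or empty code hash is elided to save space, which is
// why the recovered-account branch above subtracts 32 bytes per elided field.
// FullAccountRLP (the valueConvertFn passed to generateRange) re-expands slim
// values to the canonical encoding before proving. Roughly:
//
//	full: {Nonce, Balance, Root, CodeHash}
//	slim: {Nonce, Balance, nil if Root == emptyRoot, nil if CodeHash == emptyCode}
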
// generate is a background thread that iterates over the state and storage tries,
// constructing the state snapshot. All the arguments are purely for statistics
// gathering and logging, since the method surfs the blocks as they arrive, often
// being restarted.
func (dl *diskLayer) generate(stats *generatorStats) {
	var (
		accMarker []byte
		abort     chan *generatorStats
	)
	if len(dl.genMarker) > 0 { // []byte{} is the start, use nil for that
		accMarker = dl.genMarker[:common.HashLength]
	}
	stats.Log("Resuming state snapshot generation", dl.root, dl.genMarker)

	// Initialize the global generator context. The snapshot iterators are
	// opened at the interrupted position because the assumption is held
	// that all the snapshot data before the marker has been generated
	// correctly. Even if the snapshot data is updated during the
	// interruption (before or at the marker), the assumption still holds.
	// The account or storage slot at the interruption point will be
	// processed twice by the generator (it was already processed in the
	// last run), but that's fine.
	ctx := newGeneratorContext(stats, dl.diskdb, accMarker, dl.genMarker)
	defer ctx.close()

	if err := generateAccounts(ctx, dl, accMarker); err != nil {
		// Extract the received interruption signal if it exists
		if aerr, ok := err.(*abortErr); ok {
			abort = aerr.abort
		}
		// Aborted by an internal error, wait for the signal
		if abort == nil {
			abort = <-dl.genAbort
		}
		abort <- stats
		return
	}
	// Snapshot fully generated, set the marker to nil.
	// Note that even if there is nothing to commit, the generator is
	// persisted anyway to mark the snapshot as complete.
	journalProgress(ctx.batch, nil, stats)
	if err := ctx.batch.Write(); err != nil {
		log.Error("Failed to flush batch", "err", err)

		abort = <-dl.genAbort
		abort <- stats
		return
	}
	ctx.batch.Reset()

	log.Info("Generated state snapshot", "accounts", stats.accounts, "slots", stats.slots,
		"storage", stats.storage, "dangling", stats.dangling, "elapsed", common.PrettyDuration(time.Since(stats.start)))

	dl.lock.Lock()
	dl.genMarker = nil
	close(dl.genPending)
	dl.lock.Unlock()

	// Someone will be looking for us, wait it out
	abort = <-dl.genAbort
	abort <- nil
}

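// The abort protocol above is a two-step rendezvous. A hedged sketch of the
// caller's side (the real caller lives elsewhere in this package):
//
//	ch := make(chan *generatorStats)
//	dl.genAbort <- ch // ask the generator to stop
//	stats := <-ch     // wait for acknowledgement; nil means generation finished
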
// increaseKey increases the input key by one. Nil is returned if the
// addition overflows the entire key space.
func increaseKey(key []byte) []byte {
	for i := len(key) - 1; i >= 0; i-- {
		key[i]++
		if key[i] != 0x0 {
			return key
		}
	}
	return nil
}

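// Worked example (illustrative): increaseKey treats the key as a big-endian
// integer and mutates it in place.
//
//	increaseKey([]byte{0x00, 0xff}) // -> {0x01, 0x00}
//	increaseKey([]byte{0xff, 0xff}) // -> nil (overflow, key space exhausted)
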
// abortErr wraps an interruption signal, representing generation that was
// aborted by an external process.
type abortErr struct {
	abort chan *generatorStats
}

func newAbortErr(abort chan *generatorStats) error {
	return &abortErr{abort: abort}
}

func (err *abortErr) Error() string {
	return "aborted"
}