journal.go
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package snapshot
import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"time"

	"github.com/VictoriaMetrics/fastcache"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/rlp"
	"github.com/ethereum/go-ethereum/trie"
)

const journalVersion uint64 = 0
// journalGenerator is a disk layer entry containing the generator progress marker.
type journalGenerator struct {
	// Indicates whether the database was in the process of being wiped.
	// It's deprecated but kept here for backward compatibility.
	Wiping bool

	Done     bool   // Whether the generator finished creating the snapshot
	Marker   []byte
	Accounts uint64
	Slots    uint64
	Storage  uint64
}
// journalDestruct is an account deletion entry in a diffLayer's disk journal.
type journalDestruct struct {
	Hash common.Hash
}

// journalAccount is an account entry in a diffLayer's disk journal.
type journalAccount struct {
	Hash common.Hash
	Blob []byte
}

// journalStorage is an account's storage map in a diffLayer's disk journal.
type journalStorage struct {
	Hash common.Hash
	Keys []common.Hash
	Vals [][]byte
}
// ParseGeneratorStatus parses the given generator blob and returns a
// human-readable status string about the snapshot generation progress.
func ParseGeneratorStatus(generatorBlob []byte) string {
	if len(generatorBlob) == 0 {
		return ""
	}
	var generator journalGenerator
	if err := rlp.DecodeBytes(generatorBlob, &generator); err != nil {
		log.Warn("failed to decode snapshot generator", "err", err)
		return ""
	}
	// Figure out whether we're after or within an account
	var m string
	switch marker := generator.Marker; len(marker) {
	case common.HashLength:
		m = fmt.Sprintf("at %#x", marker)
	case 2 * common.HashLength:
		m = fmt.Sprintf("in %#x at %#x", marker[:common.HashLength], marker[common.HashLength:])
	default:
		m = fmt.Sprintf("%#x", marker)
	}
	return fmt.Sprintf(`Done: %v, Accounts: %d, Slots: %d, Storage: %d, Marker: %s`,
		generator.Done, generator.Accounts, generator.Slots, generator.Storage, m)
}
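
// exampleGeneratorStatus is an illustrative sketch (hypothetical helper, not
// part of the original file) showing how ParseGeneratorStatus consumes the raw
// generator blob persisted in the database.
func exampleGeneratorStatus(db ethdb.KeyValueReader) {
	blob := rawdb.ReadSnapshotGenerator(db)
	if status := ParseGeneratorStatus(blob); status != "" {
		log.Info("Snapshot generator status", "status", status)
	}
}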
// loadAndParseJournal tries to parse the snapshot journal in the latest format.
func loadAndParseJournal(db ethdb.KeyValueStore, base *diskLayer) (snapshot, journalGenerator, error) {
	// Retrieve the disk layer generator. It must exist, no matter whether
	// the snapshot is fully generated or not. Otherwise the entire disk
	// layer is invalid.
	generatorBlob := rawdb.ReadSnapshotGenerator(db)
	if len(generatorBlob) == 0 {
		return nil, journalGenerator{}, errors.New("missing snapshot generator")
	}
	var generator journalGenerator
	if err := rlp.DecodeBytes(generatorBlob, &generator); err != nil {
		return nil, journalGenerator{}, fmt.Errorf("failed to decode snapshot generator: %v", err)
	}
	// Retrieve the diff layer journal. It's possible that the journal is
	// not existent, e.g. the disk layer was generating while Geth crashed
	// without persisting the diff journal.
	// So if there is no journal, or the journal is invalid (e.g. the journal
	// doesn't match the disk layer, or it's a legacy-format journal, etc.),
	// we just discard all diffs and try to recover them later.
	var current snapshot = base
	err := iterateJournal(db, func(parent common.Hash, root common.Hash, destructSet map[common.Hash]struct{}, accountData map[common.Hash][]byte, storageData map[common.Hash]map[common.Hash][]byte) error {
		current = newDiffLayer(current, root, destructSet, accountData, storageData)
		return nil
	})
	if err != nil {
		return base, generator, nil
	}
	return current, generator, nil
}
// loadSnapshot loads a pre-existing state snapshot backed by a key-value store.
func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, recovery bool) (snapshot, bool, error) {
	// If snapshotting is disabled (initial sync in progress), don't do anything,
	// wait for the chain to permit us to do something meaningful
	if rawdb.ReadSnapshotDisabled(diskdb) {
		return nil, true, nil
	}
	// Retrieve the block number and hash of the snapshot, failing if no snapshot
	// is present in the database (or crashed mid-update).
	baseRoot := rawdb.ReadSnapshotRoot(diskdb)
	if baseRoot == (common.Hash{}) {
		return nil, false, errors.New("missing or corrupted snapshot")
	}
	base := &diskLayer{
		diskdb: diskdb,
		triedb: triedb,
		cache:  fastcache.New(cache * 1024 * 1024),
		root:   baseRoot,
	}
	snapshot, generator, err := loadAndParseJournal(diskdb, base)
	if err != nil {
		log.Warn("Failed to load new-format journal", "error", err)
		return nil, false, err
	}
	// Entire snapshot journal loaded, sanity check the head. If the loaded
	// snapshot doesn't match the current state root, print a warning log,
	// or discard the entire snapshot if it's a legacy snapshot.
	//
	// Possible scenario: Geth crashed without persisting the journal and then
	// restarted, so the head was rewound to a point with available state (trie)
	// which is below the snapshot. In this case the snapshot can be recovered
	// by re-executing blocks, but right now it's unavailable.
	if head := snapshot.Root(); head != root {
		// If it's a legacy snapshot, or it's a new-format snapshot but
		// not in recovery mode, return the error here to force a rebuild
		// of the entire snapshot.
		if !recovery {
			return nil, false, fmt.Errorf("head doesn't match snapshot: have %#x, want %#x", head, root)
		}
		// It's in snapshot recovery, so the assumption holds that the
		// disk layer is always higher than the chain head. It can
		// eventually be recovered once the chain head moves beyond the
		// disk layer.
		log.Warn("Snapshot is not continuous with chain", "snaproot", head, "chainroot", root)
	}
	// Everything loaded correctly, resume any suspended operations
	if !generator.Done {
		// Whether or not wiping was in progress, load any generator progress too
		base.genMarker = generator.Marker
		if base.genMarker == nil {
			base.genMarker = []byte{}
		}
		base.genPending = make(chan struct{})
		base.genAbort = make(chan chan *generatorStats)

		var origin uint64
		if len(generator.Marker) >= 8 {
			origin = binary.BigEndian.Uint64(generator.Marker)
		}
		go base.generate(&generatorStats{
			origin:   origin,
			start:    time.Now(),
			accounts: generator.Accounts,
			slots:    generator.Slots,
			storage:  common.StorageSize(generator.Storage),
		})
	}
	return snapshot, false, nil
}
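
// exampleMarkerOrigin is an illustrative sketch (hypothetical helper, not part
// of the original file) isolating how loadSnapshot derives the generator's
// resume origin above: the first eight bytes of the progress marker are
// interpreted as a big-endian integer for the progress statistics.
func exampleMarkerOrigin(marker []byte) uint64 {
	if len(marker) >= 8 {
		return binary.BigEndian.Uint64(marker)
	}
	return 0 // nil or short marker: generation restarts from the beginning
}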
// Journal terminates any in-progress snapshot generation, also implicitly pushing
// the progress into the database.
func (dl *diskLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) {
	// If the snapshot is currently being generated, abort it
	var stats *generatorStats
	if dl.genAbort != nil {
		abort := make(chan *generatorStats)
		dl.genAbort <- abort

		if stats = <-abort; stats != nil {
			stats.Log("Journalling in-progress snapshot", dl.root, dl.genMarker)
		}
	}
	// Ensure the layer didn't get stale
	dl.lock.RLock()
	defer dl.lock.RUnlock()

	if dl.stale {
		return common.Hash{}, ErrSnapshotStale
	}
	// Ensure the generator stats are written even if none were run this cycle
	journalProgress(dl.diskdb, dl.genMarker, stats)

	log.Debug("Journalled disk layer", "root", dl.root)
	return dl.root, nil
}
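
// exampleAbortHandshake is an illustrative sketch (hypothetical, not part of
// the original file) of the abort handshake used by Journal above: the caller
// sends a reply channel down genAbort and the running generator answers with
// its final progress stats (or nil if it had none to report).
func exampleAbortHandshake(genAbort chan chan *generatorStats) *generatorStats {
	abort := make(chan *generatorStats)
	genAbort <- abort // request the generator to stop
	return <-abort    // wait for it to hand back its stats
}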
// Journal writes the memory layer contents into a buffer to be stored in the
// database as the snapshot journal.
func (dl *diffLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) {
	// Journal the parent first
	base, err := dl.parent.Journal(buffer)
	if err != nil {
		return common.Hash{}, err
	}
	// Ensure the layer didn't get stale
	dl.lock.RLock()
	defer dl.lock.RUnlock()

	if dl.Stale() {
		return common.Hash{}, ErrSnapshotStale
	}
	// Everything below was journalled, persist this layer too
	if err := rlp.Encode(buffer, dl.root); err != nil {
		return common.Hash{}, err
	}
	destructs := make([]journalDestruct, 0, len(dl.destructSet))
	for hash := range dl.destructSet {
		destructs = append(destructs, journalDestruct{Hash: hash})
	}
	if err := rlp.Encode(buffer, destructs); err != nil {
		return common.Hash{}, err
	}
	accounts := make([]journalAccount, 0, len(dl.accountData))
	for hash, blob := range dl.accountData {
		accounts = append(accounts, journalAccount{Hash: hash, Blob: blob})
	}
	if err := rlp.Encode(buffer, accounts); err != nil {
		return common.Hash{}, err
	}
	storage := make([]journalStorage, 0, len(dl.storageData))
	for hash, slots := range dl.storageData {
		keys := make([]common.Hash, 0, len(slots))
		vals := make([][]byte, 0, len(slots))
		for key, val := range slots {
			keys = append(keys, key)
			vals = append(vals, val)
		}
		storage = append(storage, journalStorage{Hash: hash, Keys: keys, Vals: vals})
	}
	if err := rlp.Encode(buffer, storage); err != nil {
		return common.Hash{}, err
	}
	log.Debug("Journalled diff layer", "root", dl.root, "parent", dl.parent.Root())
	return base, nil
}
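
// exampleJournalLayout is an illustrative sketch (hypothetical, not part of
// the original file) of the overall journal layout consumed by iterateJournal
// below: a journal version, the disk layer root, and then one (root,
// destructs, accounts, storage) tuple per diff layer, which is the part
// appended by the Journal methods above.
func exampleJournalLayout(diskRoot common.Hash) []byte {
	buffer := new(bytes.Buffer)
	rlp.Encode(buffer, journalVersion) // checked against journalVersion on load
	rlp.Encode(buffer, diskRoot)       // must match rawdb.ReadSnapshotRoot on load
	// ...one RLP-encoded (root, destructs, accounts, storage) tuple per
	// diff layer would follow here, bottom-most layer first...
	return buffer.Bytes()
}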
// journalCallback is a function which is invoked by iterateJournal, every
// time a difflayer is loaded from disk.
type journalCallback = func(parent common.Hash, root common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) error

// iterateJournal iterates through the journalled difflayers, loading them from
// the database, and invoking the callback for each loaded layer.
// The order is incremental: starting with the bottom-most difflayer and going
// towards the most recent layer.
// This method returns an error either if there was some error reading from
// disk, or if the callback returns an error when invoked.
func iterateJournal(db ethdb.KeyValueReader, callback journalCallback) error {
	journal := rawdb.ReadSnapshotJournal(db)
	if len(journal) == 0 {
		log.Warn("Loaded snapshot journal", "diffs", "missing")
		return nil
	}
	r := rlp.NewStream(bytes.NewReader(journal), 0)
	// Firstly, resolve the first element as the journal version
	version, err := r.Uint64()
	if err != nil {
		log.Warn("Failed to resolve the journal version", "error", err)
		return errors.New("failed to resolve journal version")
	}
	if version != journalVersion {
		log.Warn("Discarded the snapshot journal with wrong version", "required", journalVersion, "got", version)
		return errors.New("wrong journal version")
	}
	// Secondly, resolve the disk layer root, ensuring it's continuous with the
	// disk layer. Note that at this point we know the journal has the correct
	// version, so we expect everything to resolve properly.
	var parent common.Hash
	if err := r.Decode(&parent); err != nil {
		return errors.New("missing disk layer root")
	}
	if baseRoot := rawdb.ReadSnapshotRoot(db); baseRoot != parent {
		log.Warn("Loaded snapshot journal", "diskroot", baseRoot, "diffs", "unmatched")
		return fmt.Errorf("mismatched disk and diff layers")
	}
	for {
		var (
			root        common.Hash
			destructs   []journalDestruct
			accounts    []journalAccount
			storage     []journalStorage
			destructSet = make(map[common.Hash]struct{})
			accountData = make(map[common.Hash][]byte)
			storageData = make(map[common.Hash]map[common.Hash][]byte)
		)
		// Read the next diff journal entry
		if err := r.Decode(&root); err != nil {
			// The first read may fail with EOF, marking the end of the journal
			if errors.Is(err, io.EOF) {
				return nil
			}
			return fmt.Errorf("load diff root: %v", err)
		}
		if err := r.Decode(&destructs); err != nil {
			return fmt.Errorf("load diff destructs: %v", err)
		}
		if err := r.Decode(&accounts); err != nil {
			return fmt.Errorf("load diff accounts: %v", err)
		}
		if err := r.Decode(&storage); err != nil {
			return fmt.Errorf("load diff storage: %v", err)
		}
		for _, entry := range destructs {
			destructSet[entry.Hash] = struct{}{}
		}
		for _, entry := range accounts {
			if len(entry.Blob) > 0 { // RLP loses nil-ness, but `[]byte{}` is not a valid item, so reinterpret that
				accountData[entry.Hash] = entry.Blob
			} else {
				accountData[entry.Hash] = nil
			}
		}
		for _, entry := range storage {
			slots := make(map[common.Hash][]byte)
			for i, key := range entry.Keys {
				if len(entry.Vals[i]) > 0 { // RLP loses nil-ness, but `[]byte{}` is not a valid item, so reinterpret that
					slots[key] = entry.Vals[i]
				} else {
					slots[key] = nil
				}
			}
			storageData[entry.Hash] = slots
		}
		if err := callback(parent, root, destructSet, accountData, storageData); err != nil {
			return err
		}
		parent = root
	}
}
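
// exampleCountDiffLayers is an illustrative sketch (hypothetical helper, not
// part of the original file) demonstrating the iterateJournal callback
// contract: layers are delivered bottom-up, so a plain counter observes the
// journal depth without materialising any diff layers.
func exampleCountDiffLayers(db ethdb.KeyValueReader) (int, error) {
	layers := 0
	err := iterateJournal(db, func(parent common.Hash, root common.Hash,
		destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte,
		storage map[common.Hash]map[common.Hash][]byte) error {
		layers++
		return nil
	})
	return layers, err
}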