generate.go

// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package snapshot

import (
    "bytes"
    "encoding/binary"
    "fmt"
    "math/big"
    "time"

    "github.com/VictoriaMetrics/fastcache"
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/common/math"
    "github.com/ethereum/go-ethereum/core/rawdb"
    "github.com/ethereum/go-ethereum/crypto"
    "github.com/ethereum/go-ethereum/ethdb"
    "github.com/ethereum/go-ethereum/log"
    "github.com/ethereum/go-ethereum/rlp"
    "github.com/ethereum/go-ethereum/trie"
)

var (
    // emptyRoot is the known root hash of an empty trie.
    emptyRoot = common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")

    // emptyCode is the known hash of the empty EVM bytecode.
    emptyCode = crypto.Keccak256Hash(nil)
)

// generatorStats is a collection of statistics gathered by the snapshot generator
// for logging purposes.
type generatorStats struct {
    wiping   chan struct{}      // Notification channel if wiping is in progress
    origin   uint64             // Origin prefix where generation started
    start    time.Time          // Timestamp when generation started
    accounts uint64             // Number of accounts indexed
    slots    uint64             // Number of storage slots indexed
    storage  common.StorageSize // Account and storage slot size
}

// Log creates a contextual log with the given message and the context pulled
// from the internally maintained statistics.
func (gs *generatorStats) Log(msg string, root common.Hash, marker []byte) {
    var ctx []interface{}
    if root != (common.Hash{}) {
        ctx = append(ctx, []interface{}{"root", root}...)
    }
    // Figure out whether we're after or within an account
    switch len(marker) {
    case common.HashLength:
        ctx = append(ctx, []interface{}{"at", common.BytesToHash(marker)}...)
    case 2 * common.HashLength:
        ctx = append(ctx, []interface{}{
            "in", common.BytesToHash(marker[:common.HashLength]),
            "at", common.BytesToHash(marker[common.HashLength:]),
        }...)
    }
    // Add the usual measurements
    ctx = append(ctx, []interface{}{
        "accounts", gs.accounts,
        "slots", gs.slots,
        "storage", gs.storage,
        "elapsed", common.PrettyDuration(time.Since(gs.start)),
    }...)
    // Calculate the estimated indexing time based on current stats
    if len(marker) > 0 {
        if done := binary.BigEndian.Uint64(marker[:8]) - gs.origin; done > 0 {
            left := math.MaxUint64 - binary.BigEndian.Uint64(marker[:8])

            speed := done/uint64(time.Since(gs.start)/time.Millisecond+1) + 1 // +1 to avoid division by zero
            ctx = append(ctx, []interface{}{
                "eta", common.PrettyDuration(time.Duration(left/speed) * time.Millisecond),
            }...)
        }
    }
    log.Info(msg, ctx...)
}

// generateSnapshot regenerates a brand new snapshot based on an existing state
// database and head block asynchronously. The snapshot is returned immediately
// and generation is continued in the background until done.
func generateSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, wiper chan struct{}) *diskLayer {
    // Wipe any previously existing snapshot from the database if no wiper is
    // currently in progress.
    if wiper == nil {
        wiper = wipeSnapshot(diskdb, true)
    }
    // Create a new disk layer with an initialized state marker at zero
    var (
        stats     = &generatorStats{wiping: wiper, start: time.Now()}
        batch     = diskdb.NewBatch()
        genMarker = []byte{} // Initialized but empty!
    )
    rawdb.WriteSnapshotRoot(batch, root)
    journalProgress(batch, genMarker, stats)
    if err := batch.Write(); err != nil {
        log.Crit("Failed to write initialized state marker", "error", err)
    }
    base := &diskLayer{
        diskdb:     diskdb,
        triedb:     triedb,
        root:       root,
        cache:      fastcache.New(cache * 1024 * 1024),
        genMarker:  genMarker,
        genPending: make(chan struct{}),
        genAbort:   make(chan chan *generatorStats),
    }
    go base.generate(stats)
    log.Debug("Start snapshot generation", "root", root)
    return base
}
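
// The generation marker encodes how far generation has progressed: a nil
// marker means the snapshot is fully generated, an empty (non-nil) marker
// means generation has not started yet, a 32-byte marker is the hash of the
// last account written, and a 64-byte marker additionally carries the hash of
// the last storage slot written within that account.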

// journalProgress persists the generator stats into the database to resume later.
func journalProgress(db ethdb.KeyValueWriter, marker []byte, stats *generatorStats) {
    // Write out the generator marker. Note it's a standalone entry for the disk
    // layer generator, not mixed with the journal, so it's fine if the generator
    // is persisted while the journal is not.
    entry := journalGenerator{
        Done:   marker == nil,
        Marker: marker,
    }
    if stats != nil {
        entry.Wiping = (stats.wiping != nil)
        entry.Accounts = stats.accounts
        entry.Slots = stats.slots
        entry.Storage = uint64(stats.storage)
    }
    blob, err := rlp.EncodeToBytes(entry)
    if err != nil {
        panic(err) // Cannot happen, here to catch dev errors
    }
    var logstr string
    switch {
    case marker == nil:
        logstr = "done"
    case bytes.Equal(marker, []byte{}):
        logstr = "empty"
    case len(marker) == common.HashLength:
        logstr = fmt.Sprintf("%#x", marker)
    default:
        logstr = fmt.Sprintf("%#x:%#x", marker[:common.HashLength], marker[common.HashLength:])
    }
    log.Debug("Journalled generator progress", "progress", logstr)
    rawdb.WriteSnapshotGenerator(db, blob)
}

// generate is a background thread that iterates over the state and storage tries,
// constructing the state snapshot. All the arguments are purely for statistics
// gathering and logging, since the method surfs the blocks as they arrive, often
// being restarted.
func (dl *diskLayer) generate(stats *generatorStats) {
    // If a database wipe is in operation, wait until it's done
    if stats.wiping != nil {
        stats.Log("Wiper running, state snapshotting paused", common.Hash{}, dl.genMarker)
        select {
        // If wiper is done, resume normal mode of operation
        case <-stats.wiping:
            stats.wiping = nil
            stats.start = time.Now()

        // If generator was aborted during wipe, return
        case abort := <-dl.genAbort:
            abort <- stats
            return
        }
    }
    // Create an account and state iterator pointing to the current generator marker
    accTrie, err := trie.NewSecure(dl.root, dl.triedb)
    if err != nil {
        // The account trie is missing (GC), surf the chain until one becomes available
        stats.Log("Trie missing, state snapshotting paused", dl.root, dl.genMarker)

        abort := <-dl.genAbort
        abort <- stats
        return
    }
    stats.Log("Resuming state snapshot generation", dl.root, dl.genMarker)

    var accMarker []byte
    if len(dl.genMarker) > 0 { // []byte{} is the start, use nil for that
        accMarker = dl.genMarker[:common.HashLength]
    }
    accIt := trie.NewIterator(accTrie.NodeIterator(accMarker))
    batch := dl.diskdb.NewBatch()

    // Iterate from the previous marker and continue generating the state snapshot
    logged := time.Now()
    for accIt.Next() {
        // Retrieve the current account and flatten it into the internal format
        accountHash := common.BytesToHash(accIt.Key)

        var acc struct {
            Nonce    uint64
            Balance  *big.Int
            Root     common.Hash
            CodeHash []byte
        }
        if err := rlp.DecodeBytes(accIt.Value, &acc); err != nil {
            log.Crit("Invalid account encountered during snapshot creation", "err", err)
        }
        data := SlimAccountRLP(acc.Nonce, acc.Balance, acc.Root, acc.CodeHash)

        // If the account is not yet in-progress, write it out
        if accMarker == nil || !bytes.Equal(accountHash[:], accMarker) {
            rawdb.WriteAccountSnapshot(batch, accountHash, data)
            stats.storage += common.StorageSize(1 + common.HashLength + len(data))
            stats.accounts++
        }
        // If we've exceeded our batch allowance or termination was requested, flush to disk
        var abort chan *generatorStats
        select {
        case abort = <-dl.genAbort:
        default:
        }
        if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
            // Only write and set the marker if we actually did something useful
            if batch.ValueSize() > 0 {
                // Ensure the generator entry is in sync with the data
                marker := accountHash[:]
                journalProgress(batch, marker, stats)

                batch.Write()
                batch.Reset()

                dl.lock.Lock()
                dl.genMarker = marker
                dl.lock.Unlock()
            }
            if abort != nil {
                stats.Log("Aborting state snapshot generation", dl.root, accountHash[:])
                abort <- stats
                return
            }
        }
        // If the account is in-progress, continue where we left off (otherwise iterate all)
        if acc.Root != emptyRoot {
            storeTrie, err := trie.NewSecure(acc.Root, dl.triedb)
            if err != nil {
                log.Error("Generator failed to access storage trie", "root", dl.root, "account", accountHash, "stroot", acc.Root, "err", err)
                abort := <-dl.genAbort
                abort <- stats
                return
            }
            var storeMarker []byte
            if accMarker != nil && bytes.Equal(accountHash[:], accMarker) && len(dl.genMarker) > common.HashLength {
                storeMarker = dl.genMarker[common.HashLength:]
            }
            storeIt := trie.NewIterator(storeTrie.NodeIterator(storeMarker))
            for storeIt.Next() {
                rawdb.WriteStorageSnapshot(batch, accountHash, common.BytesToHash(storeIt.Key), storeIt.Value)
                stats.storage += common.StorageSize(1 + 2*common.HashLength + len(storeIt.Value))
                stats.slots++

                // If we've exceeded our batch allowance or termination was requested, flush to disk
                var abort chan *generatorStats
                select {
                case abort = <-dl.genAbort:
                default:
                }
                if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
                    // Only write and set the marker if we actually did something useful
                    if batch.ValueSize() > 0 {
                        // Ensure the generator entry is in sync with the data
                        marker := append(accountHash[:], storeIt.Key...)
                        journalProgress(batch, marker, stats)

                        batch.Write()
                        batch.Reset()

                        dl.lock.Lock()
                        dl.genMarker = marker
                        dl.lock.Unlock()
                    }
                    if abort != nil {
                        stats.Log("Aborting state snapshot generation", dl.root, append(accountHash[:], storeIt.Key...))
                        abort <- stats
                        return
                    }
                    if time.Since(logged) > 8*time.Second {
                        stats.Log("Generating state snapshot", dl.root, append(accountHash[:], storeIt.Key...))
                        logged = time.Now()
                    }
                }
            }
            if err := storeIt.Err; err != nil {
                log.Error("Generator failed to iterate storage trie", "accroot", dl.root, "acchash", common.BytesToHash(accIt.Key), "stroot", acc.Root, "err", err)
                abort := <-dl.genAbort
                abort <- stats
                return
            }
        }
        if time.Since(logged) > 8*time.Second {
            stats.Log("Generating state snapshot", dl.root, accIt.Key)
            logged = time.Now()
        }
        // Some account processed, unmark the marker
        accMarker = nil
    }
    if err := accIt.Err; err != nil {
        log.Error("Generator failed to iterate account trie", "root", dl.root, "err", err)
        abort := <-dl.genAbort
        abort <- stats
        return
    }
    // Snapshot fully generated, set the marker to nil. Note that even if there
    // is nothing left to commit, the generator entry is persisted anyway to
    // mark the snapshot as complete.
    journalProgress(batch, nil, stats)
    batch.Write()

    log.Info("Generated state snapshot", "accounts", stats.accounts, "slots", stats.slots,
        "storage", stats.storage, "elapsed", common.PrettyDuration(time.Since(stats.start)))

    dl.lock.Lock()
    dl.genMarker = nil
    close(dl.genPending)
    dl.lock.Unlock()

    // Someone will be looking for us, wait it out
    abort := <-dl.genAbort
    abort <- nil
}