conversion.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package snapshot
  17. import (
  18. "bytes"
  19. "encoding/binary"
  20. "errors"
  21. "fmt"
  22. "math"
  23. "runtime"
  24. "sync"
  25. "time"
  26. "github.com/ethereum/go-ethereum/common/gopool"
  27. "github.com/ethereum/go-ethereum/common"
  28. "github.com/ethereum/go-ethereum/core/rawdb"
  29. "github.com/ethereum/go-ethereum/ethdb"
  30. "github.com/ethereum/go-ethereum/log"
  31. "github.com/ethereum/go-ethereum/rlp"
  32. "github.com/ethereum/go-ethereum/trie"
  33. )
// trieKV represents a trie key-value pair, i.e. a single leaf that is handed
// to a trie generator through its input channel.
type trieKV struct {
	key   common.Hash // leaf key; generateTrieRoot fills it with the iterator's current hash
	value []byte      // leaf payload; generateTrieRoot fills it with an encoded account or a copied storage slot
}
  39. type (
  40. // trieGeneratorFn is the interface of trie generation which can
  41. // be implemented by different trie algorithm.
  42. trieGeneratorFn func(db ethdb.KeyValueWriter, in chan (trieKV), out chan (common.Hash))
  43. // leafCallbackFn is the callback invoked at the leaves of the trie,
  44. // returns the subtrie root with the specified subtrie identifier.
  45. leafCallbackFn func(db ethdb.KeyValueWriter, accountHash, codeHash common.Hash, stat *generateStats) (common.Hash, error)
  46. )
  47. // GenerateAccountTrieRoot takes an account iterator and reproduces the root hash.
  48. func GenerateAccountTrieRoot(it AccountIterator) (common.Hash, error) {
  49. return generateTrieRoot(nil, it, common.Hash{}, stackTrieGenerate, nil, newGenerateStats(), true)
  50. }
  51. // GenerateStorageTrieRoot takes a storage iterator and reproduces the root hash.
  52. func GenerateStorageTrieRoot(account common.Hash, it StorageIterator) (common.Hash, error) {
  53. return generateTrieRoot(nil, it, account, stackTrieGenerate, nil, newGenerateStats(), true)
  54. }
  55. // GenerateTrie takes the whole snapshot tree as the input, traverses all the
  56. // accounts as well as the corresponding storages and regenerate the whole state
  57. // (account trie + all storage tries).
  58. func GenerateTrie(snaptree *Tree, root common.Hash, src ethdb.Database, dst ethdb.KeyValueWriter) error {
  59. // Traverse all state by snapshot, re-generate the whole state trie
  60. acctIt, err := snaptree.AccountIterator(root, common.Hash{})
  61. if err != nil {
  62. return err // The required snapshot might not exist.
  63. }
  64. defer acctIt.Release()
  65. got, err := generateTrieRoot(dst, acctIt, common.Hash{}, stackTrieGenerate, func(dst ethdb.KeyValueWriter, accountHash, codeHash common.Hash, stat *generateStats) (common.Hash, error) {
  66. // Migrate the code first, commit the contract code into the tmp db.
  67. if codeHash != emptyCode {
  68. code := rawdb.ReadCode(src, codeHash)
  69. if len(code) == 0 {
  70. return common.Hash{}, errors.New("failed to read contract code")
  71. }
  72. rawdb.WriteCode(dst, codeHash, code)
  73. }
  74. // Then migrate all storage trie nodes into the tmp db.
  75. storageIt, err := snaptree.StorageIterator(root, accountHash, common.Hash{})
  76. if err != nil {
  77. return common.Hash{}, err
  78. }
  79. defer storageIt.Release()
  80. hash, err := generateTrieRoot(dst, storageIt, accountHash, stackTrieGenerate, nil, stat, false)
  81. if err != nil {
  82. return common.Hash{}, err
  83. }
  84. return hash, nil
  85. }, newGenerateStats(), true)
  86. if err != nil {
  87. return err
  88. }
  89. if got != root {
  90. return fmt.Errorf("state root hash mismatch: got %x, want %x", got, root)
  91. }
  92. return nil
  93. }
// generateStats is a collection of statistics gathered by the trie generator
// for logging purposes. All fields are guarded by lock; the methods on this
// type take it before reading or updating anything.
type generateStats struct {
	head       common.Hash                 // Last account hash recorded by progressAccounts
	start      time.Time                   // Creation time of the stats, set by newGenerateStats
	accounts   uint64                      // Number of accounts done (including those being crawled)
	slots      uint64                      // Number of storage slots done (including those being crawled)
	slotsStart map[common.Hash]time.Time   // Start time for account slot crawling
	slotsHead  map[common.Hash]common.Hash // Slot head for accounts being crawled
	lock       sync.RWMutex                // Protects every field above
}
  105. // newGenerateStats creates a new generator stats.
  106. func newGenerateStats() *generateStats {
  107. return &generateStats{
  108. slotsStart: make(map[common.Hash]time.Time),
  109. slotsHead: make(map[common.Hash]common.Hash),
  110. start: time.Now(),
  111. }
  112. }
  113. // progressAccounts updates the generator stats for the account range.
  114. func (stat *generateStats) progressAccounts(account common.Hash, done uint64) {
  115. stat.lock.Lock()
  116. defer stat.lock.Unlock()
  117. stat.accounts += done
  118. stat.head = account
  119. }
  120. // finishAccounts updates the gemerator stats for the finished account range.
  121. func (stat *generateStats) finishAccounts(done uint64) {
  122. stat.lock.Lock()
  123. defer stat.lock.Unlock()
  124. stat.accounts += done
  125. }
  126. // progressContract updates the generator stats for a specific in-progress contract.
  127. func (stat *generateStats) progressContract(account common.Hash, slot common.Hash, done uint64) {
  128. stat.lock.Lock()
  129. defer stat.lock.Unlock()
  130. stat.slots += done
  131. stat.slotsHead[account] = slot
  132. if _, ok := stat.slotsStart[account]; !ok {
  133. stat.slotsStart[account] = time.Now()
  134. }
  135. }
  136. // finishContract updates the generator stats for a specific just-finished contract.
  137. func (stat *generateStats) finishContract(account common.Hash, done uint64) {
  138. stat.lock.Lock()
  139. defer stat.lock.Unlock()
  140. stat.slots += done
  141. delete(stat.slotsHead, account)
  142. delete(stat.slotsStart, account)
  143. }
  144. // report prints the cumulative progress statistic smartly.
  145. func (stat *generateStats) report() {
  146. stat.lock.RLock()
  147. defer stat.lock.RUnlock()
  148. ctx := []interface{}{
  149. "accounts", stat.accounts,
  150. "slots", stat.slots,
  151. "elapsed", common.PrettyDuration(time.Since(stat.start)),
  152. }
  153. if stat.accounts > 0 {
  154. // If there's progress on the account trie, estimate the time to finish crawling it
  155. if done := binary.BigEndian.Uint64(stat.head[:8]) / stat.accounts; done > 0 {
  156. var (
  157. left = (math.MaxUint64 - binary.BigEndian.Uint64(stat.head[:8])) / stat.accounts
  158. speed = done/uint64(time.Since(stat.start)/time.Millisecond+1) + 1 // +1s to avoid division by zero
  159. eta = time.Duration(left/speed) * time.Millisecond
  160. )
  161. // If there are large contract crawls in progress, estimate their finish time
  162. for acc, head := range stat.slotsHead {
  163. start := stat.slotsStart[acc]
  164. if done := binary.BigEndian.Uint64(head[:8]); done > 0 {
  165. var (
  166. left = math.MaxUint64 - binary.BigEndian.Uint64(head[:8])
  167. speed = done/uint64(time.Since(start)/time.Millisecond+1) + 1 // +1s to avoid division by zero
  168. )
  169. // Override the ETA if larger than the largest until now
  170. if slotETA := time.Duration(left/speed) * time.Millisecond; eta < slotETA {
  171. eta = slotETA
  172. }
  173. }
  174. }
  175. ctx = append(ctx, []interface{}{
  176. "eta", common.PrettyDuration(eta),
  177. }...)
  178. }
  179. }
  180. log.Info("Iterating state snapshot", ctx...)
  181. }
  182. // reportDone prints the last log when the whole generation is finished.
  183. func (stat *generateStats) reportDone() {
  184. stat.lock.RLock()
  185. defer stat.lock.RUnlock()
  186. var ctx []interface{}
  187. ctx = append(ctx, []interface{}{"accounts", stat.accounts}...)
  188. if stat.slots != 0 {
  189. ctx = append(ctx, []interface{}{"slots", stat.slots}...)
  190. }
  191. ctx = append(ctx, []interface{}{"elapsed", common.PrettyDuration(time.Since(stat.start))}...)
  192. log.Info("Iterated snapshot", ctx...)
  193. }
  194. // runReport periodically prints the progress information.
  195. func runReport(stats *generateStats, stop chan bool) {
  196. timer := time.NewTimer(0)
  197. defer timer.Stop()
  198. for {
  199. select {
  200. case <-timer.C:
  201. stats.report()
  202. timer.Reset(time.Second * 8)
  203. case success := <-stop:
  204. if success {
  205. stats.reportDone()
  206. }
  207. return
  208. }
  209. }
  210. }
  211. // generateTrieRoot generates the trie hash based on the snapshot iterator.
  212. // It can be used for generating account trie, storage trie or even the
  213. // whole state which connects the accounts and the corresponding storages.
  214. func generateTrieRoot(db ethdb.KeyValueWriter, it Iterator, account common.Hash, generatorFn trieGeneratorFn, leafCallback leafCallbackFn, stats *generateStats, report bool) (common.Hash, error) {
  215. var (
  216. in = make(chan trieKV) // chan to pass leaves
  217. out = make(chan common.Hash, 1) // chan to collect result
  218. stoplog = make(chan bool, 1) // 1-size buffer, works when logging is not enabled
  219. wg sync.WaitGroup
  220. )
  221. // Spin up a go-routine for trie hash re-generation
  222. wg.Add(1)
  223. go func() {
  224. defer wg.Done()
  225. generatorFn(db, in, out)
  226. }()
  227. // Spin up a go-routine for progress logging
  228. if report && stats != nil {
  229. wg.Add(1)
  230. go func() {
  231. defer wg.Done()
  232. runReport(stats, stoplog)
  233. }()
  234. }
  235. // Create a semaphore to assign tasks and collect results through. We'll pre-
  236. // fill it with nils, thus using the same channel for both limiting concurrent
  237. // processing and gathering results.
  238. threads := runtime.NumCPU()
  239. results := make(chan error, threads)
  240. for i := 0; i < threads; i++ {
  241. results <- nil // fill the semaphore
  242. }
  243. // stop is a helper function to shutdown the background threads
  244. // and return the re-generated trie hash.
  245. stop := func(fail error) (common.Hash, error) {
  246. close(in)
  247. result := <-out
  248. for i := 0; i < threads; i++ {
  249. if err := <-results; err != nil && fail == nil {
  250. fail = err
  251. }
  252. }
  253. stoplog <- fail == nil
  254. wg.Wait()
  255. return result, fail
  256. }
  257. var (
  258. logged = time.Now()
  259. processed = uint64(0)
  260. leaf trieKV
  261. )
  262. // Start to feed leaves
  263. for it.Next() {
  264. if account == (common.Hash{}) {
  265. var (
  266. err error
  267. fullData []byte
  268. )
  269. if leafCallback == nil {
  270. fullData, err = FullAccountRLP(it.(AccountIterator).Account())
  271. if err != nil {
  272. return stop(err)
  273. }
  274. } else {
  275. // Wait until the semaphore allows us to continue, aborting if
  276. // a sub-task failed
  277. if err := <-results; err != nil {
  278. results <- nil // stop will drain the results, add a noop back for this error we just consumed
  279. return stop(err)
  280. }
  281. // Fetch the next account and process it concurrently
  282. account, err := FullAccount(it.(AccountIterator).Account())
  283. if err != nil {
  284. return stop(err)
  285. }
  286. hash := it.Hash()
  287. gopool.Submit(func() {
  288. subroot, err := leafCallback(db, hash, common.BytesToHash(account.CodeHash), stats)
  289. if err != nil {
  290. results <- err
  291. return
  292. }
  293. if !bytes.Equal(account.Root, subroot.Bytes()) {
  294. results <- fmt.Errorf("invalid subroot(path %x), want %x, have %x", hash, account.Root, subroot)
  295. return
  296. }
  297. results <- nil
  298. })
  299. fullData, err = rlp.EncodeToBytes(account)
  300. if err != nil {
  301. return stop(err)
  302. }
  303. }
  304. leaf = trieKV{it.Hash(), fullData}
  305. } else {
  306. leaf = trieKV{it.Hash(), common.CopyBytes(it.(StorageIterator).Slot())}
  307. }
  308. in <- leaf
  309. // Accumulate the generation statistic if it's required.
  310. processed++
  311. if time.Since(logged) > 3*time.Second && stats != nil {
  312. if account == (common.Hash{}) {
  313. stats.progressAccounts(it.Hash(), processed)
  314. } else {
  315. stats.progressContract(account, it.Hash(), processed)
  316. }
  317. logged, processed = time.Now(), 0
  318. }
  319. }
  320. // Commit the last part statistic.
  321. if processed > 0 && stats != nil {
  322. if account == (common.Hash{}) {
  323. stats.finishAccounts(processed)
  324. } else {
  325. stats.finishContract(account, processed)
  326. }
  327. }
  328. return stop(nil)
  329. }
  330. func stackTrieGenerate(db ethdb.KeyValueWriter, in chan trieKV, out chan common.Hash) {
  331. t := trie.NewStackTrie(db)
  332. for leaf := range in {
  333. t.TryUpdate(leaf.key[:], leaf.value)
  334. }
  335. var root common.Hash
  336. if db == nil {
  337. root = t.Hash()
  338. } else {
  339. root, _ = t.Commit()
  340. }
  341. out <- root
  342. }