freezer_reinit.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package rawdb
  17. import (
  18. "errors"
  19. "runtime"
  20. "sync/atomic"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common"
  23. "github.com/ethereum/go-ethereum/common/prque"
  24. "github.com/ethereum/go-ethereum/core/types"
  25. "github.com/ethereum/go-ethereum/ethdb"
  26. "github.com/ethereum/go-ethereum/log"
  27. )
  28. // InitDatabaseFromFreezer reinitializes an empty database from a previous batch
  29. // of frozen ancient blocks. The method iterates over all the frozen blocks and
  30. // injects into the database the block hash->number mappings and the transaction
  31. // lookup entries.
  32. func InitDatabaseFromFreezer(db ethdb.Database) error {
  33. // If we can't access the freezer or it's empty, abort
  34. frozen, err := db.Ancients()
  35. if err != nil || frozen == 0 {
  36. return err
  37. }
  38. // Blocks previously frozen, iterate over- and hash them concurrently
  39. var (
  40. number = ^uint64(0) // -1
  41. results = make(chan *types.Block, 4*runtime.NumCPU())
  42. )
  43. abort := make(chan struct{})
  44. defer close(abort)
  45. for i := 0; i < runtime.NumCPU(); i++ {
  46. go func() {
  47. for {
  48. // Fetch the next task number, terminating if everything's done
  49. n := atomic.AddUint64(&number, 1)
  50. if n >= frozen {
  51. return
  52. }
  53. // Retrieve the block from the freezer. If successful, pre-cache
  54. // the block hash and the individual transaction hashes for storing
  55. // into the database.
  56. block := ReadBlock(db, ReadCanonicalHash(db, n), n)
  57. if block != nil {
  58. block.Hash()
  59. for _, tx := range block.Transactions() {
  60. tx.Hash()
  61. }
  62. }
  63. // Feed the block to the aggregator, or abort on interrupt
  64. select {
  65. case results <- block:
  66. case <-abort:
  67. return
  68. }
  69. }
  70. }()
  71. }
  72. // Reassemble the blocks into a contiguous stream and push them out to disk
  73. var (
  74. queue = prque.New(nil)
  75. next = int64(0)
  76. batch = db.NewBatch()
  77. start = time.Now()
  78. logged time.Time
  79. )
  80. for i := uint64(0); i < frozen; i++ {
  81. // Retrieve the next result and bail if it's nil
  82. block := <-results
  83. if block == nil {
  84. return errors.New("broken ancient database")
  85. }
  86. // Push the block into the import queue and process contiguous ranges
  87. queue.Push(block, -int64(block.NumberU64()))
  88. for !queue.Empty() {
  89. // If the next available item is gapped, return
  90. if _, priority := queue.Peek(); -priority != next {
  91. break
  92. }
  93. // Next block available, pop it off and index it
  94. block = queue.PopItem().(*types.Block)
  95. next++
  96. // Inject hash<->number mapping and txlookup indexes
  97. WriteHeaderNumber(batch, block.Hash(), block.NumberU64())
  98. WriteTxLookupEntries(batch, block)
  99. // If enough data was accumulated in memory or we're at the last block, dump to disk
  100. if batch.ValueSize() > ethdb.IdealBatchSize || uint64(next) == frozen {
  101. if err := batch.Write(); err != nil {
  102. return err
  103. }
  104. batch.Reset()
  105. }
  106. // If we've spent too much time already, notify the user of what we're doing
  107. if time.Since(logged) > 8*time.Second {
  108. log.Info("Initializing chain from ancient data", "number", block.Number(), "hash", block.Hash(), "total", frozen-1, "elapsed", common.PrettyDuration(time.Since(start)))
  109. logged = time.Now()
  110. }
  111. }
  112. }
  113. hash := ReadCanonicalHash(db, frozen-1)
  114. WriteHeadHeaderHash(db, hash)
  115. WriteHeadFastBlockHash(db, hash)
  116. log.Info("Initialized chain from ancient data", "number", frozen-1, "hash", hash, "elapsed", common.PrettyDuration(time.Since(start)))
  117. return nil
  118. }