chain_freezer.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. // Copyright 2022 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package rawdb
  17. import (
  18. "fmt"
  19. "sync"
  20. "sync/atomic"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common"
  23. "github.com/ethereum/go-ethereum/ethdb"
  24. "github.com/ethereum/go-ethereum/log"
  25. "github.com/ethereum/go-ethereum/params"
  26. )
  27. const (
  28. // freezerRecheckInterval is the frequency to check the key-value database for
  29. // chain progression that might permit new blocks to be frozen into immutable
  30. // storage.
  31. freezerRecheckInterval = time.Minute
  32. // freezerBatchLimit is the maximum number of blocks to freeze in one batch
  33. // before doing an fsync and deleting it from the key-value store.
  34. freezerBatchLimit = 30000
  35. )
// chainFreezer is a wrapper of freezer with additional chain freezing feature.
// The background thread will keep moving ancient chain segments from key-value
// database to flat files for saving space on live database.
type chainFreezer struct {
	// WARNING: The `threshold` field is accessed atomically. On 32 bit platforms, only
	// 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned,
	// so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG).
	// Keep this field first in the struct; do not reorder the fields below above it.
	threshold uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests)

	// Freezer is the embedded ancient store; its methods (Close, Ancients, Sync,
	// ModifyAncients) are used by the chain freezer.
	*Freezer

	quit    chan struct{}      // Closed by Close to ask the freeze loop to terminate
	wg      sync.WaitGroup     // Waited on by Close; NOTE(review): Add/Done are done by whoever spawns freeze, not in this file
	trigger chan chan struct{} // Manual blocking freeze trigger, test determinism
}
  49. // newChainFreezer initializes the freezer for ancient chain data.
  50. func newChainFreezer(datadir string, namespace string, readonly bool, maxTableSize uint32, tables map[string]bool) (*chainFreezer, error) {
  51. freezer, err := NewFreezer(datadir, namespace, readonly, maxTableSize, tables)
  52. if err != nil {
  53. return nil, err
  54. }
  55. return &chainFreezer{
  56. Freezer: freezer,
  57. threshold: params.FullImmutabilityThreshold,
  58. quit: make(chan struct{}),
  59. trigger: make(chan chan struct{}),
  60. }, nil
  61. }
  62. // Close closes the chain freezer instance and terminates the background thread.
  63. func (f *chainFreezer) Close() error {
  64. err := f.Freezer.Close()
  65. select {
  66. case <-f.quit:
  67. default:
  68. close(f.quit)
  69. }
  70. f.wg.Wait()
  71. return err
  72. }
  73. // freeze is a background thread that periodically checks the blockchain for any
  74. // import progress and moves ancient data from the fast database into the freezer.
  75. //
  76. // This functionality is deliberately broken off from block importing to avoid
  77. // incurring additional data shuffling delays on block propagation.
  78. func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
  79. nfdb := &nofreezedb{KeyValueStore: db}
  80. var (
  81. backoff bool
  82. triggered chan struct{} // Used in tests
  83. )
  84. for {
  85. select {
  86. case <-f.quit:
  87. log.Info("Freezer shutting down")
  88. return
  89. default:
  90. }
  91. if backoff {
  92. // If we were doing a manual trigger, notify it
  93. if triggered != nil {
  94. triggered <- struct{}{}
  95. triggered = nil
  96. }
  97. select {
  98. case <-time.NewTimer(freezerRecheckInterval).C:
  99. backoff = false
  100. case triggered = <-f.trigger:
  101. backoff = false
  102. case <-f.quit:
  103. return
  104. }
  105. }
  106. // Retrieve the freezing threshold.
  107. hash := ReadHeadBlockHash(nfdb)
  108. if hash == (common.Hash{}) {
  109. log.Debug("Current full block hash unavailable") // new chain, empty database
  110. backoff = true
  111. continue
  112. }
  113. number := ReadHeaderNumber(nfdb, hash)
  114. threshold := atomic.LoadUint64(&f.threshold)
  115. frozen := atomic.LoadUint64(&f.frozen)
  116. switch {
  117. case number == nil:
  118. log.Error("Current full block number unavailable", "hash", hash)
  119. backoff = true
  120. continue
  121. case *number < threshold:
  122. log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", threshold)
  123. backoff = true
  124. continue
  125. case *number-threshold <= frozen:
  126. log.Debug("Ancient blocks frozen already", "number", *number, "hash", hash, "frozen", frozen)
  127. backoff = true
  128. continue
  129. }
  130. head := ReadHeader(nfdb, hash, *number)
  131. if head == nil {
  132. log.Error("Current full block unavailable", "number", *number, "hash", hash)
  133. backoff = true
  134. continue
  135. }
  136. // Seems we have data ready to be frozen, process in usable batches
  137. var (
  138. start = time.Now()
  139. first, _ = f.Ancients()
  140. limit = *number - threshold
  141. )
  142. if limit-first > freezerBatchLimit {
  143. limit = first + freezerBatchLimit
  144. }
  145. ancients, err := f.freezeRange(nfdb, first, limit)
  146. if err != nil {
  147. log.Error("Error in block freeze operation", "err", err)
  148. backoff = true
  149. continue
  150. }
  151. // Batch of blocks have been frozen, flush them before wiping from leveldb
  152. if err := f.Sync(); err != nil {
  153. log.Crit("Failed to flush frozen tables", "err", err)
  154. }
  155. // Wipe out all data from the active database
  156. batch := db.NewBatch()
  157. for i := 0; i < len(ancients); i++ {
  158. // Always keep the genesis block in active database
  159. if first+uint64(i) != 0 {
  160. DeleteBlockWithoutNumber(batch, ancients[i], first+uint64(i))
  161. DeleteCanonicalHash(batch, first+uint64(i))
  162. }
  163. }
  164. if err := batch.Write(); err != nil {
  165. log.Crit("Failed to delete frozen canonical blocks", "err", err)
  166. }
  167. batch.Reset()
  168. // Wipe out side chains also and track dangling side chains
  169. var dangling []common.Hash
  170. frozen = atomic.LoadUint64(&f.frozen) // Needs reload after during freezeRange
  171. for number := first; number < frozen; number++ {
  172. // Always keep the genesis block in active database
  173. if number != 0 {
  174. dangling = ReadAllHashes(db, number)
  175. for _, hash := range dangling {
  176. log.Trace("Deleting side chain", "number", number, "hash", hash)
  177. DeleteBlock(batch, hash, number)
  178. }
  179. }
  180. }
  181. if err := batch.Write(); err != nil {
  182. log.Crit("Failed to delete frozen side blocks", "err", err)
  183. }
  184. batch.Reset()
  185. // Step into the future and delete and dangling side chains
  186. if frozen > 0 {
  187. tip := frozen
  188. for len(dangling) > 0 {
  189. drop := make(map[common.Hash]struct{})
  190. for _, hash := range dangling {
  191. log.Debug("Dangling parent from Freezer", "number", tip-1, "hash", hash)
  192. drop[hash] = struct{}{}
  193. }
  194. children := ReadAllHashes(db, tip)
  195. for i := 0; i < len(children); i++ {
  196. // Dig up the child and ensure it's dangling
  197. child := ReadHeader(nfdb, children[i], tip)
  198. if child == nil {
  199. log.Error("Missing dangling header", "number", tip, "hash", children[i])
  200. continue
  201. }
  202. if _, ok := drop[child.ParentHash]; !ok {
  203. children = append(children[:i], children[i+1:]...)
  204. i--
  205. continue
  206. }
  207. // Delete all block data associated with the child
  208. log.Debug("Deleting dangling block", "number", tip, "hash", children[i], "parent", child.ParentHash)
  209. DeleteBlock(batch, children[i], tip)
  210. }
  211. dangling = children
  212. tip++
  213. }
  214. if err := batch.Write(); err != nil {
  215. log.Crit("Failed to delete dangling side blocks", "err", err)
  216. }
  217. }
  218. // Log something friendly for the user
  219. context := []interface{}{
  220. "blocks", frozen - first, "elapsed", common.PrettyDuration(time.Since(start)), "number", frozen - 1,
  221. }
  222. if n := len(ancients); n > 0 {
  223. context = append(context, []interface{}{"hash", ancients[n-1]}...)
  224. }
  225. log.Debug("Deep froze chain segment", context...)
  226. // Avoid database thrashing with tiny writes
  227. if frozen-first < freezerBatchLimit {
  228. backoff = true
  229. }
  230. }
  231. }
  232. func (f *chainFreezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hashes []common.Hash, err error) {
  233. hashes = make([]common.Hash, 0, limit-number)
  234. _, err = f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
  235. for ; number <= limit; number++ {
  236. // Retrieve all the components of the canonical block.
  237. hash := ReadCanonicalHash(nfdb, number)
  238. if hash == (common.Hash{}) {
  239. return fmt.Errorf("canonical hash missing, can't freeze block %d", number)
  240. }
  241. header := ReadHeaderRLP(nfdb, hash, number)
  242. if len(header) == 0 {
  243. return fmt.Errorf("block header missing, can't freeze block %d", number)
  244. }
  245. body := ReadBodyRLP(nfdb, hash, number)
  246. if len(body) == 0 {
  247. return fmt.Errorf("block body missing, can't freeze block %d", number)
  248. }
  249. receipts := ReadReceiptsRLP(nfdb, hash, number)
  250. if len(receipts) == 0 {
  251. return fmt.Errorf("block receipts missing, can't freeze block %d", number)
  252. }
  253. td := ReadTdRLP(nfdb, hash, number)
  254. if len(td) == 0 {
  255. return fmt.Errorf("total difficulty missing, can't freeze block %d", number)
  256. }
  257. // Write to the batch.
  258. if err := op.AppendRaw(chainFreezerHashTable, number, hash[:]); err != nil {
  259. return fmt.Errorf("can't write hash to Freezer: %v", err)
  260. }
  261. if err := op.AppendRaw(chainFreezerHeaderTable, number, header); err != nil {
  262. return fmt.Errorf("can't write header to Freezer: %v", err)
  263. }
  264. if err := op.AppendRaw(chainFreezerBodiesTable, number, body); err != nil {
  265. return fmt.Errorf("can't write body to Freezer: %v", err)
  266. }
  267. if err := op.AppendRaw(chainFreezerReceiptTable, number, receipts); err != nil {
  268. return fmt.Errorf("can't write receipts to Freezer: %v", err)
  269. }
  270. if err := op.AppendRaw(chainFreezerDifficultyTable, number, td); err != nil {
  271. return fmt.Errorf("can't write td to Freezer: %v", err)
  272. }
  273. hashes = append(hashes, hash)
  274. }
  275. return nil
  276. })
  277. return hashes, err
  278. }