// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package rawdb

import (
	"runtime"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/prque"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/rlp"
)

// InitDatabaseFromFreezer reinitializes an empty database from a previous batch
// of frozen ancient blocks. The method iterates over all the frozen blocks and
// injects into the database the block hash->number mappings.
func InitDatabaseFromFreezer(db ethdb.Database) {
	// If we can't access the freezer or it's empty, abort
	frozen, err := db.Ancients()
	if err != nil || frozen == 0 {
		return
	}
	var (
		batch  = db.NewBatch()
		start  = time.Now()
		logged = start.Add(-7 * time.Second) // Unindex during import is fast, don't double log
		hash   common.Hash
	)
	for i := uint64(0); i < frozen; {
		// We read 100K hashes at a time, i.e. up to 3.2MB of hash data per batch
		count := uint64(100_000)
		if i+count > frozen {
			count = frozen - i
		}
		data, err := db.AncientRange(chainFreezerHashTable, i, count, 32*count)
		if err != nil {
			log.Crit("Failed to init database from freezer", "err", err)
		}
		for j, h := range data {
			number := i + uint64(j)
			hash = common.BytesToHash(h)
			WriteHeaderNumber(batch, hash, number)
			// If enough data was accumulated in memory or we're at the last block, dump to disk
			if batch.ValueSize() > ethdb.IdealBatchSize {
				if err := batch.Write(); err != nil {
					log.Crit("Failed to write data to db", "err", err)
				}
				batch.Reset()
			}
		}
		i += uint64(len(data))
		// If we've spent too much time already, notify the user of what we're doing
		if time.Since(logged) > 8*time.Second {
			log.Info("Initializing database from freezer", "total", frozen, "number", i, "hash", hash, "elapsed", common.PrettyDuration(time.Since(start)))
			logged = time.Now()
		}
	}
	if err := batch.Write(); err != nil {
		log.Crit("Failed to write data to db", "err", err)
	}
	batch.Reset()

	WriteHeadHeaderHash(db, hash)
	WriteHeadFastBlockHash(db, hash)
	log.Info("Initialized database from freezer", "blocks", frozen, "elapsed", common.PrettyDuration(time.Since(start)))
}
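
// initFromFreezerSketch is an illustrative sketch (not part of the original
// file) of how InitDatabaseFromFreezer is meant to be used: after opening a
// database whose key-value store is empty but whose freezer already holds
// ancient blocks, the hash->number mappings are rebuilt before the node starts
// serving lookups. How `db` is opened is assumed to be handled by the caller.
func initFromFreezerSketch(db ethdb.Database) {
	// Rebuild hash->number mappings from the ancient store; this is a no-op
	// if the freezer is inaccessible or empty.
	InitDatabaseFromFreezer(db)
}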

// blockTxHashes carries a block number together with the hashes of all
// transactions contained in that block.
type blockTxHashes struct {
	number uint64
	hashes []common.Hash
}

// iterateTransactions iterates over all transactions in the canonical blocks
// of the given number range, and yields the transaction hashes on a channel.
// If a signal is received from the interrupt channel, the iteration is aborted
// and the result channel is closed.
func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool, interrupt chan struct{}) chan *blockTxHashes {
	// One thread sequentially reads data from db
	type numberRlp struct {
		number uint64
		rlp    rlp.RawValue
	}
	if to == from {
		return nil
	}
	threads := to - from
	if cpus := runtime.NumCPU(); threads > uint64(cpus) {
		threads = uint64(cpus)
	}
	var (
		rlpCh    = make(chan *numberRlp, threads*2)     // we send raw rlp over this channel
		hashesCh = make(chan *blockTxHashes, threads*2) // send hashes over hashesCh
	)
	// lookup runs in one instance
	lookup := func() {
		n, end := from, to
		if reverse {
			n, end = to-1, from-1
		}
		defer close(rlpCh)
		for n != end {
			data := ReadCanonicalBodyRLP(db, n)
			// Feed the block to the aggregator, or abort on interrupt
			select {
			case rlpCh <- &numberRlp{n, data}:
			case <-interrupt:
				return
			}
			if reverse {
				n--
			} else {
				n++
			}
		}
	}
	// process runs in parallel
	nThreadsAlive := int32(threads)
	process := func() {
		defer func() {
			// Last processor closes the result channel
			if atomic.AddInt32(&nThreadsAlive, -1) == 0 {
				close(hashesCh)
			}
		}()
		for data := range rlpCh {
			var body types.Body
			if err := rlp.DecodeBytes(data.rlp, &body); err != nil {
				log.Warn("Failed to decode block body", "block", data.number, "error", err)
				return
			}
			var hashes []common.Hash
			for _, tx := range body.Transactions {
				hashes = append(hashes, tx.Hash())
			}
			result := &blockTxHashes{
				hashes: hashes,
				number: data.number,
			}
			// Feed the block to the aggregator, or abort on interrupt
			select {
			case hashesCh <- result:
			case <-interrupt:
				return
			}
		}
	}
	go lookup() // start the sequential db accessor
	for i := 0; i < int(threads); i++ {
		go process()
	}
	return hashesCh
}
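
// countTxHashesSketch is an illustrative sketch (not part of the original
// file) of how the channel returned by iterateTransactions can be consumed.
// Deliveries may arrive out of order because bodies are decoded on multiple
// goroutines; the caller is responsible for any ordering, as indexTransactions
// below does with a priority queue.
func countTxHashesSketch(db ethdb.Database, from uint64, to uint64) int {
	if from >= to {
		return 0 // iterateTransactions returns a nil channel for empty ranges
	}
	var (
		interrupt = make(chan struct{}) // left open: run to completion
		txs       = 0
	)
	for delivery := range iterateTransactions(db, from, to, false, interrupt) {
		txs += len(delivery.hashes)
	}
	return txs
}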

// indexTransactions creates txlookup indices of the specified block range.
//
// This function iterates over the canonical chain in reverse order, which has
// one main advantage: the tx index tail flag can be written periodically even
// before the whole indexing procedure is finished, so that indexing can be
// resumed quickly the next time it runs.
//
// The whole procedure is interrupted if any signal is received on the passed
// interrupt channel.
func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) {
	// short circuit for invalid range
	if from >= to {
		return
	}
	var (
		hashesCh = iterateTransactions(db, from, to, true, interrupt)
		batch    = db.NewBatch()
		start    = time.Now()
		logged   = start.Add(-7 * time.Second)
		// Since we iterate in reverse, we expect the first number to come
		// in to be [to-1]. Therefore, setting lastNum to `to` means that the
		// prque gap-evaluation will work correctly
		lastNum = to
		queue   = prque.New(nil)
		// for stats reporting
		blocks, txs = 0, 0
	)
	for chanDelivery := range hashesCh {
		// Push the delivery into the queue and process contiguous ranges.
		// Since we iterate in reverse, lower numbers have lower priority, and
		// we can use the block number directly as the priority marker
		queue.Push(chanDelivery, int64(chanDelivery.number))
		for !queue.Empty() {
			// If the next available item is gapped, stop for now and wait for
			// the missing block to be delivered
			if _, priority := queue.Peek(); priority != int64(lastNum-1) {
				break
			}
			// For testing
			if hook != nil && !hook(lastNum-1) {
				break
			}
			// Next block available, pop it off and index it
			delivery := queue.PopItem().(*blockTxHashes)
			lastNum = delivery.number
			WriteTxLookupEntries(batch, delivery.number, delivery.hashes)
			blocks++
			txs += len(delivery.hashes)
			// If enough data was accumulated in memory or we're at the last block, dump to disk
			if batch.ValueSize() > ethdb.IdealBatchSize {
				WriteTxIndexTail(batch, lastNum) // Also write the tail here
				if err := batch.Write(); err != nil {
					log.Crit("Failed writing batch to db", "error", err)
					return
				}
				batch.Reset()
			}
			// If we've spent too much time already, notify the user of what we're doing
			if time.Since(logged) > 8*time.Second {
				log.Info("Indexing transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "total", to-from, "elapsed", common.PrettyDuration(time.Since(start)))
				logged = time.Now()
			}
		}
	}
	// Flush the new indexing tail and the last committed data. It can also happen
	// that the last batch is empty because there was nothing to index, but the
	// tail has to be flushed anyway.
	WriteTxIndexTail(batch, lastNum)
	if err := batch.Write(); err != nil {
		log.Crit("Failed writing batch to db", "error", err)
		return
	}
	select {
	case <-interrupt:
		log.Debug("Transaction indexing interrupted", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start)))
	default:
		log.Info("Indexed transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start)))
	}
}

// IndexTransactions creates txlookup indices of the specified block range. The
// from is included while to is excluded.
//
// This function iterates over the canonical chain in reverse order, which has
// one main advantage: the tx index tail flag can be written periodically even
// before the whole indexing procedure is finished, so that indexing can be
// resumed quickly the next time it runs.
//
// The whole procedure is interrupted if any signal is received on the passed
// interrupt channel.
func IndexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}) {
	indexTransactions(db, from, to, interrupt, nil)
}

// indexTransactionsForTesting is the internal debug version with an additional hook.
func indexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) {
	indexTransactions(db, from, to, interrupt, hook)
}
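
// indexRecentTxsSketch is an illustrative sketch (not part of the original
// file) of how a caller might index lookup entries for only the most recent
// `limit` blocks behind the current head; the head and limit values are
// assumed to be tracked elsewhere by the caller.
func indexRecentTxsSketch(db ethdb.Database, head uint64, limit uint64, interrupt chan struct{}) {
	var from uint64
	if head+1 > limit {
		from = head + 1 - limit
	}
	// Index blocks [from, head+1); closing or signalling `interrupt` aborts
	// the run early, with the tail flag recording how far indexing got.
	IndexTransactions(db, from, head+1, interrupt)
}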

// unindexTransactions removes txlookup indices of the specified block range.
//
// The whole procedure is interrupted if any signal is received on the passed
// interrupt channel.
func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) {
	// short circuit for invalid range
	if from >= to {
		return
	}
	var (
		hashesCh = iterateTransactions(db, from, to, false, interrupt)
		batch    = db.NewBatch()
		start    = time.Now()
		logged   = start.Add(-7 * time.Second)
		// we expect the first number to come in to be [from]. Therefore, setting
		// nextNum to from means that the prque gap-evaluation will work correctly
		nextNum = from
		queue   = prque.New(nil)
		// for stats reporting
		blocks, txs = 0, 0
	)
	// Otherwise spin up the concurrent iterator and unindexer
	for delivery := range hashesCh {
		// Push the delivery into the queue and process contiguous ranges.
		queue.Push(delivery, -int64(delivery.number))
		for !queue.Empty() {
			// If the next available item is gapped, stop for now and wait for
			// the missing block to be delivered
			if _, priority := queue.Peek(); -priority != int64(nextNum) {
				break
			}
			// For testing
			if hook != nil && !hook(nextNum) {
				break
			}
			delivery := queue.PopItem().(*blockTxHashes)
			nextNum = delivery.number + 1
			DeleteTxLookupEntries(batch, delivery.hashes)
			txs += len(delivery.hashes)
			blocks++
			// If enough data was accumulated in memory or we're at the last block, dump to disk
			// A batch counts the size of a deletion as '1', so we need to flush more
			// often than the value size alone would suggest.
			if blocks%1000 == 0 {
				WriteTxIndexTail(batch, nextNum)
				if err := batch.Write(); err != nil {
					log.Crit("Failed writing batch to db", "error", err)
					return
				}
				batch.Reset()
			}
			// If we've spent too much time already, notify the user of what we're doing
			if time.Since(logged) > 8*time.Second {
				log.Info("Unindexing transactions", "blocks", blocks, "txs", txs, "total", to-from, "elapsed", common.PrettyDuration(time.Since(start)))
				logged = time.Now()
			}
		}
	}
	// Flush the new indexing tail and the last committed data. It can also happen
	// that the last batch is empty because there was nothing to unindex, but the
	// tail has to be flushed anyway.
	WriteTxIndexTail(batch, nextNum)
	if err := batch.Write(); err != nil {
		log.Crit("Failed writing batch to db", "error", err)
		return
	}
	select {
	case <-interrupt:
		log.Debug("Transaction unindexing interrupted", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start)))
	default:
		log.Info("Unindexed transactions", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start)))
	}
}

// UnindexTransactions removes txlookup indices of the specified block range.
// The from is included while to is excluded.
//
// The whole procedure is interrupted if any signal is received on the passed
// interrupt channel.
func UnindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}) {
	unindexTransactions(db, from, to, interrupt, nil)
}

// unindexTransactionsForTesting is the internal debug version with an additional hook.
func unindexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) {
	unindexTransactions(db, from, to, interrupt, hook)
}
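
// pruneTxIndexSketch is an illustrative sketch (not part of the original file)
// of how a caller might drop lookup entries for blocks that have fallen out of
// a retention window; `oldTail` and `newTail` are assumed to be tracked by the
// caller (e.g. from ReadTxIndexTail and the current head minus the retention
// limit).
func pruneTxIndexSketch(db ethdb.Database, oldTail uint64, newTail uint64, interrupt chan struct{}) {
	if oldTail >= newTail {
		return // nothing to prune
	}
	// Remove lookup entries for blocks [oldTail, newTail) and advance the
	// recorded tail accordingly.
	UnindexTransactions(db, oldTail, newTail, interrupt)
}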