chain_iterator.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package rawdb
  17. import (
  18. "runtime"
  19. "sync/atomic"
  20. "time"
  21. "github.com/ethereum/go-ethereum/common"
  22. "github.com/ethereum/go-ethereum/common/gopool"
  23. "github.com/ethereum/go-ethereum/common/prque"
  24. "github.com/ethereum/go-ethereum/core/types"
  25. "github.com/ethereum/go-ethereum/ethdb"
  26. "github.com/ethereum/go-ethereum/log"
  27. "github.com/ethereum/go-ethereum/rlp"
  28. )
  29. // InitDatabaseFromFreezer reinitializes an empty database from a previous batch
  30. // of frozen ancient blocks. The method iterates over all the frozen blocks and
  31. // injects into the database the block hash->number mappings.
  32. func InitDatabaseFromFreezer(db ethdb.Database) {
  33. // If we can't access the freezer or it's empty, abort
  34. frozen, err := db.Ancients()
  35. if err != nil || frozen == 0 {
  36. return
  37. }
  38. var (
  39. batch = db.NewBatch()
  40. start = time.Now()
  41. logged = start.Add(-7 * time.Second) // Unindex during import is fast, don't double log
  42. hash common.Hash
  43. )
  44. for i := uint64(0); i < frozen; i++ {
  45. // Since the freezer has all data in sequential order on a file,
  46. // it would be 'neat' to read more data in one go, and let the
  47. // freezerdb return N items (e.g up to 1000 items per go)
  48. // That would require an API change in Ancients though
  49. if h, err := db.Ancient(freezerHashTable, i); err != nil {
  50. log.Crit("Failed to init database from freezer", "err", err)
  51. } else {
  52. hash = common.BytesToHash(h)
  53. }
  54. WriteHeaderNumber(batch, hash, i)
  55. // If enough data was accumulated in memory or we're at the last block, dump to disk
  56. if batch.ValueSize() > ethdb.IdealBatchSize {
  57. if err := batch.Write(); err != nil {
  58. log.Crit("Failed to write data to db", "err", err)
  59. }
  60. batch.Reset()
  61. }
  62. // If we've spent too much time already, notify the user of what we're doing
  63. if time.Since(logged) > 8*time.Second {
  64. log.Info("Initializing database from freezer", "total", frozen, "number", i, "hash", hash, "elapsed", common.PrettyDuration(time.Since(start)))
  65. logged = time.Now()
  66. }
  67. }
  68. if err := batch.Write(); err != nil {
  69. log.Crit("Failed to write data to db", "err", err)
  70. }
  71. batch.Reset()
  72. WriteHeadHeaderHash(db, hash)
  73. WriteHeadFastBlockHash(db, hash)
  74. log.Info("Initialized database from freezer", "blocks", frozen, "elapsed", common.PrettyDuration(time.Since(start)))
  75. }
  76. type blockTxHashes struct {
  77. number uint64
  78. hashes []common.Hash
  79. }
  80. // iterateTransactions iterates over all transactions in the (canon) block
  81. // number(s) given, and yields the hashes on a channel. If there is a signal
  82. // received from interrupt channel, the iteration will be aborted and result
  83. // channel will be closed.
  84. func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool, interrupt chan struct{}) chan *blockTxHashes {
  85. // One thread sequentially reads data from db
  86. type numberRlp struct {
  87. number uint64
  88. rlp rlp.RawValue
  89. }
  90. if to == from {
  91. return nil
  92. }
  93. threads := to - from
  94. if cpus := runtime.NumCPU(); threads > uint64(cpus) {
  95. threads = uint64(cpus)
  96. }
  97. var (
  98. rlpCh = make(chan *numberRlp, threads*2) // we send raw rlp over this channel
  99. hashesCh = make(chan *blockTxHashes, threads*2) // send hashes over hashesCh
  100. )
  101. // lookup runs in one instance
  102. lookup := func() {
  103. n, end := from, to
  104. if reverse {
  105. n, end = to-1, from-1
  106. }
  107. defer close(rlpCh)
  108. for n != end {
  109. data := ReadCanonicalBodyRLP(db, n)
  110. // Feed the block to the aggregator, or abort on interrupt
  111. select {
  112. case rlpCh <- &numberRlp{n, data}:
  113. case <-interrupt:
  114. return
  115. }
  116. if reverse {
  117. n--
  118. } else {
  119. n++
  120. }
  121. }
  122. }
  123. // process runs in parallel
  124. nThreadsAlive := int32(threads)
  125. process := func() {
  126. defer func() {
  127. // Last processor closes the result channel
  128. if atomic.AddInt32(&nThreadsAlive, -1) == 0 {
  129. close(hashesCh)
  130. }
  131. }()
  132. for data := range rlpCh {
  133. var body types.Body
  134. if err := rlp.DecodeBytes(data.rlp, &body); err != nil {
  135. log.Warn("Failed to decode block body", "block", data.number, "error", err)
  136. return
  137. }
  138. var hashes []common.Hash
  139. for _, tx := range body.Transactions {
  140. hashes = append(hashes, tx.Hash())
  141. }
  142. result := &blockTxHashes{
  143. hashes: hashes,
  144. number: data.number,
  145. }
  146. // Feed the block to the aggregator, or abort on interrupt
  147. select {
  148. case hashesCh <- result:
  149. case <-interrupt:
  150. return
  151. }
  152. }
  153. }
  154. go lookup() // start the sequential db accessor
  155. for i := 0; i < int(threads); i++ {
  156. gopool.Submit(func() {
  157. process()
  158. })
  159. }
  160. return hashesCh
  161. }
  162. // indexTransactions creates txlookup indices of the specified block range.
  163. //
  164. // This function iterates canonical chain in reverse order, it has one main advantage:
  165. // We can write tx index tail flag periodically even without the whole indexing
  166. // procedure is finished. So that we can resume indexing procedure next time quickly.
  167. //
  168. // There is a passed channel, the whole procedure will be interrupted if any
  169. // signal received.
  170. func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) {
  171. // short circuit for invalid range
  172. if from >= to {
  173. return
  174. }
  175. var (
  176. hashesCh = iterateTransactions(db, from, to, true, interrupt)
  177. batch = db.NewBatch()
  178. start = time.Now()
  179. logged = start.Add(-7 * time.Second)
  180. // Since we iterate in reverse, we expect the first number to come
  181. // in to be [to-1]. Therefore, setting lastNum to means that the
  182. // prqueue gap-evaluation will work correctly
  183. lastNum = to
  184. queue = prque.New(nil)
  185. // for stats reporting
  186. blocks, txs = 0, 0
  187. )
  188. for chanDelivery := range hashesCh {
  189. // Push the delivery into the queue and process contiguous ranges.
  190. // Since we iterate in reverse, so lower numbers have lower prio, and
  191. // we can use the number directly as prio marker
  192. queue.Push(chanDelivery, int64(chanDelivery.number))
  193. for !queue.Empty() {
  194. // If the next available item is gapped, return
  195. if _, priority := queue.Peek(); priority != int64(lastNum-1) {
  196. break
  197. }
  198. // For testing
  199. if hook != nil && !hook(lastNum-1) {
  200. break
  201. }
  202. // Next block available, pop it off and index it
  203. delivery := queue.PopItem().(*blockTxHashes)
  204. lastNum = delivery.number
  205. WriteTxLookupEntries(batch, delivery.number, delivery.hashes)
  206. blocks++
  207. txs += len(delivery.hashes)
  208. // If enough data was accumulated in memory or we're at the last block, dump to disk
  209. if batch.ValueSize() > ethdb.IdealBatchSize {
  210. WriteTxIndexTail(batch, lastNum) // Also write the tail here
  211. if err := batch.Write(); err != nil {
  212. log.Crit("Failed writing batch to db", "error", err)
  213. return
  214. }
  215. batch.Reset()
  216. }
  217. // If we've spent too much time already, notify the user of what we're doing
  218. if time.Since(logged) > 8*time.Second {
  219. log.Info("Indexing transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "total", to-from, "elapsed", common.PrettyDuration(time.Since(start)))
  220. logged = time.Now()
  221. }
  222. }
  223. }
  224. // Flush the new indexing tail and the last committed data. It can also happen
  225. // that the last batch is empty because nothing to index, but the tail has to
  226. // be flushed anyway.
  227. WriteTxIndexTail(batch, lastNum)
  228. if err := batch.Write(); err != nil {
  229. log.Crit("Failed writing batch to db", "error", err)
  230. return
  231. }
  232. select {
  233. case <-interrupt:
  234. log.Debug("Transaction indexing interrupted", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start)))
  235. default:
  236. log.Info("Indexed transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start)))
  237. }
  238. }
  239. // IndexTransactions creates txlookup indices of the specified block range.
  240. //
  241. // This function iterates canonical chain in reverse order, it has one main advantage:
  242. // We can write tx index tail flag periodically even without the whole indexing
  243. // procedure is finished. So that we can resume indexing procedure next time quickly.
  244. //
  245. // There is a passed channel, the whole procedure will be interrupted if any
  246. // signal received.
  247. func IndexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}) {
  248. indexTransactions(db, from, to, interrupt, nil)
  249. }
  250. // indexTransactionsForTesting is the internal debug version with an additional hook.
  251. func indexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) {
  252. indexTransactions(db, from, to, interrupt, hook)
  253. }
  254. // unindexTransactions removes txlookup indices of the specified block range.
  255. //
  256. // There is a passed channel, the whole procedure will be interrupted if any
  257. // signal received.
  258. func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) {
  259. // short circuit for invalid range
  260. if from >= to {
  261. return
  262. }
  263. var (
  264. hashesCh = iterateTransactions(db, from, to, false, interrupt)
  265. batch = db.NewBatch()
  266. start = time.Now()
  267. logged = start.Add(-7 * time.Second)
  268. // we expect the first number to come in to be [from]. Therefore, setting
  269. // nextNum to from means that the prqueue gap-evaluation will work correctly
  270. nextNum = from
  271. queue = prque.New(nil)
  272. // for stats reporting
  273. blocks, txs = 0, 0
  274. )
  275. // Otherwise spin up the concurrent iterator and unindexer
  276. for delivery := range hashesCh {
  277. // Push the delivery into the queue and process contiguous ranges.
  278. queue.Push(delivery, -int64(delivery.number))
  279. for !queue.Empty() {
  280. // If the next available item is gapped, return
  281. if _, priority := queue.Peek(); -priority != int64(nextNum) {
  282. break
  283. }
  284. // For testing
  285. if hook != nil && !hook(nextNum) {
  286. break
  287. }
  288. delivery := queue.PopItem().(*blockTxHashes)
  289. nextNum = delivery.number + 1
  290. DeleteTxLookupEntries(batch, delivery.hashes)
  291. txs += len(delivery.hashes)
  292. blocks++
  293. // If enough data was accumulated in memory or we're at the last block, dump to disk
  294. // A batch counts the size of deletion as '1', so we need to flush more
  295. // often than that.
  296. if blocks%1000 == 0 {
  297. WriteTxIndexTail(batch, nextNum)
  298. if err := batch.Write(); err != nil {
  299. log.Crit("Failed writing batch to db", "error", err)
  300. return
  301. }
  302. batch.Reset()
  303. }
  304. // If we've spent too much time already, notify the user of what we're doing
  305. if time.Since(logged) > 8*time.Second {
  306. log.Info("Unindexing transactions", "blocks", blocks, "txs", txs, "total", to-from, "elapsed", common.PrettyDuration(time.Since(start)))
  307. logged = time.Now()
  308. }
  309. }
  310. }
  311. // Flush the new indexing tail and the last committed data. It can also happen
  312. // that the last batch is empty because nothing to unindex, but the tail has to
  313. // be flushed anyway.
  314. WriteTxIndexTail(batch, nextNum)
  315. if err := batch.Write(); err != nil {
  316. log.Crit("Failed writing batch to db", "error", err)
  317. return
  318. }
  319. select {
  320. case <-interrupt:
  321. log.Debug("Transaction unindexing interrupted", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start)))
  322. default:
  323. log.Info("Unindexed transactions", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start)))
  324. }
  325. }
  326. // UnindexTransactions removes txlookup indices of the specified block range.
  327. //
  328. // There is a passed channel, the whole procedure will be interrupted if any
  329. // signal received.
  330. func UnindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}) {
  331. unindexTransactions(db, from, to, interrupt, nil)
  332. }
  333. // unindexTransactionsForTesting is the internal debug version with an additional hook.
  334. func unindexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) {
  335. unindexTransactions(db, from, to, interrupt, hook)
  336. }