chain_iterator.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package rawdb
  17. import (
  18. "runtime"
  19. "sync/atomic"
  20. "time"
  21. "github.com/ethereum/go-ethereum/common"
  22. "github.com/ethereum/go-ethereum/common/gopool"
  23. "github.com/ethereum/go-ethereum/common/prque"
  24. "github.com/ethereum/go-ethereum/core/types"
  25. "github.com/ethereum/go-ethereum/ethdb"
  26. "github.com/ethereum/go-ethereum/log"
  27. "github.com/ethereum/go-ethereum/rlp"
  28. )
  29. // InitDatabaseFromFreezer reinitializes an empty database from a previous batch
  30. // of frozen ancient blocks. The method iterates over all the frozen blocks and
  31. // injects into the database the block hash->number mappings.
  32. func InitDatabaseFromFreezer(db ethdb.Database) {
  33. // If we can't access the freezer or it's empty, abort
  34. frozen, err := db.ItemAmountInAncient()
  35. if err != nil || frozen == 0 {
  36. return
  37. }
  38. var (
  39. batch = db.NewBatch()
  40. start = time.Now()
  41. logged = start.Add(-7 * time.Second) // Unindex during import is fast, don't double log
  42. hash common.Hash
  43. offset = db.AncientOffSet()
  44. )
  45. for i := uint64(0) + offset; i < frozen+offset; i++ {
  46. // Since the freezer has all data in sequential order on a file,
  47. // it would be 'neat' to read more data in one go, and let the
  48. // freezerdb return N items (e.g up to 1000 items per go)
  49. // That would require an API change in Ancients though
  50. if h, err := db.Ancient(freezerHashTable, i); err != nil {
  51. log.Crit("Failed to init database from freezer", "err", err)
  52. } else {
  53. hash = common.BytesToHash(h)
  54. }
  55. WriteHeaderNumber(batch, hash, i)
  56. // If enough data was accumulated in memory or we're at the last block, dump to disk
  57. if batch.ValueSize() > ethdb.IdealBatchSize {
  58. if err := batch.Write(); err != nil {
  59. log.Crit("Failed to write data to db", "err", err)
  60. }
  61. batch.Reset()
  62. }
  63. // If we've spent too much time already, notify the user of what we're doing
  64. if time.Since(logged) > 8*time.Second {
  65. log.Info("Initializing database from freezer", "total", frozen, "number", i, "hash", hash, "elapsed", common.PrettyDuration(time.Since(start)))
  66. logged = time.Now()
  67. }
  68. }
  69. if err := batch.Write(); err != nil {
  70. log.Crit("Failed to write data to db", "err", err)
  71. }
  72. batch.Reset()
  73. WriteHeadHeaderHash(db, hash)
  74. WriteHeadFastBlockHash(db, hash)
  75. log.Info("Initialized database from freezer", "blocks", frozen, "elapsed", common.PrettyDuration(time.Since(start)))
  76. }
  77. type blockTxHashes struct {
  78. number uint64
  79. hashes []common.Hash
  80. }
  81. // iterateTransactions iterates over all transactions in the (canon) block
  82. // number(s) given, and yields the hashes on a channel. If there is a signal
  83. // received from interrupt channel, the iteration will be aborted and result
  84. // channel will be closed.
  85. func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool, interrupt chan struct{}) chan *blockTxHashes {
  86. // One thread sequentially reads data from db
  87. type numberRlp struct {
  88. number uint64
  89. rlp rlp.RawValue
  90. }
  91. if to == from {
  92. return nil
  93. }
  94. threads := to - from
  95. if cpus := runtime.NumCPU(); threads > uint64(cpus) {
  96. threads = uint64(cpus)
  97. }
  98. var (
  99. rlpCh = make(chan *numberRlp, threads*2) // we send raw rlp over this channel
  100. hashesCh = make(chan *blockTxHashes, threads*2) // send hashes over hashesCh
  101. )
  102. // lookup runs in one instance
  103. lookup := func() {
  104. n, end := from, to
  105. if reverse {
  106. n, end = to-1, from-1
  107. }
  108. defer close(rlpCh)
  109. for n != end {
  110. data := ReadCanonicalBodyRLP(db, n)
  111. // Feed the block to the aggregator, or abort on interrupt
  112. select {
  113. case rlpCh <- &numberRlp{n, data}:
  114. case <-interrupt:
  115. return
  116. }
  117. if reverse {
  118. n--
  119. } else {
  120. n++
  121. }
  122. }
  123. }
  124. // process runs in parallel
  125. nThreadsAlive := int32(threads)
  126. process := func() {
  127. defer func() {
  128. // Last processor closes the result channel
  129. if atomic.AddInt32(&nThreadsAlive, -1) == 0 {
  130. close(hashesCh)
  131. }
  132. }()
  133. for data := range rlpCh {
  134. var body types.Body
  135. if err := rlp.DecodeBytes(data.rlp, &body); err != nil {
  136. log.Warn("Failed to decode block body", "block", data.number, "error", err)
  137. return
  138. }
  139. var hashes []common.Hash
  140. for _, tx := range body.Transactions {
  141. hashes = append(hashes, tx.Hash())
  142. }
  143. result := &blockTxHashes{
  144. hashes: hashes,
  145. number: data.number,
  146. }
  147. // Feed the block to the aggregator, or abort on interrupt
  148. select {
  149. case hashesCh <- result:
  150. case <-interrupt:
  151. return
  152. }
  153. }
  154. }
  155. go lookup() // start the sequential db accessor
  156. for i := 0; i < int(threads); i++ {
  157. gopool.Submit(func() {
  158. process()
  159. })
  160. }
  161. return hashesCh
  162. }
  163. // indexTransactions creates txlookup indices of the specified block range.
  164. //
  165. // This function iterates canonical chain in reverse order, it has one main advantage:
  166. // We can write tx index tail flag periodically even without the whole indexing
  167. // procedure is finished. So that we can resume indexing procedure next time quickly.
  168. //
  169. // There is a passed channel, the whole procedure will be interrupted if any
  170. // signal received.
  171. func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) {
  172. // short circuit for invalid range
  173. if from >= to {
  174. return
  175. }
  176. var (
  177. hashesCh = iterateTransactions(db, from, to, true, interrupt)
  178. batch = db.NewBatch()
  179. start = time.Now()
  180. logged = start.Add(-7 * time.Second)
  181. // Since we iterate in reverse, we expect the first number to come
  182. // in to be [to-1]. Therefore, setting lastNum to means that the
  183. // prqueue gap-evaluation will work correctly
  184. lastNum = to
  185. queue = prque.New(nil)
  186. // for stats reporting
  187. blocks, txs = 0, 0
  188. )
  189. for chanDelivery := range hashesCh {
  190. // Push the delivery into the queue and process contiguous ranges.
  191. // Since we iterate in reverse, so lower numbers have lower prio, and
  192. // we can use the number directly as prio marker
  193. queue.Push(chanDelivery, int64(chanDelivery.number))
  194. for !queue.Empty() {
  195. // If the next available item is gapped, return
  196. if _, priority := queue.Peek(); priority != int64(lastNum-1) {
  197. break
  198. }
  199. // For testing
  200. if hook != nil && !hook(lastNum-1) {
  201. break
  202. }
  203. // Next block available, pop it off and index it
  204. delivery := queue.PopItem().(*blockTxHashes)
  205. lastNum = delivery.number
  206. WriteTxLookupEntries(batch, delivery.number, delivery.hashes)
  207. blocks++
  208. txs += len(delivery.hashes)
  209. // If enough data was accumulated in memory or we're at the last block, dump to disk
  210. if batch.ValueSize() > ethdb.IdealBatchSize {
  211. WriteTxIndexTail(batch, lastNum) // Also write the tail here
  212. if err := batch.Write(); err != nil {
  213. log.Crit("Failed writing batch to db", "error", err)
  214. return
  215. }
  216. batch.Reset()
  217. }
  218. // If we've spent too much time already, notify the user of what we're doing
  219. if time.Since(logged) > 8*time.Second {
  220. log.Info("Indexing transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "total", to-from, "elapsed", common.PrettyDuration(time.Since(start)))
  221. logged = time.Now()
  222. }
  223. }
  224. }
  225. // Flush the new indexing tail and the last committed data. It can also happen
  226. // that the last batch is empty because nothing to index, but the tail has to
  227. // be flushed anyway.
  228. WriteTxIndexTail(batch, lastNum)
  229. if err := batch.Write(); err != nil {
  230. log.Crit("Failed writing batch to db", "error", err)
  231. return
  232. }
  233. select {
  234. case <-interrupt:
  235. log.Debug("Transaction indexing interrupted", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start)))
  236. default:
  237. log.Info("Indexed transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start)))
  238. }
  239. }
  240. // IndexTransactions creates txlookup indices of the specified block range.
  241. //
  242. // This function iterates canonical chain in reverse order, it has one main advantage:
  243. // We can write tx index tail flag periodically even without the whole indexing
  244. // procedure is finished. So that we can resume indexing procedure next time quickly.
  245. //
  246. // There is a passed channel, the whole procedure will be interrupted if any
  247. // signal received.
  248. func IndexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}) {
  249. indexTransactions(db, from, to, interrupt, nil)
  250. }
  251. // indexTransactionsForTesting is the internal debug version with an additional hook.
  252. func indexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) {
  253. indexTransactions(db, from, to, interrupt, hook)
  254. }
  255. // unindexTransactions removes txlookup indices of the specified block range.
  256. //
  257. // There is a passed channel, the whole procedure will be interrupted if any
  258. // signal received.
  259. func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) {
  260. // short circuit for invalid range
  261. if from >= to {
  262. return
  263. }
  264. var (
  265. hashesCh = iterateTransactions(db, from, to, false, interrupt)
  266. batch = db.NewBatch()
  267. start = time.Now()
  268. logged = start.Add(-7 * time.Second)
  269. // we expect the first number to come in to be [from]. Therefore, setting
  270. // nextNum to from means that the prqueue gap-evaluation will work correctly
  271. nextNum = from
  272. queue = prque.New(nil)
  273. // for stats reporting
  274. blocks, txs = 0, 0
  275. )
  276. // Otherwise spin up the concurrent iterator and unindexer
  277. for delivery := range hashesCh {
  278. // Push the delivery into the queue and process contiguous ranges.
  279. queue.Push(delivery, -int64(delivery.number))
  280. for !queue.Empty() {
  281. // If the next available item is gapped, return
  282. if _, priority := queue.Peek(); -priority != int64(nextNum) {
  283. break
  284. }
  285. // For testing
  286. if hook != nil && !hook(nextNum) {
  287. break
  288. }
  289. delivery := queue.PopItem().(*blockTxHashes)
  290. nextNum = delivery.number + 1
  291. DeleteTxLookupEntries(batch, delivery.hashes)
  292. txs += len(delivery.hashes)
  293. blocks++
  294. // If enough data was accumulated in memory or we're at the last block, dump to disk
  295. // A batch counts the size of deletion as '1', so we need to flush more
  296. // often than that.
  297. if blocks%1000 == 0 {
  298. WriteTxIndexTail(batch, nextNum)
  299. if err := batch.Write(); err != nil {
  300. log.Crit("Failed writing batch to db", "error", err)
  301. return
  302. }
  303. batch.Reset()
  304. }
  305. // If we've spent too much time already, notify the user of what we're doing
  306. if time.Since(logged) > 8*time.Second {
  307. log.Info("Unindexing transactions", "blocks", blocks, "txs", txs, "total", to-from, "elapsed", common.PrettyDuration(time.Since(start)))
  308. logged = time.Now()
  309. }
  310. }
  311. }
  312. // Flush the new indexing tail and the last committed data. It can also happen
  313. // that the last batch is empty because nothing to unindex, but the tail has to
  314. // be flushed anyway.
  315. WriteTxIndexTail(batch, nextNum)
  316. if err := batch.Write(); err != nil {
  317. log.Crit("Failed writing batch to db", "error", err)
  318. return
  319. }
  320. select {
  321. case <-interrupt:
  322. log.Debug("Transaction unindexing interrupted", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start)))
  323. default:
  324. log.Info("Unindexed transactions", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start)))
  325. }
  326. }
  327. // UnindexTransactions removes txlookup indices of the specified block range.
  328. //
  329. // There is a passed channel, the whole procedure will be interrupted if any
  330. // signal received.
  331. func UnindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}) {
  332. unindexTransactions(db, from, to, interrupt, nil)
  333. }
  334. // unindexTransactionsForTesting is the internal debug version with an additional hook.
  335. func unindexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) {
  336. unindexTransactions(db, from, to, interrupt, hook)
  337. }