chain_iterator.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package rawdb
  17. import (
  18. "runtime"
  19. "sync/atomic"
  20. "time"
  21. "github.com/ethereum/go-ethereum/common"
  22. "github.com/ethereum/go-ethereum/common/prque"
  23. "github.com/ethereum/go-ethereum/ethdb"
  24. "github.com/ethereum/go-ethereum/log"
  25. "github.com/ethereum/go-ethereum/rlp"
  26. "golang.org/x/crypto/sha3"
  27. )
  28. // InitDatabaseFromFreezer reinitializes an empty database from a previous batch
  29. // of frozen ancient blocks. The method iterates over all the frozen blocks and
  30. // injects into the database the block hash->number mappings.
  31. func InitDatabaseFromFreezer(db ethdb.Database) {
  32. // If we can't access the freezer or it's empty, abort
  33. frozen, err := db.Ancients()
  34. if err != nil || frozen == 0 {
  35. return
  36. }
  37. var (
  38. batch = db.NewBatch()
  39. start = time.Now()
  40. logged = start.Add(-7 * time.Second) // Unindex during import is fast, don't double log
  41. hash common.Hash
  42. )
  43. for i := uint64(0); i < frozen; i++ {
  44. // Since the freezer has all data in sequential order on a file,
  45. // it would be 'neat' to read more data in one go, and let the
  46. // freezerdb return N items (e.g up to 1000 items per go)
  47. // That would require an API change in Ancients though
  48. if h, err := db.Ancient(freezerHashTable, i); err != nil {
  49. log.Crit("Failed to init database from freezer", "err", err)
  50. } else {
  51. hash = common.BytesToHash(h)
  52. }
  53. WriteHeaderNumber(batch, hash, i)
  54. // If enough data was accumulated in memory or we're at the last block, dump to disk
  55. if batch.ValueSize() > ethdb.IdealBatchSize {
  56. if err := batch.Write(); err != nil {
  57. log.Crit("Failed to write data to db", "err", err)
  58. }
  59. batch.Reset()
  60. }
  61. // If we've spent too much time already, notify the user of what we're doing
  62. if time.Since(logged) > 8*time.Second {
  63. log.Info("Initializing database from freezer", "total", frozen, "number", i, "hash", hash, "elapsed", common.PrettyDuration(time.Since(start)))
  64. logged = time.Now()
  65. }
  66. }
  67. if err := batch.Write(); err != nil {
  68. log.Crit("Failed to write data to db", "err", err)
  69. }
  70. batch.Reset()
  71. WriteHeadHeaderHash(db, hash)
  72. WriteHeadFastBlockHash(db, hash)
  73. log.Info("Initialized database from freezer", "blocks", frozen, "elapsed", common.PrettyDuration(time.Since(start)))
  74. }
  75. type blockTxHashes struct {
  76. number uint64
  77. hashes []common.Hash
  78. }
  79. // iterateTransactions iterates over all transactions in the (canon) block
  80. // number(s) given, and yields the hashes on a channel. If there is a signal
  81. // received from interrupt channel, the iteration will be aborted and result
  82. // channel will be closed.
  83. func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool, interrupt chan struct{}) chan *blockTxHashes {
  84. // One thread sequentially reads data from db
  85. type numberRlp struct {
  86. number uint64
  87. rlp rlp.RawValue
  88. }
  89. if to == from {
  90. return nil
  91. }
  92. threads := to - from
  93. if cpus := runtime.NumCPU(); threads > uint64(cpus) {
  94. threads = uint64(cpus)
  95. }
  96. var (
  97. rlpCh = make(chan *numberRlp, threads*2) // we send raw rlp over this channel
  98. hashesCh = make(chan *blockTxHashes, threads*2) // send hashes over hashesCh
  99. )
  100. // lookup runs in one instance
  101. lookup := func() {
  102. n, end := from, to
  103. if reverse {
  104. n, end = to-1, from-1
  105. }
  106. defer close(rlpCh)
  107. for n != end {
  108. data := ReadCanonicalBodyRLP(db, n)
  109. // Feed the block to the aggregator, or abort on interrupt
  110. select {
  111. case rlpCh <- &numberRlp{n, data}:
  112. case <-interrupt:
  113. return
  114. }
  115. if reverse {
  116. n--
  117. } else {
  118. n++
  119. }
  120. }
  121. }
  122. // process runs in parallel
  123. nThreadsAlive := int32(threads)
  124. process := func() {
  125. defer func() {
  126. // Last processor closes the result channel
  127. if atomic.AddInt32(&nThreadsAlive, -1) == 0 {
  128. close(hashesCh)
  129. }
  130. }()
  131. var hasher = sha3.NewLegacyKeccak256()
  132. for data := range rlpCh {
  133. it, err := rlp.NewListIterator(data.rlp)
  134. if err != nil {
  135. log.Warn("tx iteration error", "error", err)
  136. return
  137. }
  138. it.Next()
  139. txs := it.Value()
  140. txIt, err := rlp.NewListIterator(txs)
  141. if err != nil {
  142. log.Warn("tx iteration error", "error", err)
  143. return
  144. }
  145. var hashes []common.Hash
  146. for txIt.Next() {
  147. if err := txIt.Err(); err != nil {
  148. log.Warn("tx iteration error", "error", err)
  149. return
  150. }
  151. var txHash common.Hash
  152. hasher.Reset()
  153. hasher.Write(txIt.Value())
  154. hasher.Sum(txHash[:0])
  155. hashes = append(hashes, txHash)
  156. }
  157. result := &blockTxHashes{
  158. hashes: hashes,
  159. number: data.number,
  160. }
  161. // Feed the block to the aggregator, or abort on interrupt
  162. select {
  163. case hashesCh <- result:
  164. case <-interrupt:
  165. return
  166. }
  167. }
  168. }
  169. go lookup() // start the sequential db accessor
  170. for i := 0; i < int(threads); i++ {
  171. go process()
  172. }
  173. return hashesCh
  174. }
  175. // indexTransactions creates txlookup indices of the specified block range.
  176. //
  177. // This function iterates canonical chain in reverse order, it has one main advantage:
  178. // We can write tx index tail flag periodically even without the whole indexing
  179. // procedure is finished. So that we can resume indexing procedure next time quickly.
  180. //
  181. // There is a passed channel, the whole procedure will be interrupted if any
  182. // signal received.
  183. func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) {
  184. // short circuit for invalid range
  185. if from >= to {
  186. return
  187. }
  188. var (
  189. hashesCh = iterateTransactions(db, from, to, true, interrupt)
  190. batch = db.NewBatch()
  191. start = time.Now()
  192. logged = start.Add(-7 * time.Second)
  193. // Since we iterate in reverse, we expect the first number to come
  194. // in to be [to-1]. Therefore, setting lastNum to means that the
  195. // prqueue gap-evaluation will work correctly
  196. lastNum = to
  197. queue = prque.New(nil)
  198. // for stats reporting
  199. blocks, txs = 0, 0
  200. )
  201. for chanDelivery := range hashesCh {
  202. // Push the delivery into the queue and process contiguous ranges.
  203. // Since we iterate in reverse, so lower numbers have lower prio, and
  204. // we can use the number directly as prio marker
  205. queue.Push(chanDelivery, int64(chanDelivery.number))
  206. for !queue.Empty() {
  207. // If the next available item is gapped, return
  208. if _, priority := queue.Peek(); priority != int64(lastNum-1) {
  209. break
  210. }
  211. // For testing
  212. if hook != nil && !hook(lastNum-1) {
  213. break
  214. }
  215. // Next block available, pop it off and index it
  216. delivery := queue.PopItem().(*blockTxHashes)
  217. lastNum = delivery.number
  218. WriteTxLookupEntries(batch, delivery.number, delivery.hashes)
  219. blocks++
  220. txs += len(delivery.hashes)
  221. // If enough data was accumulated in memory or we're at the last block, dump to disk
  222. if batch.ValueSize() > ethdb.IdealBatchSize {
  223. WriteTxIndexTail(batch, lastNum) // Also write the tail here
  224. if err := batch.Write(); err != nil {
  225. log.Crit("Failed writing batch to db", "error", err)
  226. return
  227. }
  228. batch.Reset()
  229. }
  230. // If we've spent too much time already, notify the user of what we're doing
  231. if time.Since(logged) > 8*time.Second {
  232. log.Info("Indexing transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "total", to-from, "elapsed", common.PrettyDuration(time.Since(start)))
  233. logged = time.Now()
  234. }
  235. }
  236. }
  237. // If there exists uncommitted data, flush them.
  238. if batch.ValueSize() > 0 {
  239. WriteTxIndexTail(batch, lastNum) // Also write the tail there
  240. if err := batch.Write(); err != nil {
  241. log.Crit("Failed writing batch to db", "error", err)
  242. return
  243. }
  244. }
  245. select {
  246. case <-interrupt:
  247. log.Debug("Transaction indexing interrupted", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start)))
  248. default:
  249. log.Info("Indexed transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start)))
  250. }
  251. }
  252. // IndexTransactions creates txlookup indices of the specified block range.
  253. //
  254. // This function iterates canonical chain in reverse order, it has one main advantage:
  255. // We can write tx index tail flag periodically even without the whole indexing
  256. // procedure is finished. So that we can resume indexing procedure next time quickly.
  257. //
  258. // There is a passed channel, the whole procedure will be interrupted if any
  259. // signal received.
  260. func IndexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}) {
  261. indexTransactions(db, from, to, interrupt, nil)
  262. }
  263. // indexTransactionsForTesting is the internal debug version with an additional hook.
  264. func indexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) {
  265. indexTransactions(db, from, to, interrupt, hook)
  266. }
  267. // unindexTransactions removes txlookup indices of the specified block range.
  268. //
  269. // There is a passed channel, the whole procedure will be interrupted if any
  270. // signal received.
  271. func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) {
  272. // short circuit for invalid range
  273. if from >= to {
  274. return
  275. }
  276. var (
  277. hashesCh = iterateTransactions(db, from, to, false, interrupt)
  278. batch = db.NewBatch()
  279. start = time.Now()
  280. logged = start.Add(-7 * time.Second)
  281. // we expect the first number to come in to be [from]. Therefore, setting
  282. // nextNum to from means that the prqueue gap-evaluation will work correctly
  283. nextNum = from
  284. queue = prque.New(nil)
  285. // for stats reporting
  286. blocks, txs = 0, 0
  287. )
  288. // Otherwise spin up the concurrent iterator and unindexer
  289. for delivery := range hashesCh {
  290. // Push the delivery into the queue and process contiguous ranges.
  291. queue.Push(delivery, -int64(delivery.number))
  292. for !queue.Empty() {
  293. // If the next available item is gapped, return
  294. if _, priority := queue.Peek(); -priority != int64(nextNum) {
  295. break
  296. }
  297. // For testing
  298. if hook != nil && !hook(nextNum) {
  299. break
  300. }
  301. delivery := queue.PopItem().(*blockTxHashes)
  302. nextNum = delivery.number + 1
  303. DeleteTxLookupEntries(batch, delivery.hashes)
  304. txs += len(delivery.hashes)
  305. blocks++
  306. // If enough data was accumulated in memory or we're at the last block, dump to disk
  307. // A batch counts the size of deletion as '1', so we need to flush more
  308. // often than that.
  309. if blocks%1000 == 0 {
  310. WriteTxIndexTail(batch, nextNum)
  311. if err := batch.Write(); err != nil {
  312. log.Crit("Failed writing batch to db", "error", err)
  313. return
  314. }
  315. batch.Reset()
  316. }
  317. // If we've spent too much time already, notify the user of what we're doing
  318. if time.Since(logged) > 8*time.Second {
  319. log.Info("Unindexing transactions", "blocks", blocks, "txs", txs, "total", to-from, "elapsed", common.PrettyDuration(time.Since(start)))
  320. logged = time.Now()
  321. }
  322. }
  323. }
  324. // Commit the last batch if there exists uncommitted data
  325. if batch.ValueSize() > 0 {
  326. WriteTxIndexTail(batch, nextNum)
  327. if err := batch.Write(); err != nil {
  328. log.Crit("Failed writing batch to db", "error", err)
  329. return
  330. }
  331. }
  332. select {
  333. case <-interrupt:
  334. log.Debug("Transaction unindexing interrupted", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start)))
  335. default:
  336. log.Info("Unindexed transactions", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start)))
  337. }
  338. }
  339. // UnindexTransactions removes txlookup indices of the specified block range.
  340. //
  341. // There is a passed channel, the whole procedure will be interrupted if any
  342. // signal received.
  343. func UnindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}) {
  344. unindexTransactions(db, from, to, interrupt, nil)
  345. }
  346. // unindexTransactionsForTesting is the internal debug version with an additional hook.
  347. func unindexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) {
  348. unindexTransactions(db, from, to, interrupt, hook)
  349. }