chain_iterator.go

// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package rawdb
import (
	"runtime"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/prque"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/rlp"
	"golang.org/x/crypto/sha3"
)

// InitDatabaseFromFreezer reinitializes an empty database from a previous batch
// of frozen ancient blocks. The method iterates over all the frozen blocks and
// injects into the database the block hash->number mappings.
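//
// A minimal usage sketch; the freezer-backed constructor and its parameters
// below are illustrative assumptions, not something this file prescribes:
//
//	db, err := NewLevelDBDatabaseWithFreezer("chaindata", 512, 512, "chaindata/ancient", "eth/db/chaindata/")
//	if err != nil {
//		log.Crit("Failed to open database", "err", err)
//	}
//	InitDatabaseFromFreezer(db)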
func InitDatabaseFromFreezer(db ethdb.Database) {
	// If we can't access the freezer or it's empty, abort
	frozen, err := db.Ancients()
	if err != nil || frozen == 0 {
		return
	}
	var (
		batch  = db.NewBatch()
		start  = time.Now()
		logged = start.Add(-7 * time.Second) // Fire the first progress log after ~1s (the threshold below is 8s)
		hash   common.Hash
	)
	for i := uint64(0); i < frozen; i++ {
		// Since the freezer has all data in sequential order on a file,
		// it would be 'neat' to read more data in one go, and let the
		// freezerdb return N items (e.g. up to 1000 items per go).
		// That would require an API change in Ancients though.
		if h, err := db.Ancient(freezerHashTable, i); err != nil {
			log.Crit("Failed to init database from freezer", "err", err)
		} else {
			hash = common.BytesToHash(h)
		}
		WriteHeaderNumber(batch, hash, i)
		// If enough data was accumulated in memory, dump to disk; the final
		// partial batch is flushed after the loop
		if batch.ValueSize() > ethdb.IdealBatchSize {
			if err := batch.Write(); err != nil {
				log.Crit("Failed to write data to db", "err", err)
			}
			batch.Reset()
		}
		// If we've spent too much time already, notify the user of what we're doing
		if time.Since(logged) > 8*time.Second {
			log.Info("Initializing database from freezer", "total", frozen, "number", i, "hash", hash, "elapsed", common.PrettyDuration(time.Since(start)))
			logged = time.Now()
		}
	}
	if err := batch.Write(); err != nil {
		log.Crit("Failed to write data to db", "err", err)
	}
	batch.Reset()

	WriteHeadHeaderHash(db, hash)
	WriteHeadFastBlockHash(db, hash)
	log.Info("Initialized database from freezer", "blocks", frozen, "elapsed", common.PrettyDuration(time.Since(start)))
}

// blockTxHashes is the per-block result yielded by iterateTransactions: the
// block number and the hashes of the transactions contained in it.
type blockTxHashes struct {
	number uint64
	hashes []common.Hash
}

// iterateTransactions iterates over all transactions in the given range of
// canonical blocks and yields their hashes on a channel.
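//
// A sketch of the intended consumption pattern, assuming a non-empty half-open
// range [from, to) iterated forward:
//
//	hashesCh, abortCh := iterateTransactions(db, from, to, false)
//	defer close(abortCh) // lets the producer goroutines wind down early
//	for delivery := range hashesCh {
//		_ = delivery.number // block number
//		_ = delivery.hashes // hashes of the transactions in that block
//	}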
func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool) (chan *blockTxHashes, chan struct{}) {
	// One thread sequentially reads data from db
	type numberRlp struct {
		number uint64
		rlp    rlp.RawValue
	}
	if to == from {
		return nil, nil
	}
	threads := to - from
	if cpus := runtime.NumCPU(); threads > uint64(cpus) {
		threads = uint64(cpus)
	}
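	// The clamp above sizes the worker pool to the smaller of the block range
	// and the CPU count: e.g. a range of 8 blocks on a 4-CPU machine runs on
	// 4 threads (illustrative numbers)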
	var (
		rlpCh    = make(chan *numberRlp, threads*2)     // we send raw rlp over this channel
		hashesCh = make(chan *blockTxHashes, threads*2) // send hashes over hashesCh
		abortCh  = make(chan struct{})
	)
	// lookup runs in one instance
	lookup := func() {
		n, end := from, to
		if reverse {
			// When from == 0, end underflows to the maximum uint64; the loop
			// below still terminates, since n wraps the same way after zero
			n, end = to-1, from-1
		}
		defer close(rlpCh)
		for n != end {
			data := ReadCanonicalBodyRLP(db, n)
			// Feed the block to the aggregator, or abort on interrupt
			select {
			case rlpCh <- &numberRlp{n, data}:
			case <-abortCh:
				return
			}
			if reverse {
				n--
			} else {
				n++
			}
		}
	}
	// process runs in parallel
	nThreadsAlive := int32(threads)
	process := func() {
		defer func() {
			// Last processor closes the result channel
			if atomic.AddInt32(&nThreadsAlive, -1) == 0 {
				close(hashesCh)
			}
		}()
		var hasher = sha3.NewLegacyKeccak256()
		for data := range rlpCh {
			it, err := rlp.NewListIterator(data.rlp)
			if err != nil {
				log.Warn("tx iteration error", "error", err)
				return
			}
			it.Next() // position on the body's first element: the transaction list
			txs := it.Value()
			txIt, err := rlp.NewListIterator(txs)
			if err != nil {
				log.Warn("tx iteration error", "error", err)
				return
			}
			var hashes []common.Hash
			for txIt.Next() {
				if err := txIt.Err(); err != nil {
					log.Warn("tx iteration error", "error", err)
					return
				}
				// The transaction hash is the Keccak256 of its RLP encoding
				var txHash common.Hash
				hasher.Reset()
				hasher.Write(txIt.Value())
				hasher.Sum(txHash[:0])
				hashes = append(hashes, txHash)
			}
			result := &blockTxHashes{
				hashes: hashes,
				number: data.number,
			}
			// Feed the block to the aggregator, or abort on interrupt
			select {
			case hashesCh <- result:
			case <-abortCh:
				return
			}
		}
	}
	go lookup() // start the sequential db accessor
	for i := 0; i < int(threads); i++ {
		go process()
	}
	return hashesCh, abortCh
}

// IndexTransactions creates txlookup indices of the specified block range.
//
// This function iterates over the canonical chain in reverse order, which has
// one main advantage: the tx index tail flag can be written periodically even
// before the whole indexing procedure is finished, so that indexing can be
// resumed quickly next time.
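//
// A worked example of the gap-evaluation below, with illustrative numbers:
// for to = 100, lastNum starts at 100 and only block 99 may be indexed next.
// If deliveries arrive as 97, 99, 98, then 97 is parked in the priority
// queue, 99 matches lastNum-1 and is indexed immediately, 98 is indexed on
// arrival, and finally the parked 97 drains from the queue, keeping the
// written tail contiguous at all times.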
func IndexTransactions(db ethdb.Database, from uint64, to uint64) {
	// short circuit for invalid range
	if from >= to {
		return
	}
	var (
		hashesCh, abortCh = iterateTransactions(db, from, to, true)
		batch             = db.NewBatch()
		start             = time.Now()
		logged            = start.Add(-7 * time.Second)
		// Since we iterate in reverse, we expect the first number to come
		// in to be [to-1]. Therefore, setting lastNum to 'to' means that the
		// queue gap-evaluation will work correctly
		lastNum = to
		queue   = prque.New(nil)
		// for stats reporting
		blocks, txs = 0, 0
	)
	defer close(abortCh)

	for chanDelivery := range hashesCh {
		// Push the delivery into the queue and process contiguous ranges.
		// Since we iterate in reverse, lower numbers have lower priority, so
		// we can use the block number directly as the priority marker
		queue.Push(chanDelivery, int64(chanDelivery.number))
		for !queue.Empty() {
			// If the next available item is gapped, stop and wait for more
			if _, priority := queue.Peek(); priority != int64(lastNum-1) {
				break
			}
			// Next block available, pop it off and index it
			delivery := queue.PopItem().(*blockTxHashes)
			lastNum = delivery.number
			WriteTxLookupEntries(batch, delivery.number, delivery.hashes)
			blocks++
			txs += len(delivery.hashes)
			// If enough data was accumulated in memory, dump to disk
			if batch.ValueSize() > ethdb.IdealBatchSize {
				// Also write the progress tail there
				WriteTxIndexTail(batch, lastNum)
				if err := batch.Write(); err != nil {
					log.Crit("Failed writing batch to db", "error", err)
					return
				}
				batch.Reset()
			}
			// If we've spent too much time already, notify the user of what we're doing
			if time.Since(logged) > 8*time.Second {
				log.Info("Indexing transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "total", to-from, "elapsed", common.PrettyDuration(time.Since(start)))
				logged = time.Now()
			}
		}
	}
	// No need to write the tail and the batch if we never indexed anything above
	if lastNum < to {
		WriteTxIndexTail(batch, lastNum)
		if err := batch.Write(); err != nil {
			log.Crit("Failed writing batch to db", "error", err)
			return
		}
	}
	log.Info("Indexed transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start)))
}

// UnindexTransactions removes txlookup indices of the specified block range.
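//
// A minimal usage sketch with hypothetical range values, dropping the lookup
// indices of the first million blocks while keeping the rest:
//
//	UnindexTransactions(db, 0, 1000000)
//	// ReadTxIndexTail(db) now reports 1000000: lookups below it are gone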
func UnindexTransactions(db ethdb.Database, from uint64, to uint64) {
	// short circuit for invalid range
	if from >= to {
		return
	}
	// Write the flag first and then unindex the transaction indices. Some
	// indices will be left in the database if a crash happens, but that's fine.
	WriteTxIndexTail(db, to)
	// If only one block is unindexed, do it directly
	//if from+1 == to {
	//	data := ReadCanonicalBodyRLP(db, uint64(from))
	//	DeleteTxLookupEntries(db, ReadBlock(db, ReadCanonicalHash(db, from), from))
	//	log.Info("Unindexed transactions", "blocks", 1, "tail", to)
	//	return
	//}
	// TODO @holiman, add this back (if we want it)
	var (
		hashesCh, abortCh = iterateTransactions(db, from, to, false)
		batch             = db.NewBatch()
		start             = time.Now()
		logged            = start.Add(-7 * time.Second)
	)
	defer close(abortCh)
	// Otherwise spin up the concurrent iterator and unindexer
	blocks, txs := 0, 0
	for delivery := range hashesCh {
		DeleteTxLookupEntries(batch, delivery.hashes)
		txs += len(delivery.hashes)
		blocks++
		// If enough data was accumulated in memory, dump to disk. A batch
		// counts the size of a deletion as '1', so flushing is keyed on the
		// number of processed blocks rather than on the batch size.
		if blocks%1000 == 0 {
			if err := batch.Write(); err != nil {
				log.Crit("Failed writing batch to db", "error", err)
				return
			}
			batch.Reset()
		}
		// If we've spent too much time already, notify the user of what we're doing
		if time.Since(logged) > 8*time.Second {
			log.Info("Unindexing transactions", "blocks", blocks, "txs", txs, "total", to-from, "elapsed", common.PrettyDuration(time.Since(start)))
			logged = time.Now()
		}
	}
	if err := batch.Write(); err != nil {
		log.Crit("Failed writing batch to db", "error", err)
		return
	}
	log.Info("Unindexed transactions", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start)))
}