// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package rawdb

import (
	"runtime"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/prque"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/rlp"
	"golang.org/x/crypto/sha3"
)

// InitDatabaseFromFreezer reinitializes an empty database from a previous batch
// of frozen ancient blocks. The method iterates over all the frozen blocks and
// injects into the database the block hash->number mappings.
func InitDatabaseFromFreezer(db ethdb.Database) {
	// If we can't access the freezer or it's empty, abort
	frozen, err := db.Ancients()
	if err != nil || frozen == 0 {
		return
	}
	var (
		batch  = db.NewBatch()
		start  = time.Now()
		logged = start.Add(-7 * time.Second) // Emit the first progress log after ~1s, then every 8s
		hash   common.Hash
	)
	for i := uint64(0); i < frozen; i++ {
		// Since the freezer has all data in sequential order on a file,
		// it would be 'neat' to read more data in one go, and let the
		// freezerdb return N items (e.g. up to 1000 items per read).
		// That would require an API change in Ancients though.
		if h, err := db.Ancient(freezerHashTable, i); err != nil {
			log.Crit("Failed to init database from freezer", "err", err)
		} else {
			hash = common.BytesToHash(h)
		}
		WriteHeaderNumber(batch, hash, i)
		// If enough data was accumulated in memory, dump to disk
		if batch.ValueSize() > ethdb.IdealBatchSize {
			if err := batch.Write(); err != nil {
				log.Crit("Failed to write data to db", "err", err)
			}
			batch.Reset()
		}
		// If we've spent too much time already, notify the user of what we're doing
		if time.Since(logged) > 8*time.Second {
			log.Info("Initializing database from freezer", "total", frozen, "number", i, "hash", hash, "elapsed", common.PrettyDuration(time.Since(start)))
			logged = time.Now()
		}
	}
	// Flush the last partial batch and set the chain head markers
	if err := batch.Write(); err != nil {
		log.Crit("Failed to write data to db", "err", err)
	}
	batch.Reset()

	WriteHeadHeaderHash(db, hash)
	WriteHeadFastBlockHash(db, hash)
	log.Info("Initialized database from freezer", "blocks", frozen, "elapsed", common.PrettyDuration(time.Since(start)))
}
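
// initFromFreezerExample is an illustrative usage sketch, not part of the
// upstream file: it shows the intended call site of InitDatabaseFromFreezer,
// right after a database with an attached ancient store has been opened. The
// path and cache/handle values are placeholders, and the constructor name is
// assumed to be this package's freezer-aware opener.
func initFromFreezerExample() {
	db, err := NewLevelDBDatabaseWithFreezer("chaindata", 512, 512, "chaindata/ancient", "")
	if err != nil {
		log.Crit("Failed to open database", "err", err)
	}
	defer db.Close()

	// Rebuild the block hash->number mappings from the frozen blocks
	InitDatabaseFromFreezer(db)
}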

type blockTxHashes struct {
	number uint64
	hashes []common.Hash
}

// iterateTransactions iterates over all transactions in the canonical blocks
// in the range [from, to), and yields the transaction hashes on a channel. If
// reverse is true, the blocks are iterated from high to low. Both returned
// channels are nil if the range is empty.
func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool) (chan *blockTxHashes, chan struct{}) {
	// One thread sequentially reads data from db
	type numberRlp struct {
		number uint64
		rlp    rlp.RawValue
	}
	if to == from {
		return nil, nil
	}
	threads := to - from
	if cpus := runtime.NumCPU(); threads > uint64(cpus) {
		threads = uint64(cpus)
	}
	var (
		rlpCh    = make(chan *numberRlp, threads*2)     // we send raw rlp over this channel
		hashesCh = make(chan *blockTxHashes, threads*2) // send hashes over hashesCh
		abortCh  = make(chan struct{})
	)
	// lookup runs in one instance
	lookup := func() {
		n, end := from, to
		if reverse {
			// If from is 0, from-1 wraps to MaxUint64; the n != end check
			// below still terminates, since n wraps the same way after 0
			n, end = to-1, from-1
		}
		defer close(rlpCh)
		for n != end {
			data := ReadCanonicalBodyRLP(db, n)
			// Feed the block to the aggregator, or abort on interrupt
			select {
			case rlpCh <- &numberRlp{n, data}:
			case <-abortCh:
				return
			}
			if reverse {
				n--
			} else {
				n++
			}
		}
	}
	// process runs in parallel
	nThreadsAlive := int32(threads)
	process := func() {
		defer func() {
			// Last processor closes the result channel
			if atomic.AddInt32(&nThreadsAlive, -1) == 0 {
				close(hashesCh)
			}
		}()
		var hasher = sha3.NewLegacyKeccak256()
		for data := range rlpCh {
			it, err := rlp.NewListIterator(data.rlp)
			if err != nil {
				log.Warn("tx iteration error", "error", err)
				return
			}
			it.Next() // position on the block body's first element: the transaction list
			txs := it.Value()
			txIt, err := rlp.NewListIterator(txs)
			if err != nil {
				log.Warn("tx iteration error", "error", err)
				return
			}
			var hashes []common.Hash
			for txIt.Next() {
				if err := txIt.Err(); err != nil {
					log.Warn("tx iteration error", "error", err)
					return
				}
				var txHash common.Hash
				hasher.Reset()
				hasher.Write(txIt.Value())
				hasher.Sum(txHash[:0])
				hashes = append(hashes, txHash)
			}
			result := &blockTxHashes{
				hashes: hashes,
				number: data.number,
			}
			// Feed the hashes to the aggregator, or abort on interrupt
			select {
			case hashesCh <- result:
			case <-abortCh:
				return
			}
		}
	}
	go lookup() // start the sequential db accessor
	for i := 0; i < int(threads); i++ {
		go process()
	}
	return hashesCh, abortCh
}
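
// drainHashesExample is a hedged usage sketch, not part of the upstream file.
// It shows the contract of iterateTransactions: the caller must either drain
// hashesCh to completion or close abortCh, otherwise the lookup and process
// goroutines leak. The nil check matters because an empty range yields nil
// channels, and closing a nil channel panics.
func drainHashesExample(db ethdb.Database, from, to uint64) int {
	hashesCh, abortCh := iterateTransactions(db, from, to, false)
	if hashesCh == nil {
		return 0 // empty range, no goroutines were spawned
	}
	defer close(abortCh)

	txs := 0
	for delivery := range hashesCh {
		txs += len(delivery.hashes)
	}
	return txs
}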

// IndexTransactions creates txlookup indices of the specified block range.
//
// This function iterates the canonical chain in reverse order, which has one
// main advantage: the tx index tail flag can be written periodically even
// before the whole indexing procedure is finished, so an interrupted run can
// be resumed quickly the next time.
func IndexTransactions(db ethdb.Database, from uint64, to uint64) {
	// short circuit for invalid range
	if from >= to {
		return
	}
	var (
		hashesCh, abortCh = iterateTransactions(db, from, to, true)
		batch             = db.NewBatch()
		start             = time.Now()
		logged            = start.Add(-7 * time.Second)
		// Since we iterate in reverse, we expect the first number to come
		// in to be [to-1]. Therefore, setting lastNum to [to] means that the
		// prque gap-evaluation will work correctly
		lastNum = to
		queue   = prque.New(nil)
		// for stats reporting
		blocks, txs = 0, 0
	)
	defer close(abortCh)
	for chanDelivery := range hashesCh {
		// Push the delivery into the queue and process contiguous ranges.
		// Since we iterate in reverse, lower numbers have lower priority, and
		// we can use the number directly as the priority marker
		queue.Push(chanDelivery, int64(chanDelivery.number))
		for !queue.Empty() {
			// If the next available item is gapped, stop and wait for more
			if _, priority := queue.Peek(); priority != int64(lastNum-1) {
				break
			}
			// Next block available, pop it off and index it
			delivery := queue.PopItem().(*blockTxHashes)
			lastNum = delivery.number
			WriteTxLookupEntriesByHash(batch, delivery.number, delivery.hashes)
			blocks++
			txs += len(delivery.hashes)
			// If enough data was accumulated in memory, dump to disk
			if batch.ValueSize() > ethdb.IdealBatchSize {
				// Also write the tail, so an interrupted run can resume from here
				WriteTxIndexTail(batch, lastNum)
				if err := batch.Write(); err != nil {
					log.Crit("Failed writing batch to db", "error", err)
					return
				}
				batch.Reset()
			}
			// If we've spent too much time already, notify the user of what we're doing
			if time.Since(logged) > 8*time.Second {
				log.Info("Indexing transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "total", to-from, "elapsed", common.PrettyDuration(time.Since(start)))
				logged = time.Now()
			}
		}
	}
	if lastNum < to {
		WriteTxIndexTail(batch, lastNum)
		// No need to write the batch if we never entered the loop above...
		if err := batch.Write(); err != nil {
			log.Crit("Failed writing batch to db", "error", err)
			return
		}
	}
	log.Info("Indexed transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start)))
}
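
// resumeIndexingExample is a hedged sketch, not part of the upstream file. It
// shows why the tail flag is written periodically above: a restarted indexer
// can pick up where the previous run stopped. ReadTxIndexTail is assumed to be
// the read counterpart of WriteTxIndexTail; `target` is the lowest block the
// caller wants indexed, `head` the current chain head.
func resumeIndexingExample(db ethdb.Database, target, head uint64) {
	tail := ReadTxIndexTail(db) // assumed accessor: returns nil if never indexed
	if tail == nil {
		// Nothing indexed yet: index the whole range [target, head+1)
		IndexTransactions(db, target, head+1)
		return
	}
	if *tail > target {
		// A previous run stopped at *tail: extend the index down to target,
		// reusing the stored tail as the (exclusive) upper bound
		IndexTransactions(db, target, *tail)
	}
}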

// UnindexTransactions removes txlookup indices of the specified block range.
func UnindexTransactions(db ethdb.Database, from uint64, to uint64) {
	// short circuit for invalid range
	if from >= to {
		return
	}
	// Write the flag first and then unindex the transaction indices. Some
	// indices will be left dangling in the database if a crash happens, but
	// that's fine.
	WriteTxIndexTail(db, to)
	// If only one block is unindexed, do it directly
	//if from+1 == to {
	//	data := ReadCanonicalBodyRLP(db, uint64(from))
	//	DeleteTxLookupEntries(db, ReadBlock(db, ReadCanonicalHash(db, from), from))
	//	log.Info("Unindexed transactions", "blocks", 1, "tail", to)
	//	return
	//}
	// TODO @holiman, add this back (if we want it)
	var (
		hashesCh, abortCh = iterateTransactions(db, from, to, false)
		batch             = db.NewBatch()
		start             = time.Now()
		logged            = start.Add(-7 * time.Second)
	)
	defer close(abortCh)
	// Otherwise spin up the concurrent iterator and unindexer
	blocks, txs := 0, 0
	for delivery := range hashesCh {
		DeleteTxLookupEntriesByHash(batch, delivery.hashes)
		txs += len(delivery.hashes)
		blocks++
		// A batch counts the size of a deletion as '1', so the usual
		// ValueSize threshold would almost never trigger here; flush on a
		// fixed block count instead
		if blocks%1000 == 0 {
			if err := batch.Write(); err != nil {
				log.Crit("Failed writing batch to db", "error", err)
				return
			}
			batch.Reset()
		}
		// If we've spent too much time already, notify the user of what we're doing
		if time.Since(logged) > 8*time.Second {
			log.Info("Unindexing transactions", "blocks", blocks, "txs", txs, "total", to-from, "elapsed", common.PrettyDuration(time.Since(start)))
			logged = time.Now()
		}
	}
	if err := batch.Write(); err != nil {
		log.Crit("Failed writing batch to db", "error", err)
		return
	}
	log.Info("Unindexed transactions", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start)))
}
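
// rollingWindowExample is a hedged sketch, not part of the upstream file. It
// shows how IndexTransactions and UnindexTransactions pair up to maintain a
// rolling index over the most recent `limit` blocks. ReadTxIndexTail is an
// assumed accessor, and head/limit are illustrative parameters the caller
// derives elsewhere.
func rollingWindowExample(db ethdb.Database, head, limit uint64) {
	tail := ReadTxIndexTail(db)
	if tail == nil {
		return // nothing indexed yet, nothing to maintain
	}
	var newTail uint64
	if head+1 > limit {
		newTail = head + 1 - limit
	}
	switch {
	case newTail > *tail:
		// The window moved forward: drop lookup entries below the new tail
		UnindexTransactions(db, *tail, newTail)
	case newTail < *tail:
		// The window grew backwards: index the newly covered blocks
		IndexTransactions(db, newTail, *tail)
	}
}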