postprocess.go

// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package light

import (
	"bytes"
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"math/big"
	"time"

	mapset "github.com/deckarep/golang-set"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/bitutil"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/params"
	"github.com/ethereum/go-ethereum/rlp"
	"github.com/ethereum/go-ethereum/trie"
)

// IndexerConfig includes a set of configs for chain indexers.
type IndexerConfig struct {
	// The block frequency for creating CHTs.
	ChtSize uint64

	// The number of confirmations needed to generate/accept a canonical hash helper trie.
	ChtConfirms uint64

	// The block frequency for creating new bloom bits.
	BloomSize uint64

	// The number of confirmations needed before a bloom section is considered probably final and its rotated bits
	// are calculated.
	BloomConfirms uint64

	// The block frequency for creating BloomTrie.
	BloomTrieSize uint64

	// The number of confirmations needed to generate/accept a bloom trie.
	BloomTrieConfirms uint64
}

var (
	// DefaultServerIndexerConfig wraps a set of configs as a default indexer config for server side.
	DefaultServerIndexerConfig = &IndexerConfig{
		ChtSize:           params.CHTFrequency,
		ChtConfirms:       params.HelperTrieProcessConfirmations,
		BloomSize:         params.BloomBitsBlocks,
		BloomConfirms:     params.BloomConfirms,
		BloomTrieSize:     params.BloomTrieFrequency,
		BloomTrieConfirms: params.HelperTrieProcessConfirmations,
	}
	// DefaultClientIndexerConfig wraps a set of configs as a default indexer config for client side.
	DefaultClientIndexerConfig = &IndexerConfig{
		ChtSize:           params.CHTFrequency,
		ChtConfirms:       params.HelperTrieConfirmations,
		BloomSize:         params.BloomBitsBlocksClient,
		BloomConfirms:     params.HelperTrieConfirmations,
		BloomTrieSize:     params.BloomTrieFrequency,
		BloomTrieConfirms: params.HelperTrieConfirmations,
	}
	// TestServerIndexerConfig wraps a set of configs as a test indexer config for server side.
	TestServerIndexerConfig = &IndexerConfig{
		ChtSize:           128,
		ChtConfirms:       1,
		BloomSize:         16,
		BloomConfirms:     1,
		BloomTrieSize:     128,
		BloomTrieConfirms: 1,
	}
	// TestClientIndexerConfig wraps a set of configs as a test indexer config for client side.
	TestClientIndexerConfig = &IndexerConfig{
		ChtSize:           128,
		ChtConfirms:       8,
		BloomSize:         128,
		BloomConfirms:     8,
		BloomTrieSize:     128,
		BloomTrieConfirms: 8,
	}
)

var (
	errNoTrustedCht       = errors.New("no trusted canonical hash trie")
	errNoTrustedBloomTrie = errors.New("no trusted bloom trie")
	errNoHeader           = errors.New("header not found")

	chtPrefix      = []byte("chtRootV2-") // chtPrefix + chtNum (uint64 big endian) -> trie root hash
	ChtTablePrefix = "cht-"
)

// ChtNode structures are stored in the Canonical Hash Trie in an RLP encoded format
type ChtNode struct {
	Hash common.Hash
	Td   *big.Int
}

// GetChtRoot reads the CHT root associated to the given section from the database
func GetChtRoot(db ethdb.Database, sectionIdx uint64, sectionHead common.Hash) common.Hash {
	var encNumber [8]byte
	binary.BigEndian.PutUint64(encNumber[:], sectionIdx)
	data, _ := db.Get(append(append(chtPrefix, encNumber[:]...), sectionHead.Bytes()...))
	return common.BytesToHash(data)
}

// StoreChtRoot writes the CHT root associated to the given section into the database
func StoreChtRoot(db ethdb.Database, sectionIdx uint64, sectionHead, root common.Hash) {
	var encNumber [8]byte
	binary.BigEndian.PutUint64(encNumber[:], sectionIdx)
	db.Put(append(append(chtPrefix, encNumber[:]...), sectionHead.Bytes()...), root.Bytes())
}

// ChtIndexerBackend implements core.ChainIndexerBackend.
type ChtIndexerBackend struct {
	disablePruning       bool
	diskdb, trieTable    ethdb.Database
	odr                  OdrBackend
	triedb               *trie.Database
	trieset              mapset.Set
	section, sectionSize uint64
	lastHash             common.Hash
	trie                 *trie.Trie
}

// NewChtIndexer creates a Cht chain indexer
func NewChtIndexer(db ethdb.Database, odr OdrBackend, size, confirms uint64, disablePruning bool) *core.ChainIndexer {
	trieTable := rawdb.NewTable(db, ChtTablePrefix)
	backend := &ChtIndexerBackend{
		diskdb:         db,
		odr:            odr,
		trieTable:      trieTable,
		triedb:         trie.NewDatabaseWithConfig(trieTable, &trie.Config{Cache: 1}), // Use a tiny cache only to keep memory down
		trieset:        mapset.NewSet(),
		sectionSize:    size,
		disablePruning: disablePruning,
	}
	return core.NewChainIndexer(db, rawdb.NewTable(db, "chtIndexV2-"), backend, size, confirms, time.Millisecond*100, "cht")
}

// fetchMissingNodes tries to retrieve the last entry of the latest trusted CHT from the
// ODR backend in order to be able to add new entries and calculate subsequent root hashes
func (c *ChtIndexerBackend) fetchMissingNodes(ctx context.Context, section uint64, root common.Hash) error {
	batch := c.trieTable.NewBatch()
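	// Request the proof for the last entry of the previous (trusted) CHT section.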
	r := &ChtRequest{ChtRoot: root, ChtNum: section - 1, BlockNum: section*c.sectionSize - 1, Config: c.odr.IndexerConfig()}
	for {
		err := c.odr.Retrieve(ctx, r)
		switch err {
		case nil:
			r.Proof.Store(batch)
			return batch.Write()
		case ErrNoPeers:
			// if there are no peers to serve, retry later
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(time.Second * 10):
				// stay in the loop and try again
			}
		default:
			return err
		}
	}
}

// Reset implements core.ChainIndexerBackend
func (c *ChtIndexerBackend) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error {
	var root common.Hash
	if section > 0 {
		root = GetChtRoot(c.diskdb, section-1, lastSectionHead)
	}
	var err error
	c.trie, err = trie.New(common.Hash{}, root, c.triedb)
	if err != nil && c.odr != nil {
		err = c.fetchMissingNodes(ctx, section, root)
		if err == nil {
			c.trie, err = trie.New(common.Hash{}, root, c.triedb)
		}
	}
	c.section = section
	return err
}

// Process implements core.ChainIndexerBackend
func (c *ChtIndexerBackend) Process(ctx context.Context, header *types.Header) error {
	hash, num := header.Hash(), header.Number.Uint64()
	c.lastHash = hash

	td := rawdb.ReadTd(c.diskdb, hash, num)
	if td == nil {
		panic(fmt.Sprintf("total difficulty missing for indexed block #%d [%x]", num, hash))
	}
	var encNumber [8]byte
	binary.BigEndian.PutUint64(encNumber[:], num)
	data, _ := rlp.EncodeToBytes(ChtNode{hash, td})
	c.trie.Update(encNumber[:], data)
	return nil
}

// Commit implements core.ChainIndexerBackend
func (c *ChtIndexerBackend) Commit() error {
	root, nodes, err := c.trie.Commit(false)
	if err != nil {
		return err
	}
	// Commit trie changes into trie database in case it's not nil.
	if nodes != nil {
		if err := c.triedb.Update(trie.NewWithNodeSet(nodes)); err != nil {
			return err
		}
	}
	// Re-create trie with newly generated root and updated database.
	c.trie, err = trie.New(common.Hash{}, root, c.triedb)
	if err != nil {
		return err
	}
	// Pruning historical trie nodes if necessary.
	if !c.disablePruning {
		// Flush the triedb and track the latest trie nodes.
		c.trieset.Clear()
		c.triedb.Commit(root, false, func(hash common.Hash) { c.trieset.Add(hash) })

		it := c.trieTable.NewIterator(nil, nil)
		defer it.Release()

		var (
			deleted   int
			remaining int
			t         = time.Now()
		)
		for it.Next() {
			trimmed := bytes.TrimPrefix(it.Key(), []byte(ChtTablePrefix))
			if !c.trieset.Contains(common.BytesToHash(trimmed)) {
				c.trieTable.Delete(trimmed)
				deleted += 1
			} else {
				remaining += 1
			}
		}
		log.Debug("Prune historical CHT trie nodes", "deleted", deleted, "remaining", remaining, "elapsed", common.PrettyDuration(time.Since(t)))
	} else {
		c.triedb.Commit(root, false, nil)
	}
	log.Info("Storing CHT", "section", c.section, "head", fmt.Sprintf("%064x", c.lastHash), "root", fmt.Sprintf("%064x", root))
	StoreChtRoot(c.diskdb, c.section, c.lastHash, root)
	return nil
}

// Prune implements core.ChainIndexerBackend which deletes all chain data
// (except hash<->number mappings) older than the specified threshold.
func (c *ChtIndexerBackend) Prune(threshold uint64) error {
	// Short circuit if the light pruning is disabled.
	if c.disablePruning {
		return nil
	}
	t := time.Now()
	// Always keep genesis header in database.
	start, end := uint64(1), (threshold+1)*c.sectionSize

	var batch = c.diskdb.NewBatch()
	for {
		numbers, hashes := rawdb.ReadAllCanonicalHashes(c.diskdb, start, end, 10240)
		if len(numbers) == 0 {
			break
		}
		for i := 0; i < len(numbers); i++ {
			// Keep the hash<->number mapping in the database, otherwise the hash
			// based APIs (e.g. GetReceipt, GetLogs) will be broken.
			//
			// Storage wise, one mapping is ~41 bytes, so a whole section is
			// about 1.3MB, which is acceptable.
			//
			// In order to get rid of this index entirely, we would need an additional
			// flag specifying how much historical data the light client can serve.
			rawdb.DeleteCanonicalHash(batch, numbers[i])
			rawdb.DeleteBlockWithoutNumber(batch, hashes[i], numbers[i])
		}
		if batch.ValueSize() > ethdb.IdealBatchSize {
			if err := batch.Write(); err != nil {
				return err
			}
			batch.Reset()
		}
		start = numbers[len(numbers)-1] + 1
	}
	if err := batch.Write(); err != nil {
		return err
	}
	log.Debug("Prune history headers", "threshold", threshold, "elapsed", common.PrettyDuration(time.Since(t)))
	return nil
}

var (
	bloomTriePrefix      = []byte("bltRoot-") // bloomTriePrefix + bloomTrieNum (uint64 big endian) -> trie root hash
	BloomTrieTablePrefix = "blt-"
)

// GetBloomTrieRoot reads the BloomTrie root associated to the given section from the database
func GetBloomTrieRoot(db ethdb.Database, sectionIdx uint64, sectionHead common.Hash) common.Hash {
	var encNumber [8]byte
	binary.BigEndian.PutUint64(encNumber[:], sectionIdx)
	data, _ := db.Get(append(append(bloomTriePrefix, encNumber[:]...), sectionHead.Bytes()...))
	return common.BytesToHash(data)
}

// StoreBloomTrieRoot writes the BloomTrie root associated to the given section into the database
func StoreBloomTrieRoot(db ethdb.Database, sectionIdx uint64, sectionHead, root common.Hash) {
	var encNumber [8]byte
	binary.BigEndian.PutUint64(encNumber[:], sectionIdx)
	db.Put(append(append(bloomTriePrefix, encNumber[:]...), sectionHead.Bytes()...), root.Bytes())
}

// BloomTrieIndexerBackend implements core.ChainIndexerBackend
type BloomTrieIndexerBackend struct {
	disablePruning    bool
	diskdb, trieTable ethdb.Database
	triedb            *trie.Database
	trieset           mapset.Set
	odr               OdrBackend
	section           uint64
	parentSize        uint64
	size              uint64
	bloomTrieRatio    uint64
	trie              *trie.Trie
	sectionHeads      []common.Hash
}

// NewBloomTrieIndexer creates a BloomTrie chain indexer
func NewBloomTrieIndexer(db ethdb.Database, odr OdrBackend, parentSize, size uint64, disablePruning bool) *core.ChainIndexer {
	trieTable := rawdb.NewTable(db, BloomTrieTablePrefix)
	backend := &BloomTrieIndexerBackend{
		diskdb:         db,
		odr:            odr,
		trieTable:      trieTable,
		triedb:         trie.NewDatabaseWithConfig(trieTable, &trie.Config{Cache: 1}), // Use a tiny cache only to keep memory down
		trieset:        mapset.NewSet(),
		parentSize:     parentSize,
		size:           size,
		disablePruning: disablePruning,
	}
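	// Each bloom trie section aggregates bloomTrieRatio bloombits (parent) sections.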
	backend.bloomTrieRatio = size / parentSize
	backend.sectionHeads = make([]common.Hash, backend.bloomTrieRatio)
	return core.NewChainIndexer(db, rawdb.NewTable(db, "bltIndex-"), backend, size, 0, time.Millisecond*100, "bloomtrie")
}

// fetchMissingNodes tries to retrieve the last entries of the latest trusted bloom trie from the
// ODR backend in order to be able to add new entries and calculate subsequent root hashes
func (b *BloomTrieIndexerBackend) fetchMissingNodes(ctx context.Context, section uint64, root common.Hash) error {
	indexCh := make(chan uint, types.BloomBitLength)
	type res struct {
		nodes *NodeSet
		err   error
	}
	resCh := make(chan res, types.BloomBitLength)
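	// Spawn a fixed pool of workers that fetch the bloom trie proofs for every bit index concurrently.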
	for i := 0; i < 20; i++ {
		go func() {
			for bitIndex := range indexCh {
				r := &BloomRequest{BloomTrieRoot: root, BloomTrieNum: section - 1, BitIdx: bitIndex, SectionIndexList: []uint64{section - 1}, Config: b.odr.IndexerConfig()}
				for {
					if err := b.odr.Retrieve(ctx, r); err == ErrNoPeers {
						// if there are no peers to serve, retry later
						select {
						case <-ctx.Done():
							resCh <- res{nil, ctx.Err()}
							return
						case <-time.After(time.Second * 10):
							// stay in the loop and try again
						}
					} else {
						resCh <- res{r.Proofs, err}
						break
					}
				}
			}
		}()
	}
	for i := uint(0); i < types.BloomBitLength; i++ {
		indexCh <- i
	}
	close(indexCh)

	batch := b.trieTable.NewBatch()
	for i := uint(0); i < types.BloomBitLength; i++ {
		res := <-resCh
		if res.err != nil {
			return res.err
		}
		res.nodes.Store(batch)
	}
	return batch.Write()
}

// Reset implements core.ChainIndexerBackend
func (b *BloomTrieIndexerBackend) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error {
	var root common.Hash
	if section > 0 {
		root = GetBloomTrieRoot(b.diskdb, section-1, lastSectionHead)
	}
	var err error
	b.trie, err = trie.New(common.Hash{}, root, b.triedb)
	if err != nil && b.odr != nil {
		err = b.fetchMissingNodes(ctx, section, root)
		if err == nil {
			b.trie, err = trie.New(common.Hash{}, root, b.triedb)
		}
	}
	b.section = section
	return err
}

// Process implements core.ChainIndexerBackend
func (b *BloomTrieIndexerBackend) Process(ctx context.Context, header *types.Header) error {
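	// Record the head hash of each fully processed bloombits (parent) section within the bloom trie section.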
	num := header.Number.Uint64() - b.section*b.size
	if (num+1)%b.parentSize == 0 {
		b.sectionHeads[num/b.parentSize] = header.Hash()
	}
	return nil
}

// Commit implements core.ChainIndexerBackend
func (b *BloomTrieIndexerBackend) Commit() error {
	var compSize, decompSize uint64
	for i := uint(0); i < types.BloomBitLength; i++ {
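		// The trie key is the 2-byte bloom bit index followed by the 8-byte section number, both big endian.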
		var encKey [10]byte
		binary.BigEndian.PutUint16(encKey[0:2], uint16(i))
		binary.BigEndian.PutUint64(encKey[2:10], b.section)

		var decomp []byte
		for j := uint64(0); j < b.bloomTrieRatio; j++ {
			data, err := rawdb.ReadBloomBits(b.diskdb, i, b.section*b.bloomTrieRatio+j, b.sectionHeads[j])
			if err != nil {
				return err
			}
			decompData, err2 := bitutil.DecompressBytes(data, int(b.parentSize/8))
			if err2 != nil {
				return err2
			}
			decomp = append(decomp, decompData...)
		}
		comp := bitutil.CompressBytes(decomp)

		decompSize += uint64(len(decomp))
		compSize += uint64(len(comp))

		if len(comp) > 0 {
			b.trie.Update(encKey[:], comp)
		} else {
			b.trie.Delete(encKey[:])
		}
	}
	root, nodes, err := b.trie.Commit(false)
	if err != nil {
		return err
	}
	// Commit trie changes into trie database in case it's not nil.
	if nodes != nil {
		if err := b.triedb.Update(trie.NewWithNodeSet(nodes)); err != nil {
			return err
		}
	}
	// Re-create trie with newly generated root and updated database.
	b.trie, err = trie.New(common.Hash{}, root, b.triedb)
	if err != nil {
		return err
	}
	// Pruning historical trie nodes if necessary.
	if !b.disablePruning {
		// Flush the triedb and track the latest trie nodes.
		b.trieset.Clear()
		b.triedb.Commit(root, false, func(hash common.Hash) { b.trieset.Add(hash) })

		it := b.trieTable.NewIterator(nil, nil)
		defer it.Release()

		var (
			deleted   int
			remaining int
			t         = time.Now()
		)
		for it.Next() {
			trimmed := bytes.TrimPrefix(it.Key(), []byte(BloomTrieTablePrefix))
			if !b.trieset.Contains(common.BytesToHash(trimmed)) {
				b.trieTable.Delete(trimmed)
				deleted += 1
			} else {
				remaining += 1
			}
		}
		log.Debug("Prune historical bloom trie nodes", "deleted", deleted, "remaining", remaining, "elapsed", common.PrettyDuration(time.Since(t)))
	} else {
		b.triedb.Commit(root, false, nil)
	}
	sectionHead := b.sectionHeads[b.bloomTrieRatio-1]
	StoreBloomTrieRoot(b.diskdb, b.section, sectionHead, root)
	log.Info("Storing bloom trie", "section", b.section, "head", fmt.Sprintf("%064x", sectionHead), "root", fmt.Sprintf("%064x", root), "compression", float64(compSize)/float64(decompSize))

	return nil
}

// Prune implements core.ChainIndexerBackend which deletes all
// bloombits older than the specified threshold.
func (b *BloomTrieIndexerBackend) Prune(threshold uint64) error {
	// Short circuit if the light pruning is disabled.
	if b.disablePruning {
		return nil
	}
	start := time.Now()
	for i := uint(0); i < types.BloomBitLength; i++ {
		rawdb.DeleteBloombits(b.diskdb, i, 0, threshold*b.bloomTrieRatio+b.bloomTrieRatio)
	}
	log.Debug("Prune history bloombits", "threshold", threshold, "elapsed", common.PrettyDuration(time.Since(start)))
	return nil
}