| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- // Copyright 2017 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- // Package core implements the Ethereum consensus protocol.
- package core
- import (
- "encoding/binary"
- "sync"
- "time"
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/ethdb"
- "github.com/ethereum/go-ethereum/event"
- )
- // ChainIndexer does a post-processing job for equally sized sections of the canonical
- // chain (like BlooomBits and CHT structures). A ChainIndexer is connected to the blockchain
- // through the event system by starting a ChainEventLoop in a goroutine.
- // Further child ChainIndexers can be added which use the output of the parent section
- // indexer. These child indexers receive new head notifications only after an entire section
- // has been finished or in case of rollbacks that might affect already finished sections.
- type ChainIndexer struct {
- chainDb, indexDb ethdb.Database
- backend ChainIndexerBackend
- sectionSize, confirmReq uint64
- stop chan struct{}
- lock sync.Mutex
- procWait time.Duration
- tryUpdate chan struct{}
- stored, targetCount, calcIdx, lastForwarded uint64
- updating bool
- children []*ChainIndexer
- }
- // ChainIndexerBackend interface is a backend for the indexer doing the actual post-processing job
- type ChainIndexerBackend interface {
- Reset(section uint64) // start processing a new section
- Process(header *types.Header) // process a single block (called for each block in the section)
- Commit(db ethdb.Database) error // do some more processing if necessary and store the results in the database
- UpdateMsg(done, all uint64) // print a progress update message if necessary (only called when multiple sections need to be processed)
- }
- // NewChainIndexer creates a new ChainIndexer
- // db: database where the index of available processed sections is stored (the index is stored by the
- // indexer, the actual processed chain data is stored by the backend)
- // dbKey: key prefix where the index is stored
- // backend: an implementation of ChainIndexerBackend
- // sectionSize: the size of processable sections
- // confirmReq: required number of confirmation blocks before a new section is being processed
- // procWait: waiting time between processing sections (simple way of limiting the resource usage of a db upgrade)
- // stop: quit channel
- func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBackend, sectionSize, confirmReq uint64, procWait time.Duration, stop chan struct{}) *ChainIndexer {
- c := &ChainIndexer{
- chainDb: chainDb,
- indexDb: indexDb,
- backend: backend,
- sectionSize: sectionSize,
- confirmReq: confirmReq,
- tryUpdate: make(chan struct{}, 1),
- stop: stop,
- procWait: procWait,
- }
- c.stored = c.getValidSections()
- go c.updateLoop()
- return c
- }
- // updateLoop is the main event loop of the indexer
- func (c *ChainIndexer) updateLoop() {
- updateMsg := false
- for {
- select {
- case <-c.stop:
- return
- case <-c.tryUpdate:
- c.lock.Lock()
- if c.targetCount > c.stored {
- if !updateMsg && c.targetCount > c.stored+1 {
- updateMsg = true
- c.backend.UpdateMsg(c.stored, c.targetCount)
- }
- c.calcIdx = c.stored
- var lastSectionHead common.Hash
- if c.calcIdx > 0 {
- lastSectionHead = c.getSectionHead(c.calcIdx - 1)
- }
- c.lock.Unlock()
- sectionHead, ok := c.processSection(c.calcIdx, lastSectionHead)
- c.lock.Lock()
- if ok && lastSectionHead == c.getSectionHead(c.calcIdx-1) {
- c.stored = c.calcIdx + 1
- c.setSectionHead(c.calcIdx, sectionHead)
- c.setValidSections(c.stored)
- if updateMsg {
- c.backend.UpdateMsg(c.stored, c.targetCount)
- if c.stored >= c.targetCount {
- updateMsg = false
- }
- }
- c.lastForwarded = c.stored*c.sectionSize - 1
- for _, cp := range c.children {
- cp.newHead(c.lastForwarded, false)
- }
- } else {
- // if processing has failed, do not retry until further notification
- c.targetCount = c.stored
- }
- }
- if c.targetCount > c.stored {
- go func() {
- time.Sleep(c.procWait)
- c.tryUpdate <- struct{}{}
- }()
- } else {
- c.updating = false
- }
- c.lock.Unlock()
- }
- }
- }
- // ChainEventLoop runs in a goroutine and feeds blockchain events to the indexer by calling newHead
- // (not needed for child indexers where the parent calls newHead)
- func (c *ChainIndexer) ChainEventLoop(currentHeader *types.Header, eventMux *event.TypeMux) {
- sub := eventMux.Subscribe(ChainEvent{})
- c.newHead(currentHeader.Number.Uint64(), false)
- lastHead := currentHeader.Hash()
- for {
- select {
- case <-c.stop:
- return
- case ev := <-sub.Chan():
- header := ev.Data.(ChainEvent).Block.Header()
- c.newHead(header.Number.Uint64(), header.ParentHash != lastHead)
- lastHead = header.Hash()
- }
- }
- }
- // AddChildIndexer adds a child ChainIndexer that can use the output of this one
- func (c *ChainIndexer) AddChildIndexer(ci *ChainIndexer) {
- c.children = append(c.children, ci)
- }
- // newHead notifies the indexer about new chain heads or rollbacks
- func (c *ChainIndexer) newHead(headNum uint64, rollback bool) {
- c.lock.Lock()
- defer c.lock.Unlock()
- if rollback {
- firstChanged := headNum / c.sectionSize
- if firstChanged < c.targetCount {
- c.targetCount = firstChanged
- }
- if firstChanged < c.stored {
- c.stored = firstChanged
- c.setValidSections(c.stored)
- }
- headNum = firstChanged * c.sectionSize
- if headNum < c.lastForwarded {
- c.lastForwarded = headNum
- for _, cp := range c.children {
- cp.newHead(c.lastForwarded, true)
- }
- }
- } else {
- var newCount uint64
- if headNum >= c.confirmReq {
- newCount = (headNum + 1 - c.confirmReq) / c.sectionSize
- if newCount > c.targetCount {
- c.targetCount = newCount
- if !c.updating {
- c.updating = true
- c.tryUpdate <- struct{}{}
- }
- }
- }
- }
- }
- // processSection processes an entire section by calling backend functions while ensuring
- // the continuity of the passed headers. Since the chain mutex is not held while processing,
- // the continuity can be broken by a long reorg, in which case the function returns with ok == false.
- func (c *ChainIndexer) processSection(section uint64, lastSectionHead common.Hash) (sectionHead common.Hash, ok bool) {
- c.backend.Reset(section)
- head := lastSectionHead
- for i := section * c.sectionSize; i < (section+1)*c.sectionSize; i++ {
- hash := GetCanonicalHash(c.chainDb, i)
- if hash == (common.Hash{}) {
- return common.Hash{}, false
- }
- header := GetHeader(c.chainDb, hash, i)
- if header == nil || header.ParentHash != head {
- return common.Hash{}, false
- }
- c.backend.Process(header)
- head = header.Hash()
- }
- if err := c.backend.Commit(c.chainDb); err != nil {
- return common.Hash{}, false
- }
- return head, true
- }
- // CanonicalSections returns the number of processed sections that are consistent with
- // the current canonical chain
- func (c *ChainIndexer) CanonicalSections() uint64 {
- c.lock.Lock()
- defer c.lock.Unlock()
- cnt := c.getValidSections()
- for cnt > 0 {
- if c.getSectionHead(cnt-1) == GetCanonicalHash(c.chainDb, cnt*c.sectionSize-1) {
- break
- }
- cnt--
- c.setValidSections(cnt)
- }
- return cnt
- }
- // getValidSections reads the number of valid sections from the index database
- func (c *ChainIndexer) getValidSections() uint64 {
- data, _ := c.indexDb.Get([]byte("count"))
- if len(data) == 8 {
- return binary.BigEndian.Uint64(data[:])
- }
- return 0
- }
- // setValidSections writes the number of valid sections to the index database
- func (c *ChainIndexer) setValidSections(cnt uint64) {
- oldCnt := c.getValidSections()
- if cnt < oldCnt {
- for i := cnt; i < oldCnt; i++ {
- c.removeSectionHead(i)
- }
- }
- var data [8]byte
- binary.BigEndian.PutUint64(data[:], cnt)
- c.indexDb.Put([]byte("count"), data[:])
- }
- // getSectionHead reads the last block hash of a processed section from the index database
- func (c *ChainIndexer) getSectionHead(idx uint64) common.Hash {
- var data [8]byte
- binary.BigEndian.PutUint64(data[:], idx)
- hash, _ := c.indexDb.Get(append([]byte("shead"), data[:]...))
- if len(hash) == len(common.Hash{}) {
- return common.BytesToHash(hash)
- }
- return common.Hash{}
- }
- // setSectionHead writes the last block hash of a processed section to the index database
- func (c *ChainIndexer) setSectionHead(idx uint64, shead common.Hash) {
- var data [8]byte
- binary.BigEndian.PutUint64(data[:], idx)
- c.indexDb.Put(append([]byte("shead"), data[:]...), shead.Bytes())
- }
- // removeSectionHead removes the reference to a processed section from the index database
- func (c *ChainIndexer) removeSectionHead(idx uint64) {
- var data [8]byte
- binary.BigEndian.PutUint64(data[:], idx)
- c.indexDb.Delete(append([]byte("shead"), data[:]...))
- }
|