浏览代码

[R4R]prefetch state by applying the transactions within one block (#704)

* prefetch state by apply transactions within one block

* resolve comments

* stop prefetch once process is done

* update comments

fix ut
zjubfd 3 年之前
父节点
当前提交
c78ecfbb4b
共有 4 个文件被更改,包括 65 次插入36 次删除
  1. 16 5
      core/blockchain.go
  2. 1 1
      core/blockchain_diff_test.go
  3. 48 29
      core/state_prefetcher.go
  4. 0 1
      core/state_processor.go

+ 16 - 5
core/blockchain.go

@@ -90,6 +90,7 @@ const (
 	maxFutureBlocks        = 256
 	maxTimeFutureBlocks    = 30
 	maxBeyondBlocks        = 2048
+	prefetchTxNumber       = 100
 
 	diffLayerFreezerRecheckInterval = 3 * time.Second
 	diffLayerPruneRecheckInterval   = 1 * time.Second // The interval to prune unverified diff layers
@@ -233,10 +234,11 @@ type BlockChain struct {
 	running       int32          // 0 if chain is running, 1 when stopped
 	procInterrupt int32          // interrupt signaler for block processing
 
-	engine    consensus.Engine
-	validator Validator // Block and state validator interface
-	processor Processor // Block transaction processor interface
-	vmConfig  vm.Config
+	engine     consensus.Engine
+	prefetcher Prefetcher
+	validator  Validator // Block and state validator interface
+	processor  Processor // Block transaction processor interface
+	vmConfig   vm.Config
 
 	shouldPreserve  func(*types.Block) bool        // Function used to determine whether should preserve the given block.
 	terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
@@ -295,6 +297,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
 		diffNumToBlockHashes:  make(map[uint64]map[common.Hash]struct{}),
 		diffPeersToDiffHashes: make(map[string]map[common.Hash]struct{}),
 	}
+	bc.prefetcher = NewStatePrefetcher(chainConfig, bc, engine)
 	bc.validator = NewBlockValidator(chainConfig, bc, engine)
 	bc.processor = NewStateProcessor(chainConfig, bc, engine)
 
@@ -2051,10 +2054,18 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
 
 		// Enable prefetching to pull in trie node paths while processing transactions
 		statedb.StartPrefetcher("chain")
-
+		var followupInterrupt uint32
+		// For diff sync, it may fallback to full sync, so we still do prefetch
+		if len(block.Transactions()) >= prefetchTxNumber {
+			throwaway := statedb.Copy()
+			go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) {
+				bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)
+			}(time.Now(), block, throwaway, &followupInterrupt)
+		}
 		//Process block using the parent state as reference point
 		substart := time.Now()
 		statedb, receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
+		atomic.StoreUint32(&followupInterrupt, 1)
 		activeState = statedb
 		if err != nil {
 			bc.reportBlock(block, receipts, err)

+ 1 - 1
core/blockchain_diff_test.go

@@ -372,7 +372,7 @@ func TestFreezeDiffLayer(t *testing.T) {
 		t.Errorf("size of diff queue is wrong, expected: %d, get: %d", blockNum-1, fullBackend.chain.diffQueue.Size())
 	}
 
-	time.Sleep(diffLayerFreezerRecheckInterval + 1*time.Second)
+	time.Sleep(diffLayerFreezerRecheckInterval + 2*time.Second)
 	if fullBackend.chain.diffQueue.Size() != int(fullBackend.chain.triesInMemory) {
 		t.Errorf("size of diff queue is wrong, expected: %d, get: %d", blockNum, fullBackend.chain.diffQueue.Size())
 	}

+ 48 - 29
core/state_prefetcher.go

@@ -17,6 +17,7 @@
 package core
 
 import (
+	"runtime"
 	"sync/atomic"
 
 	"github.com/ethereum/go-ethereum/consensus"
@@ -35,42 +36,60 @@ type statePrefetcher struct {
 	engine consensus.Engine    // Consensus engine used for block rewards
 }
 
+// NewStatePrefetcher initialises a new statePrefetcher.
+func NewStatePrefetcher(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *statePrefetcher {
+	return &statePrefetcher{
+		config: config,
+		bc:     bc,
+		engine: engine,
+	}
+}
+
 // Prefetch processes the state changes according to the Ethereum rules by running
 // the transaction messages using the statedb, but any changes are discarded. The
-// only goal is to pre-cache transaction signatures and state trie nodes.
+// only goal is to pre-cache transaction signatures and snapshot clean state.
 func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32) {
 	var (
-		header       = block.Header()
-		gaspool      = new(GasPool).AddGas(block.GasLimit())
-		blockContext = NewEVMBlockContext(header, p.bc, nil)
-		evm          = vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
-		signer       = types.MakeSigner(p.config, header.Number)
+		header = block.Header()
+		signer = types.MakeSigner(p.config, header.Number)
 	)
-	// Iterate over and process the individual transactions
-	byzantium := p.config.IsByzantium(block.Number())
-	for i, tx := range block.Transactions() {
-		// If block precaching was interrupted, abort
-		if interrupt != nil && atomic.LoadUint32(interrupt) == 1 {
-			return
-		}
-		// Convert the transaction into an executable message and pre-cache its sender
-		msg, err := tx.AsMessage(signer)
-		if err != nil {
-			return // Also invalid block, bail out
-		}
-		statedb.Prepare(tx.Hash(), block.Hash(), i)
-		if err := precacheTransaction(msg, p.config, gaspool, statedb, header, evm); err != nil {
-			return // Ugh, something went horribly wrong, bail out
-		}
-		// If we're pre-byzantium, pre-load trie nodes for the intermediate root
-		if !byzantium {
-			statedb.IntermediateRoot(true)
-		}
+	transactions := block.Transactions()
+	threads := runtime.NumCPU()
+	batch := len(transactions) / (threads + 1)
+	if batch == 0 {
+		return
 	}
-	// If were post-byzantium, pre-load trie nodes for the final root hash
-	if byzantium {
-		statedb.IntermediateRoot(true)
+	// No need to execute the first batch, since the main processor will do it.
+	for i := 1; i <= threads; i++ {
+		start := i * batch
+		end := (i + 1) * batch
+		if i == threads {
+			end = len(transactions)
+		}
+		go func(start, end int) {
+			newStatedb := statedb.Copy()
+			gaspool := new(GasPool).AddGas(block.GasLimit())
+			blockContext := NewEVMBlockContext(header, p.bc, nil)
+			evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
+			// Iterate over and process the individual transactions
+			for i, tx := range transactions[start:end] {
+				// If block precaching was interrupted, abort
+				if interrupt != nil && atomic.LoadUint32(interrupt) == 1 {
+					return
+				}
+				// Convert the transaction into an executable message and pre-cache its sender
+				msg, err := tx.AsMessage(signer)
+				if err != nil {
+					return // Also invalid block, bail out
+				}
+				newStatedb.Prepare(tx.Hash(), block.Hash(), i)
+				if err := precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm); err != nil {
+					return // Ugh, something went horribly wrong, bail out
+				}
+			}
+		}(start, end)
 	}
+
 }
 
 // precacheTransaction attempts to apply a transaction to the given state database

+ 0 - 1
core/state_processor.go

@@ -378,7 +378,6 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
 		gp      = new(GasPool).AddGas(block.GasLimit())
 	)
 	signer := types.MakeSigner(p.bc.chainConfig, block.Number())
-	statedb.TryPreload(block, signer)
 	var receipts = make([]*types.Receipt, 0)
 	// Mutate the block and state according to any hard-fork specs
 	if p.config.DAOForkSupport && p.config.DAOForkBlock != nil && p.config.DAOForkBlock.Cmp(block.Number()) == 0 {