Ver Fonte

[R4R] implement State Verification && Snapshot Commit pipeline (#668)

* pipeline commit trie

add metrics

reopen trie

* add unit testcase

* resolve keefe's comment

* resolve igor's comments

* update prefetch

remove prefetcher

* no need to return error for precacheTransaction

* fix lint issue

* add some comments

* remove useless code

* add default option is false

* fix diffsync nil point

* fix panic on  GetProofByHash

Co-authored-by: zjubfd <zjubfd@google.com>
zjubfd há 3 anos atrás
pai
commit
eb7e3092d5

+ 6 - 2
cmd/evm/internal/t8ntool/execution.go

@@ -223,7 +223,9 @@ func (pre *Prestate) Apply(vmConfig vm.Config, chainConfig *params.ChainConfig,
 		statedb.AddBalance(pre.Env.Coinbase, minerReward)
 	}
 	// Commit block
-	root, _, err := statedb.Commit(chainConfig.IsEIP158(vmContext.BlockNumber))
+	statedb.Finalise(chainConfig.IsEIP158(vmContext.BlockNumber))
+	statedb.AccountsIntermediateRoot()
+	root, _, err := statedb.Commit(nil)
 	if err != nil {
 		fmt.Fprintf(os.Stderr, "Could not commit state: %v", err)
 		return nil, nil, NewError(ErrorEVM, fmt.Errorf("could not commit state: %v", err))
@@ -252,7 +254,9 @@ func MakePreState(db ethdb.Database, accounts core.GenesisAlloc) *state.StateDB
 		}
 	}
 	// Commit and re-open to start with a clean state.
-	root, _, _ := statedb.Commit(false)
+	statedb.Finalise(false)
+	statedb.AccountsIntermediateRoot()
+	root, _, _ := statedb.Commit(nil)
 	statedb, _ = state.New(root, sdb, nil)
 	return statedb
 }

+ 3 - 1
cmd/evm/runner.go

@@ -268,7 +268,9 @@ func runCmd(ctx *cli.Context) error {
 	output, leftOverGas, stats, err := timedExec(bench, execFunc)
 
 	if ctx.GlobalBool(DumpFlag.Name) {
-		statedb.Commit(true)
+		statedb.Finalise(true)
+		statedb.AccountsIntermediateRoot()
+		statedb.Commit(nil)
 		statedb.IntermediateRoot(true)
 		fmt.Println(string(statedb.Dump(false, false, true)))
 	}

+ 2 - 1
cmd/evm/staterunner.go

@@ -101,7 +101,8 @@ func stateTestCmd(ctx *cli.Context) error {
 			_, state, err := test.Run(st, cfg, false)
 			// print state root for evmlab tracing
 			if ctx.GlobalBool(MachineFlag.Name) && state != nil {
-				fmt.Fprintf(os.Stderr, "{\"stateRoot\": \"%x\"}\n", state.IntermediateRoot(false))
+				root := state.IntermediateRoot(false)
+				fmt.Fprintf(os.Stderr, "{\"stateRoot\": \"%x\"}\n", root)
 			}
 			if err != nil {
 				// Test failed, mark as so and dump any state to aid debugging

+ 1 - 0
cmd/geth/main.go

@@ -72,6 +72,7 @@ var (
 		utils.DirectBroadcastFlag,
 		utils.DisableSnapProtocolFlag,
 		utils.DiffSyncFlag,
+		utils.PipeCommitFlag,
 		utils.RangeLimitFlag,
 		utils.USBFlag,
 		utils.SmartCardDaemonPathFlag,

+ 7 - 0
cmd/utils/flags.go

@@ -127,6 +127,10 @@ var (
 		Usage: "Enable diffy sync, Please note that enable diffsync will improve the syncing speed, " +
 			"but will degrade the security to light client level",
 	}
+	PipeCommitFlag = cli.BoolFlag{
+		Name:  "pipecommit",
+		Usage: "Enable MPT pipeline commit, it will improve syncing performance. It is an experimental feature(default is false)",
+	}
 	RangeLimitFlag = cli.BoolFlag{
 		Name:  "rangelimit",
 		Usage: "Enable 5000 blocks limit for range query",
@@ -1632,6 +1636,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
 	if ctx.GlobalIsSet(DiffSyncFlag.Name) {
 		cfg.DiffSync = ctx.GlobalBool(DiffSyncFlag.Name)
 	}
+	if ctx.GlobalIsSet(PipeCommitFlag.Name) {
+		cfg.PipeCommit = ctx.GlobalBool(PipeCommitFlag.Name)
+	}
 	if ctx.GlobalIsSet(RangeLimitFlag.Name) {
 		cfg.RangeLimit = ctx.GlobalBool(RangeLimitFlag.Name)
 	}

+ 4 - 0
consensus/clique/clique.go

@@ -560,7 +560,11 @@ func (c *Clique) Finalize(chain consensus.ChainHeaderReader, header *types.Heade
 func (c *Clique) FinalizeAndAssemble(chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB,
 	txs []*types.Transaction, uncles []*types.Header, receipts []*types.Receipt) (*types.Block, []*types.Receipt, error) {
 	// No block rewards in PoA, so the state remains as is and uncles are dropped
+	var err error
 	header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number))
+	if err != nil {
+		return nil, nil, err
+	}
 	header.UncleHash = types.CalcUncleHash(nil)
 
 	// Assemble and return the final block for sealing

+ 22 - 7
core/block_validator.go

@@ -18,6 +18,7 @@ package core
 
 import (
 	"fmt"
+	"time"
 
 	"github.com/ethereum/go-ethereum/consensus"
 	"github.com/ethereum/go-ethereum/core/state"
@@ -26,6 +27,8 @@ import (
 	"github.com/ethereum/go-ethereum/trie"
 )
 
+const badBlockCacheExpire = 30 * time.Second
+
 // BlockValidator is responsible for validating block headers, uncles and
 // processed state.
 //
@@ -54,6 +57,9 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {
 	if v.bc.HasBlockAndState(block.Hash(), block.NumberU64()) {
 		return ErrKnownBlock
 	}
+	if v.bc.isCachedBadBlock(block) {
+		return ErrKnownBadBlock
+	}
 	// Header validity is known at this point, check the uncles and transactions
 	header := block.Header()
 	if err := v.engine.VerifyUncles(v.bc, block); err != nil {
@@ -106,7 +112,7 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {
 // transition, such as amount of used gas, the receipt roots and the state root
 // itself. ValidateState returns a database batch if the validation was a success
 // otherwise nil and an error is returned.
-func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64) error {
+func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64, skipHeavyVerify bool) error {
 	header := block.Header()
 	if block.GasUsed() != usedGas {
 		return fmt.Errorf("invalid gas used (remote: %d local: %d)", block.GasUsed(), usedGas)
@@ -125,17 +131,26 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD
 			receiptSha := types.DeriveSha(receipts, trie.NewStackTrie(nil))
 			if receiptSha != header.ReceiptHash {
 				return fmt.Errorf("invalid receipt root hash (remote: %x local: %x)", header.ReceiptHash, receiptSha)
-			} else {
-				return nil
 			}
+			return nil
 		},
-		func() error {
+	}
+	if skipHeavyVerify {
+		validateFuns = append(validateFuns, func() error {
+			if err := statedb.WaitPipeVerification(); err != nil {
+				return err
+			}
+			statedb.Finalise(v.config.IsEIP158(header.Number))
+			statedb.AccountsIntermediateRoot()
+			return nil
+		})
+	} else {
+		validateFuns = append(validateFuns, func() error {
 			if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root {
 				return fmt.Errorf("invalid merkle root (remote: %x local: %x)", header.Root, root)
-			} else {
-				return nil
 			}
-		},
+			return nil
+		})
 	}
 	validateRes := make(chan error, len(validateFuns))
 	for _, f := range validateFuns {

+ 154 - 69
core/blockchain.go

@@ -77,7 +77,8 @@ var (
 	blockReorgDropMeter     = metrics.NewRegisteredMeter("chain/reorg/drop", nil)
 	blockReorgInvalidatedTx = metrics.NewRegisteredMeter("chain/reorg/invalidTx", nil)
 
-	errInsertionInterrupted = errors.New("insertion is interrupted")
+	errInsertionInterrupted        = errors.New("insertion is interrupted")
+	errStateRootVerificationFailed = errors.New("state root verification failed")
 )
 
 const (
@@ -87,6 +88,7 @@ const (
 	diffLayerRLPCacheLimit = 256
 	receiptsCacheLimit     = 10000
 	txLookupCacheLimit     = 1024
+	maxBadBlockLimit       = 16
 	maxFutureBlocks        = 256
 	maxTimeFutureBlocks    = 30
 	maxBeyondBlocks        = 2048
@@ -99,6 +101,8 @@ const (
 	maxDiffForkDist                 = 11              // Maximum allowed backward distance from the chain head
 	maxDiffLimitForBroadcast        = 128             // Maximum number of unique diff layers a peer may have broadcasted
 
+	rewindBadBlockInterval = 1 * time.Second
+
 	// BlockChainVersion ensures that an incompatible database forces a resync from scratch.
 	//
 	// Changelog:
@@ -177,10 +181,11 @@ type BlockChain struct {
 	chainConfig *params.ChainConfig // Chain & network configuration
 	cacheConfig *CacheConfig        // Cache configuration for pruning
 
-	db     ethdb.Database // Low level persistent database to store final content in
-	snaps  *snapshot.Tree // Snapshot tree for fast trie leaf access
-	triegc *prque.Prque   // Priority queue mapping block numbers to tries to gc
-	gcproc time.Duration  // Accumulates canonical block processing for trie dumping
+	db         ethdb.Database // Low level persistent database to store final content in
+	snaps      *snapshot.Tree // Snapshot tree for fast trie leaf access
+	triegc     *prque.Prque   // Priority queue mapping block numbers to tries to gc
+	gcproc     time.Duration  // Accumulates canonical block processing for trie dumping
+	commitLock sync.Mutex     // CommitLock is used to protect above field from being modified concurrently
 
 	// txLookupLimit is the maximum number of blocks from head whose tx indices
 	// are reserved:
@@ -213,6 +218,7 @@ type BlockChain struct {
 	blockCache    *lru.Cache     // Cache for the most recent entire blocks
 	txLookupCache *lru.Cache     // Cache for the most recent transaction lookup data.
 	futureBlocks  *lru.Cache     // future blocks are blocks added for later processing
+	badBlockCache *lru.Cache     // Cache for the blocks that failed to pass MPT root verification
 
 	// trusted diff layers
 	diffLayerCache             *lru.Cache   // Cache for the diffLayers
@@ -239,6 +245,7 @@ type BlockChain struct {
 	validator  Validator // Block and state validator interface
 	processor  Processor // Block transaction processor interface
 	vmConfig   vm.Config
+	pipeCommit bool
 
 	shouldPreserve  func(*types.Block) bool        // Function used to determine whether should preserve the given block.
 	terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
@@ -262,6 +269,8 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
 	receiptsCache, _ := lru.New(receiptsCacheLimit)
 	blockCache, _ := lru.New(blockCacheLimit)
 	txLookupCache, _ := lru.New(txLookupCacheLimit)
+	badBlockCache, _ := lru.New(maxBadBlockLimit)
+
 	futureBlocks, _ := lru.New(maxFutureBlocks)
 	diffLayerCache, _ := lru.New(diffLayerCacheLimit)
 	diffLayerRLPCache, _ := lru.New(diffLayerRLPCacheLimit)
@@ -283,6 +292,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
 		bodyRLPCache:          bodyRLPCache,
 		receiptsCache:         receiptsCache,
 		blockCache:            blockCache,
+		badBlockCache:         badBlockCache,
 		diffLayerCache:        diffLayerCache,
 		diffLayerRLPCache:     diffLayerRLPCache,
 		txLookupCache:         txLookupCache,
@@ -461,7 +471,10 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
 		go bc.trustedDiffLayerLoop()
 	}
 	go bc.untrustedDiffLayerPruneLoop()
-
+	if bc.pipeCommit {
+		// check current block and rewind invalid one
+		go bc.rewindInvalidHeaderBlockLoop()
+	}
 	return bc, nil
 }
 
@@ -577,6 +590,25 @@ func (bc *BlockChain) SetHead(head uint64) error {
 	return err
 }
 
+func (bc *BlockChain) tryRewindBadBlocks() {
+	bc.chainmu.Lock()
+	defer bc.chainmu.Unlock()
+	block := bc.CurrentBlock()
+	snaps := bc.snaps
+	// Verified and Result is false
+	if snaps != nil && snaps.Snapshot(block.Root()) != nil &&
+		snaps.Snapshot(block.Root()).Verified() && !snaps.Snapshot(block.Root()).WaitAndGetVerifyRes() {
+		// Rewind by one block
+		log.Warn("current block verified failed, rewind to its parent", "height", block.NumberU64(), "hash", block.Hash())
+		bc.futureBlocks.Remove(block.Hash())
+		bc.badBlockCache.Add(block.Hash(), time.Now())
+		bc.diffLayerCache.Remove(block.Hash())
+		bc.diffLayerRLPCache.Remove(block.Hash())
+		bc.reportBlock(block, nil, errStateRootVerificationFailed)
+		bc.setHeadBeyondRoot(block.NumberU64()-1, common.Hash{})
+	}
+}
+
 // SetHeadBeyondRoot rewinds the local chain to a new head with the extra condition
 // that the rewind must pass the specified state root. This method is meant to be
 // used when rewinding with snapshots enabled to ensure that we go back further than
@@ -588,7 +620,10 @@ func (bc *BlockChain) SetHead(head uint64) error {
 func (bc *BlockChain) SetHeadBeyondRoot(head uint64, root common.Hash) (uint64, error) {
 	bc.chainmu.Lock()
 	defer bc.chainmu.Unlock()
+	return bc.setHeadBeyondRoot(head, root)
+}
 
+func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash) (uint64, error) {
 	// Track the block number of the requested root hash
 	var rootNumber uint64 // (no root == always 0)
 
@@ -1056,6 +1091,12 @@ func (bc *BlockChain) HasFastBlock(hash common.Hash, number uint64) bool {
 
 // HasState checks if state trie is fully present in the database or not.
 func (bc *BlockChain) HasState(hash common.Hash) bool {
+	if bc.pipeCommit && bc.snaps != nil {
+		// If parent snap is pending on verification, treat it as state exist
+		if s := bc.snaps.Snapshot(hash); s != nil && !s.Verified() {
+			return true
+		}
+	}
 	_, err := bc.stateCache.OpenTrie(hash)
 	return err == nil
 }
@@ -1667,8 +1708,78 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
 		}
 		wg.Done()
 	}()
+
+	tryCommitTrieDB := func() error {
+		bc.commitLock.Lock()
+		defer bc.commitLock.Unlock()
+
+		triedb := bc.stateCache.TrieDB()
+		// If we're running an archive node, always flush
+		if bc.cacheConfig.TrieDirtyDisabled {
+			err := triedb.Commit(block.Root(), false, nil)
+			if err != nil {
+				return err
+			}
+		} else {
+			// Full but not archive node, do proper garbage collection
+			triedb.Reference(block.Root(), common.Hash{}) // metadata reference to keep trie alive
+			bc.triegc.Push(block.Root(), -int64(block.NumberU64()))
+
+			if current := block.NumberU64(); current > bc.triesInMemory {
+				// If we exceeded our memory allowance, flush matured singleton nodes to disk
+				var (
+					nodes, imgs = triedb.Size()
+					limit       = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
+				)
+				if nodes > limit || imgs > 4*1024*1024 {
+					triedb.Cap(limit - ethdb.IdealBatchSize)
+				}
+				// Find the next state trie we need to commit
+				chosen := current - bc.triesInMemory
+
+				// If we exceeded out time allowance, flush an entire trie to disk
+				if bc.gcproc > bc.cacheConfig.TrieTimeLimit {
+					canWrite := true
+					if posa, ok := bc.engine.(consensus.PoSA); ok {
+						if !posa.EnoughDistance(bc, block.Header()) {
+							canWrite = false
+						}
+					}
+					if canWrite {
+						// If the header is missing (canonical chain behind), we're reorging a low
+						// diff sidechain. Suspend committing until this operation is completed.
+						header := bc.GetHeaderByNumber(chosen)
+						if header == nil {
+							log.Warn("Reorg in progress, trie commit postponed", "number", chosen)
+						} else {
+							// If we're exceeding limits but haven't reached a large enough memory gap,
+							// warn the user that the system is becoming unstable.
+							if chosen < lastWrite+bc.triesInMemory && bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit {
+								log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/float64(bc.triesInMemory))
+							}
+							// Flush an entire trie and restart the counters
+							triedb.Commit(header.Root, true, nil)
+							lastWrite = chosen
+							bc.gcproc = 0
+						}
+					}
+				}
+				// Garbage collect anything below our required write retention
+				for !bc.triegc.Empty() {
+					root, number := bc.triegc.Pop()
+					if uint64(-number) > chosen {
+						bc.triegc.Push(root, number)
+						break
+					}
+					go triedb.Dereference(root.(common.Hash))
+				}
+			}
+		}
+		return nil
+	}
+
 	// Commit all cached state changes into underlying memory database.
-	root, diffLayer, err := state.Commit(bc.chainConfig.IsEIP158(block.Number()))
+	_, diffLayer, err := state.Commit(bc.tryRewindBadBlocks, tryCommitTrieDB)
 	if err != nil {
 		return NonStatTy, err
 	}
@@ -1681,69 +1792,9 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
 		diffLayer.Number = block.NumberU64()
 		bc.cacheDiffLayer(diffLayer)
 	}
-	triedb := bc.stateCache.TrieDB()
 
-	// If we're running an archive node, always flush
-	if bc.cacheConfig.TrieDirtyDisabled {
-		if err := triedb.Commit(root, false, nil); err != nil {
-			return NonStatTy, err
-		}
-	} else {
-		// Full but not archive node, do proper garbage collection
-		triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
-		bc.triegc.Push(root, -int64(block.NumberU64()))
-
-		if current := block.NumberU64(); current > bc.triesInMemory {
-			// If we exceeded our memory allowance, flush matured singleton nodes to disk
-			var (
-				nodes, imgs = triedb.Size()
-				limit       = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
-			)
-			if nodes > limit || imgs > 4*1024*1024 {
-				triedb.Cap(limit - ethdb.IdealBatchSize)
-			}
-			// Find the next state trie we need to commit
-			chosen := current - bc.triesInMemory
-
-			// If we exceeded out time allowance, flush an entire trie to disk
-			if bc.gcproc > bc.cacheConfig.TrieTimeLimit {
-				canWrite := true
-				if posa, ok := bc.engine.(consensus.PoSA); ok {
-					if !posa.EnoughDistance(bc, block.Header()) {
-						canWrite = false
-					}
-				}
-				if canWrite {
-					// If the header is missing (canonical chain behind), we're reorging a low
-					// diff sidechain. Suspend committing until this operation is completed.
-					header := bc.GetHeaderByNumber(chosen)
-					if header == nil {
-						log.Warn("Reorg in progress, trie commit postponed", "number", chosen)
-					} else {
-						// If we're exceeding limits but haven't reached a large enough memory gap,
-						// warn the user that the system is becoming unstable.
-						if chosen < lastWrite+bc.triesInMemory && bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit {
-							log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/float64(bc.triesInMemory))
-						}
-						// Flush an entire trie and restart the counters
-						triedb.Commit(header.Root, true, nil)
-						lastWrite = chosen
-						bc.gcproc = 0
-					}
-				}
-			}
-			// Garbage collect anything below our required write retention
-			for !bc.triegc.Empty() {
-				root, number := bc.triegc.Pop()
-				if uint64(-number) > chosen {
-					bc.triegc.Push(root, number)
-					break
-				}
-				go triedb.Dereference(root.(common.Hash))
-			}
-		}
-	}
 	wg.Wait()
+
 	// If the total difficulty is higher than our known, add it to the canonical chain
 	// Second clause in the if statement reduces the vulnerability to selfish mining.
 	// Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
@@ -2068,6 +2119,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
 		}
 		//Process block using the parent state as reference point
 		substart := time.Now()
+		if bc.pipeCommit {
+			statedb.EnablePipeCommit()
+		}
+		statedb.SetExpectedStateRoot(block.Root())
 		statedb, receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
 		atomic.StoreUint32(&followupInterrupt, 1)
 		activeState = statedb
@@ -2088,7 +2143,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
 		// Validate the state using the default validator
 		substart = time.Now()
 		if !statedb.IsLightProcessed() {
-			if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
+			if err := bc.validator.ValidateState(block, statedb, receipts, usedGas, bc.pipeCommit); err != nil {
 				log.Error("validate state failed", "error", err)
 				bc.reportBlock(block, receipts, err)
 				return it.index, err
@@ -2503,6 +2558,19 @@ func (bc *BlockChain) update() {
 	}
 }
 
+func (bc *BlockChain) rewindInvalidHeaderBlockLoop() {
+	recheck := time.NewTicker(rewindBadBlockInterval)
+	defer recheck.Stop()
+	for {
+		select {
+		case <-recheck.C:
+			bc.tryRewindBadBlocks()
+		case <-bc.quit:
+			return
+		}
+	}
+}
+
 func (bc *BlockChain) trustedDiffLayerLoop() {
 	recheck := time.NewTicker(diffLayerFreezerRecheckInterval)
 	bc.wg.Add(1)
@@ -2839,6 +2907,18 @@ func (bc *BlockChain) maintainTxIndex(ancients uint64) {
 	}
 }
 
+func (bc *BlockChain) isCachedBadBlock(block *types.Block) bool {
+	if timeAt, exist := bc.badBlockCache.Get(block.Hash()); exist {
+		putAt := timeAt.(time.Time)
+		if time.Since(putAt) >= badBlockCacheExpire {
+			bc.badBlockCache.Remove(block.Hash())
+			return false
+		}
+		return true
+	}
+	return false
+}
+
 // reportBlock logs a bad block error.
 func (bc *BlockChain) reportBlock(block *types.Block, receipts types.Receipts, err error) {
 	rawdb.WriteBadBlock(bc.db, block)
@@ -3009,6 +3089,11 @@ func EnableLightProcessor(bc *BlockChain) *BlockChain {
 	return bc
 }
 
+func EnablePipelineCommit(bc *BlockChain) *BlockChain {
+	bc.pipeCommit = true
+	return bc
+}
+
 func EnablePersistDiff(limit uint64) BlockChainOption {
 	return func(chain *BlockChain) *BlockChain {
 		chain.diffLayerFreezerBlockLimit = limit

+ 4 - 4
core/blockchain_diff_test.go

@@ -317,6 +317,9 @@ func TestProcessDiffLayer(t *testing.T) {
 			lightBackend.Chain().HandleDiffLayer(diff, "testpid", true)
 		}
 		_, err := lightBackend.chain.insertChain([]*types.Block{block}, true)
+		if err != nil {
+			t.Errorf("failed to insert block %v", err)
+		}
 		if checks, exist := checkBlocks[i]; exist {
 			for _, check := range checks.txs {
 				s, _ := lightBackend.Chain().Snapshots().Snapshot(block.Root()).Storage(crypto.Keccak256Hash((*check.to)[:]), check.slot)
@@ -325,9 +328,6 @@ func TestProcessDiffLayer(t *testing.T) {
 				}
 			}
 		}
-		if err != nil {
-			t.Errorf("failed to insert block %v", err)
-		}
 	}
 	currentBlock := lightBackend.chain.CurrentBlock()
 	nextBlock := fullBackend.chain.GetBlockByNumber(currentBlock.NumberU64() + 1)
@@ -368,7 +368,7 @@ func TestFreezeDiffLayer(t *testing.T) {
 		// Wait for the buffer to be zero.
 	}
 	// Minus one empty block.
-	if fullBackend.chain.diffQueue.Size() != blockNum-1 {
+	if fullBackend.chain.diffQueue.Size() > blockNum-1 && fullBackend.chain.diffQueue.Size() < blockNum-2 {
 		t.Errorf("size of diff queue is wrong, expected: %d, get: %d", blockNum-1, fullBackend.chain.diffQueue.Size())
 	}
 

+ 188 - 80
core/blockchain_test.go

@@ -43,7 +43,8 @@ import (
 // So we can deterministically seed different blockchains
 var (
 	canonicalSeed = 1
-	forkSeed      = 2
+	forkSeed1     = 2
+	forkSeed2     = 3
 
 	TestTriesInMemory = 128
 )
@@ -51,14 +52,18 @@ var (
 // newCanonical creates a chain database, and injects a deterministic canonical
 // chain. Depending on the full flag, if creates either a full block chain or a
 // header only chain.
-func newCanonical(engine consensus.Engine, n int, full bool) (ethdb.Database, *BlockChain, error) {
+func newCanonical(engine consensus.Engine, n int, full, pipeline bool) (ethdb.Database, *BlockChain, error) {
 	var (
 		db      = rawdb.NewMemoryDatabase()
 		genesis = new(Genesis).MustCommit(db)
 	)
 
 	// Initialize a fresh chain with only a genesis block
-	blockchain, _ := NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil)
+	var ops []BlockChainOption
+	if pipeline {
+		ops = append(ops, EnablePipelineCommit)
+	}
+	blockchain, _ := NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil, ops...)
 	// Create and inject the requested chain
 	if n == 0 {
 		return db, blockchain, nil
@@ -76,9 +81,53 @@ func newCanonical(engine consensus.Engine, n int, full bool) (ethdb.Database, *B
 }
 
 // Test fork of length N starting from block i
-func testFork(t *testing.T, blockchain *BlockChain, i, n int, full bool, comparator func(td1, td2 *big.Int)) {
+func testInvalidStateRootBlockImport(t *testing.T, blockchain *BlockChain, i, n int, pipeline bool) {
 	// Copy old chain up to #i into a new db
-	db, blockchain2, err := newCanonical(ethash.NewFaker(), i, full)
+	db, blockchain2, err := newCanonical(ethash.NewFaker(), i, true, pipeline)
+	if err != nil {
+		t.Fatal("could not make new canonical in testFork", err)
+	}
+	defer blockchain2.Stop()
+
+	// Assert the chains have the same header/block at #i
+	hash1 := blockchain.GetBlockByNumber(uint64(i)).Hash()
+	hash2 := blockchain2.GetBlockByNumber(uint64(i)).Hash()
+	if hash1 != hash2 {
+		t.Errorf("chain content mismatch at %d: have hash %v, want hash %v", i, hash2, hash1)
+	}
+	// Extend the newly created chain
+	blockChainB := makeBlockChain(blockchain2.CurrentBlock(), n, ethash.NewFaker(), db, forkSeed1)
+	for idx, block := range blockChainB {
+		block.SetRoot(common.Hash{0: byte(forkSeed1), 19: byte(idx)})
+	}
+	previousBlock := blockchain.CurrentBlock()
+	// Sanity check that the forked chain can be imported into the original
+	if _, err := blockchain.InsertChain(blockChainB); err == nil {
+		t.Fatalf("failed to report insert error")
+	}
+
+	time.Sleep(2 * rewindBadBlockInterval)
+	latestBlock := blockchain.CurrentBlock()
+	if latestBlock.Hash() != previousBlock.Hash() || latestBlock.NumberU64() != previousBlock.NumberU64() {
+		t.Fatalf("rewind do not take effect")
+	}
+	db, blockchain3, err := newCanonical(ethash.NewFaker(), i, true, pipeline)
+	if err != nil {
+		t.Fatal("could not make new canonical in testFork", err)
+	}
+	defer blockchain3.Stop()
+
+	blockChainC := makeBlockChain(blockchain3.CurrentBlock(), n, ethash.NewFaker(), db, forkSeed2)
+
+	if _, err := blockchain.InsertChain(blockChainC); err != nil {
+		t.Fatalf("failed to insert forking chain: %v", err)
+	}
+}
+
+// Test fork of length N starting from block i
+func testFork(t *testing.T, blockchain *BlockChain, i, n int, full, pipeline bool, comparator func(td1, td2 *big.Int)) {
+	// Copy old chain up to #i into a new db
+	db, blockchain2, err := newCanonical(ethash.NewFaker(), i, full, pipeline)
 	if err != nil {
 		t.Fatal("could not make new canonical in testFork", err)
 	}
@@ -102,12 +151,12 @@ func testFork(t *testing.T, blockchain *BlockChain, i, n int, full bool, compara
 		headerChainB []*types.Header
 	)
 	if full {
-		blockChainB = makeBlockChain(blockchain2.CurrentBlock(), n, ethash.NewFaker(), db, forkSeed)
+		blockChainB = makeBlockChain(blockchain2.CurrentBlock(), n, ethash.NewFaker(), db, forkSeed1)
 		if _, err := blockchain2.InsertChain(blockChainB); err != nil {
 			t.Fatalf("failed to insert forking chain: %v", err)
 		}
 	} else {
-		headerChainB = makeHeaderChain(blockchain2.CurrentHeader(), n, ethash.NewFaker(), db, forkSeed)
+		headerChainB = makeHeaderChain(blockchain2.CurrentHeader(), n, ethash.NewFaker(), db, forkSeed1)
 		if _, err := blockchain2.InsertHeaderChain(headerChainB, 1); err != nil {
 			t.Fatalf("failed to insert forking chain: %v", err)
 		}
@@ -117,7 +166,7 @@ func testFork(t *testing.T, blockchain *BlockChain, i, n int, full bool, compara
 
 	if full {
 		tdPre = blockchain.GetTdByHash(blockchain.CurrentBlock().Hash())
-		if err := testBlockChainImport(blockChainB, blockchain); err != nil {
+		if err := testBlockChainImport(blockChainB, pipeline, blockchain); err != nil {
 			t.Fatalf("failed to import forked block chain: %v", err)
 		}
 		tdPost = blockchain.GetTdByHash(blockChainB[len(blockChainB)-1].Hash())
@@ -134,7 +183,7 @@ func testFork(t *testing.T, blockchain *BlockChain, i, n int, full bool, compara
 
 // testBlockChainImport tries to process a chain of blocks, writing them into
 // the database if successful.
-func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
+func testBlockChainImport(chain types.Blocks, pipelineCommit bool, blockchain *BlockChain) error {
 	for _, block := range chain {
 		// Try and process the block
 		err := blockchain.engine.VerifyHeader(blockchain, block.Header(), true)
@@ -151,12 +200,16 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
 		if err != nil {
 			return err
 		}
+		statedb.SetExpectedStateRoot(block.Root())
+		if pipelineCommit {
+			statedb.EnablePipeCommit()
+		}
 		statedb, receipts, _, usedGas, err := blockchain.processor.Process(block, statedb, vm.Config{})
 		if err != nil {
 			blockchain.reportBlock(block, receipts, err)
 			return err
 		}
-		err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas)
+		err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas, pipelineCommit)
 		if err != nil {
 			blockchain.reportBlock(block, receipts, err)
 			return err
@@ -164,7 +217,9 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
 		blockchain.chainmu.Lock()
 		rawdb.WriteTd(blockchain.db, block.Hash(), block.NumberU64(), new(big.Int).Add(block.Difficulty(), blockchain.GetTdByHash(block.ParentHash())))
 		rawdb.WriteBlock(blockchain.db, block)
-		statedb.Commit(false)
+		statedb.Finalise(false)
+		statedb.AccountsIntermediateRoot()
+		statedb.Commit(nil)
 		blockchain.chainmu.Unlock()
 	}
 	return nil
@@ -187,8 +242,22 @@ func testHeaderChainImport(chain []*types.Header, blockchain *BlockChain) error
 	return nil
 }
 
+func TestBlockImportVerification(t *testing.T) {
+	length := 5
+
+	// Make first chain starting from genesis
+	_, processor, err := newCanonical(ethash.NewFaker(), length, true, true)
+	if err != nil {
+		t.Fatalf("failed to make new canonical chain: %v", err)
+	}
+	defer processor.Stop()
+	// Start fork from current height
+	processor = EnablePipelineCommit(processor)
+	testInvalidStateRootBlockImport(t, processor, length, 10, true)
+}
+
 func TestLastBlock(t *testing.T) {
-	_, blockchain, err := newCanonical(ethash.NewFaker(), 0, true)
+	_, blockchain, err := newCanonical(ethash.NewFaker(), 0, true, false)
 	if err != nil {
 		t.Fatalf("failed to create pristine chain: %v", err)
 	}
@@ -205,14 +274,20 @@ func TestLastBlock(t *testing.T) {
 
 // Tests that given a starting canonical chain of a given size, it can be extended
 // with various length chains.
-func TestExtendCanonicalHeaders(t *testing.T) { testExtendCanonical(t, false) }
-func TestExtendCanonicalBlocks(t *testing.T)  { testExtendCanonical(t, true) }
+func TestExtendCanonicalHeaders(t *testing.T) {
+	testExtendCanonical(t, false, false)
 
-func testExtendCanonical(t *testing.T, full bool) {
+}
+func TestExtendCanonicalBlocks(t *testing.T) {
+	testExtendCanonical(t, true, false)
+	testExtendCanonical(t, true, true)
+}
+
+func testExtendCanonical(t *testing.T, full, pipeline bool) {
 	length := 5
 
 	// Make first chain starting from genesis
-	_, processor, err := newCanonical(ethash.NewFaker(), length, full)
+	_, processor, err := newCanonical(ethash.NewFaker(), length, full, pipeline)
 	if err != nil {
 		t.Fatalf("failed to make new canonical chain: %v", err)
 	}
@@ -225,22 +300,25 @@ func testExtendCanonical(t *testing.T, full bool) {
 		}
 	}
 	// Start fork from current height
-	testFork(t, processor, length, 1, full, better)
-	testFork(t, processor, length, 2, full, better)
-	testFork(t, processor, length, 5, full, better)
-	testFork(t, processor, length, 10, full, better)
+	testFork(t, processor, length, 1, full, pipeline, better)
+	testFork(t, processor, length, 2, full, pipeline, better)
+	testFork(t, processor, length, 5, full, pipeline, better)
+	testFork(t, processor, length, 10, full, pipeline, better)
 }
 
 // Tests that given a starting canonical chain of a given size, creating shorter
 // forks do not take canonical ownership.
-func TestShorterForkHeaders(t *testing.T) { testShorterFork(t, false) }
-func TestShorterForkBlocks(t *testing.T)  { testShorterFork(t, true) }
+func TestShorterForkHeaders(t *testing.T) { testShorterFork(t, false, false) }
+func TestShorterForkBlocks(t *testing.T) {
+	testShorterFork(t, true, false)
+	testShorterFork(t, true, true)
+}
 
-func testShorterFork(t *testing.T, full bool) {
+func testShorterFork(t *testing.T, full, pipeline bool) {
 	length := 10
 
 	// Make first chain starting from genesis
-	_, processor, err := newCanonical(ethash.NewFaker(), length, full)
+	_, processor, err := newCanonical(ethash.NewFaker(), length, full, pipeline)
 	if err != nil {
 		t.Fatalf("failed to make new canonical chain: %v", err)
 	}
@@ -253,24 +331,30 @@ func testShorterFork(t *testing.T, full bool) {
 		}
 	}
 	// Sum of numbers must be less than `length` for this to be a shorter fork
-	testFork(t, processor, 0, 3, full, worse)
-	testFork(t, processor, 0, 7, full, worse)
-	testFork(t, processor, 1, 1, full, worse)
-	testFork(t, processor, 1, 7, full, worse)
-	testFork(t, processor, 5, 3, full, worse)
-	testFork(t, processor, 5, 4, full, worse)
+	testFork(t, processor, 0, 3, full, pipeline, worse)
+	testFork(t, processor, 0, 7, full, pipeline, worse)
+	testFork(t, processor, 1, 1, full, pipeline, worse)
+	testFork(t, processor, 1, 7, full, pipeline, worse)
+	testFork(t, processor, 5, 3, full, pipeline, worse)
+	testFork(t, processor, 5, 4, full, pipeline, worse)
 }
 
 // Tests that given a starting canonical chain of a given size, creating longer
 // forks do take canonical ownership.
-func TestLongerForkHeaders(t *testing.T) { testLongerFork(t, false) }
-func TestLongerForkBlocks(t *testing.T)  { testLongerFork(t, true) }
+func TestLongerForkHeaders(t *testing.T) {
+	testLongerFork(t, false, false)
+}
+func TestLongerForkBlocks(t *testing.T) {
+	testLongerFork(t, true, false)
+	testLongerFork(t, true, true)
+
+}
 
-func testLongerFork(t *testing.T, full bool) {
+func testLongerFork(t *testing.T, full, pipeline bool) {
 	length := 10
 
 	// Make first chain starting from genesis
-	_, processor, err := newCanonical(ethash.NewFaker(), length, full)
+	_, processor, err := newCanonical(ethash.NewFaker(), length, full, pipeline)
 	if err != nil {
 		t.Fatalf("failed to make new canonical chain: %v", err)
 	}
@@ -283,24 +367,28 @@ func testLongerFork(t *testing.T, full bool) {
 		}
 	}
 	// Sum of numbers must be greater than `length` for this to be a longer fork
-	testFork(t, processor, 0, 11, full, better)
-	testFork(t, processor, 0, 15, full, better)
-	testFork(t, processor, 1, 10, full, better)
-	testFork(t, processor, 1, 12, full, better)
-	testFork(t, processor, 5, 6, full, better)
-	testFork(t, processor, 5, 8, full, better)
+	testFork(t, processor, 0, 11, full, pipeline, better)
+	testFork(t, processor, 0, 15, full, pipeline, better)
+	testFork(t, processor, 1, 10, full, pipeline, better)
+	testFork(t, processor, 1, 12, full, pipeline, better)
+	testFork(t, processor, 5, 6, full, pipeline, better)
+	testFork(t, processor, 5, 8, full, pipeline, better)
 }
 
 // Tests that given a starting canonical chain of a given size, creating equal
 // forks do take canonical ownership.
-func TestEqualForkHeaders(t *testing.T) { testEqualFork(t, false) }
-func TestEqualForkBlocks(t *testing.T)  { testEqualFork(t, true) }
+func TestEqualForkHeaders(t *testing.T) { testEqualFork(t, false, false) }
+func TestEqualForkBlocks(t *testing.T) {
+	testEqualFork(t, true, true)
+	testEqualFork(t, true, false)
 
-func testEqualFork(t *testing.T, full bool) {
+}
+
+func testEqualFork(t *testing.T, full, pipeline bool) {
 	length := 10
 
 	// Make first chain starting from genesis
-	_, processor, err := newCanonical(ethash.NewFaker(), length, full)
+	_, processor, err := newCanonical(ethash.NewFaker(), length, full, pipeline)
 	if err != nil {
 		t.Fatalf("failed to make new canonical chain: %v", err)
 	}
@@ -313,21 +401,24 @@ func testEqualFork(t *testing.T, full bool) {
 		}
 	}
 	// Sum of numbers must be equal to `length` for this to be an equal fork
-	testFork(t, processor, 0, 10, full, equal)
-	testFork(t, processor, 1, 9, full, equal)
-	testFork(t, processor, 2, 8, full, equal)
-	testFork(t, processor, 5, 5, full, equal)
-	testFork(t, processor, 6, 4, full, equal)
-	testFork(t, processor, 9, 1, full, equal)
+	testFork(t, processor, 0, 10, full, pipeline, equal)
+	testFork(t, processor, 1, 9, full, pipeline, equal)
+	testFork(t, processor, 2, 8, full, pipeline, equal)
+	testFork(t, processor, 5, 5, full, pipeline, equal)
+	testFork(t, processor, 6, 4, full, pipeline, equal)
+	testFork(t, processor, 9, 1, full, pipeline, equal)
 }
 
 // Tests that chains missing links do not get accepted by the processor.
-func TestBrokenHeaderChain(t *testing.T) { testBrokenChain(t, false) }
-func TestBrokenBlockChain(t *testing.T)  { testBrokenChain(t, true) }
+func TestBrokenHeaderChain(t *testing.T) { testBrokenChain(t, false, false) }
+func TestBrokenBlockChain(t *testing.T) {
+	testBrokenChain(t, true, false)
+	testBrokenChain(t, true, true)
+}
 
-func testBrokenChain(t *testing.T, full bool) {
+func testBrokenChain(t *testing.T, full, pipeline bool) {
 	// Make chain starting from genesis
-	db, blockchain, err := newCanonical(ethash.NewFaker(), 10, full)
+	db, blockchain, err := newCanonical(ethash.NewFaker(), 10, full, pipeline)
 	if err != nil {
 		t.Fatalf("failed to make new canonical chain: %v", err)
 	}
@@ -335,12 +426,12 @@ func testBrokenChain(t *testing.T, full bool) {
 
 	// Create a forked chain, and try to insert with a missing link
 	if full {
-		chain := makeBlockChain(blockchain.CurrentBlock(), 5, ethash.NewFaker(), db, forkSeed)[1:]
-		if err := testBlockChainImport(chain, blockchain); err == nil {
+		chain := makeBlockChain(blockchain.CurrentBlock(), 5, ethash.NewFaker(), db, forkSeed1)[1:]
+		if err := testBlockChainImport(chain, pipeline, blockchain); err == nil {
 			t.Errorf("broken block chain not reported")
 		}
 	} else {
-		chain := makeHeaderChain(blockchain.CurrentHeader(), 5, ethash.NewFaker(), db, forkSeed)[1:]
+		chain := makeHeaderChain(blockchain.CurrentHeader(), 5, ethash.NewFaker(), db, forkSeed1)[1:]
 		if err := testHeaderChainImport(chain, blockchain); err == nil {
 			t.Errorf("broken header chain not reported")
 		}
@@ -349,19 +440,25 @@ func testBrokenChain(t *testing.T, full bool) {
 
 // Tests that reorganising a long difficult chain after a short easy one
 // overwrites the canonical numbers and links in the database.
-func TestReorgLongHeaders(t *testing.T) { testReorgLong(t, false) }
-func TestReorgLongBlocks(t *testing.T)  { testReorgLong(t, true) }
+func TestReorgLongHeaders(t *testing.T) { testReorgLong(t, false, false) }
+func TestReorgLongBlocks(t *testing.T) {
+	testReorgLong(t, true, false)
+	testReorgLong(t, true, true)
+}
 
-func testReorgLong(t *testing.T, full bool) {
-	testReorg(t, []int64{0, 0, -9}, []int64{0, 0, 0, -9}, 393280, full)
+func testReorgLong(t *testing.T, full, pipeline bool) {
+	testReorg(t, []int64{0, 0, -9}, []int64{0, 0, 0, -9}, 393280, full, pipeline)
 }
 
 // Tests that reorganising a short difficult chain after a long easy one
 // overwrites the canonical numbers and links in the database.
-func TestReorgShortHeaders(t *testing.T) { testReorgShort(t, false) }
-func TestReorgShortBlocks(t *testing.T)  { testReorgShort(t, true) }
+func TestReorgShortHeaders(t *testing.T) { testReorgShort(t, false, false) }
+func TestReorgShortBlocks(t *testing.T) {
+	testReorgShort(t, true, false)
+	testReorgShort(t, true, true)
+}
 
-func testReorgShort(t *testing.T, full bool) {
+func testReorgShort(t *testing.T, full, pipeline bool) {
 	// Create a long easy chain vs. a short heavy one. Due to difficulty adjustment
 	// we need a fairly long chain of blocks with different difficulties for a short
 	// one to become heavyer than a long one. The 96 is an empirical value.
@@ -373,12 +470,12 @@ func testReorgShort(t *testing.T, full bool) {
 	for i := 0; i < len(diff); i++ {
 		diff[i] = -9
 	}
-	testReorg(t, easy, diff, 12615120, full)
+	testReorg(t, easy, diff, 12615120, full, pipeline)
 }
 
-func testReorg(t *testing.T, first, second []int64, td int64, full bool) {
+func testReorg(t *testing.T, first, second []int64, td int64, full, pipeline bool) {
 	// Create a pristine chain and database
-	db, blockchain, err := newCanonical(ethash.NewFaker(), 0, full)
+	db, blockchain, err := newCanonical(ethash.NewFaker(), 0, full, pipeline)
 	if err != nil {
 		t.Fatalf("failed to create pristine chain: %v", err)
 	}
@@ -444,12 +541,16 @@ func testReorg(t *testing.T, first, second []int64, td int64, full bool) {
 }
 
 // Tests that the insertion functions detect banned hashes.
-func TestBadHeaderHashes(t *testing.T) { testBadHashes(t, false) }
-func TestBadBlockHashes(t *testing.T)  { testBadHashes(t, true) }
+func TestBadHeaderHashes(t *testing.T) { testBadHashes(t, false, false) }
+func TestBadBlockHashes(t *testing.T) {
+	testBadHashes(t, true, true)
+	testBadHashes(t, true, false)
+
+}
 
-func testBadHashes(t *testing.T, full bool) {
+func testBadHashes(t *testing.T, full, pipeline bool) {
 	// Create a pristine chain and database
-	db, blockchain, err := newCanonical(ethash.NewFaker(), 0, full)
+	db, blockchain, err := newCanonical(ethash.NewFaker(), 0, full, pipeline)
 	if err != nil {
 		t.Fatalf("failed to create pristine chain: %v", err)
 	}
@@ -478,12 +579,16 @@ func testBadHashes(t *testing.T, full bool) {
 
 // Tests that bad hashes are detected on boot, and the chain rolled back to a
 // good state prior to the bad hash.
-func TestReorgBadHeaderHashes(t *testing.T) { testReorgBadHashes(t, false) }
-func TestReorgBadBlockHashes(t *testing.T)  { testReorgBadHashes(t, true) }
+func TestReorgBadHeaderHashes(t *testing.T) { testReorgBadHashes(t, false, false) }
+func TestReorgBadBlockHashes(t *testing.T) {
+	testReorgBadHashes(t, true, false)
+	testReorgBadHashes(t, true, true)
 
-func testReorgBadHashes(t *testing.T, full bool) {
+}
+
+func testReorgBadHashes(t *testing.T, full, pipeline bool) {
 	// Create a pristine chain and database
-	db, blockchain, err := newCanonical(ethash.NewFaker(), 0, full)
+	db, blockchain, err := newCanonical(ethash.NewFaker(), 0, full, pipeline)
 	if err != nil {
 		t.Fatalf("failed to create pristine chain: %v", err)
 	}
@@ -533,13 +638,16 @@ func testReorgBadHashes(t *testing.T, full bool) {
 }
 
 // Tests chain insertions in the face of one entity containing an invalid nonce.
-func TestHeadersInsertNonceError(t *testing.T) { testInsertNonceError(t, false) }
-func TestBlocksInsertNonceError(t *testing.T)  { testInsertNonceError(t, true) }
+func TestHeadersInsertNonceError(t *testing.T) { testInsertNonceError(t, false, false) }
+func TestBlocksInsertNonceError(t *testing.T) {
+	testInsertNonceError(t, true, false)
+	testInsertNonceError(t, true, true)
+}
 
-func testInsertNonceError(t *testing.T, full bool) {
+func testInsertNonceError(t *testing.T, full, pipeline bool) {
 	for i := 1; i < 25 && !t.Failed(); i++ {
 		// Create a pristine chain and database
-		db, blockchain, err := newCanonical(ethash.NewFaker(), 0, full)
+		db, blockchain, err := newCanonical(ethash.NewFaker(), 0, full, pipeline)
 		if err != nil {
 			t.Fatalf("failed to create pristine chain: %v", err)
 		}
@@ -1212,7 +1320,7 @@ done:
 
 // Tests if the canonical block can be fetched from the database during chain insertion.
 func TestCanonicalBlockRetrieval(t *testing.T) {
-	_, blockchain, err := newCanonical(ethash.NewFaker(), 0, true)
+	_, blockchain, err := newCanonical(ethash.NewFaker(), 0, true, false)
 	if err != nil {
 		t.Fatalf("failed to create pristine chain: %v", err)
 	}

+ 3 - 3
core/chain_makers.go

@@ -223,7 +223,7 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse
 			block, _, _ := b.engine.FinalizeAndAssemble(chainreader, b.header, statedb, b.txs, b.uncles, b.receipts)
 
 			// Write state changes to db
-			root, _, err := statedb.Commit(config.IsEIP158(b.header.Number))
+			root, _, err := statedb.Commit(nil)
 			if err != nil {
 				panic(fmt.Sprintf("state write error: %v", err))
 			}
@@ -254,9 +254,9 @@ func makeHeader(chain consensus.ChainReader, parent *types.Block, state *state.S
 	} else {
 		time = parent.Time() + 10 // block time is fixed at 10 seconds
 	}
-
+	root := state.IntermediateRoot(chain.Config().IsEIP158(parent.Number()))
 	return &types.Header{
-		Root:       state.IntermediateRoot(chain.Config().IsEIP158(parent.Number())),
+		Root:       root,
 		ParentHash: parent.Hash(),
 		Coinbase:   parent.Coinbase(),
 		Difficulty: engine.CalcDifficulty(chain, time, &types.Header{

+ 3 - 0
core/error.go

@@ -34,6 +34,9 @@ var (
 
 	// ErrDiffLayerNotFound is returned when diff layer not found.
 	ErrDiffLayerNotFound = errors.New("diff layer not found")
+
+	// ErrKnownBadBlock is return when the block is a known bad block
+	ErrKnownBadBlock = errors.New("already known bad block")
 )
 
 // List of evm-call-message pre-checking errors. All state transition messages will

+ 1 - 1
core/genesis.go

@@ -298,7 +298,7 @@ func (g *Genesis) ToBlock(db ethdb.Database) *types.Block {
 	if g.Difficulty == nil {
 		head.Difficulty = params.GenesisDifficulty
 	}
-	statedb.Commit(false)
+	statedb.Commit(nil)
 	statedb.Database().TrieDB().Commit(root, true, nil)
 
 	return types.NewBlock(head, nil, nil, nil, trie.NewStackTrie(nil))

+ 3 - 0
core/state/database.go

@@ -257,6 +257,9 @@ func (db *cachingDB) Purge() {
 
 // CopyTrie returns an independent copy of the given trie.
 func (db *cachingDB) CopyTrie(t Trie) Trie {
+	if t == nil {
+		return nil
+	}
 	switch t := t.(type) {
 	case *trie.SecureTrie:
 		return t.Copy()

+ 33 - 3
core/state/snapshot/difflayer.go

@@ -118,6 +118,9 @@ type diffLayer struct {
 	storageList map[common.Hash][]common.Hash          // List of storage slots for iterated retrievals, one per account. Any existing lists are sorted if non-nil
 	storageData map[common.Hash]map[common.Hash][]byte // Keyed storage slots for direct retrieval. one per account (nil means deleted)
 
+	verifiedCh chan struct{} // the difflayer is verified when verifiedCh is nil or closed
+	valid      bool          // mark the difflayer is valid or not.
+
 	diffed *bloomfilter.Filter // Bloom filter tracking all the diffed items up to the disk layer
 
 	lock sync.RWMutex
@@ -168,7 +171,7 @@ func (h storageBloomHasher) Sum64() uint64 {
 
 // newDiffLayer creates a new diff on top of an existing snapshot, whether that's a low
 // level persistent database or a hierarchical diff already.
-func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer {
+func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte, verified chan struct{}) *diffLayer {
 	// Create the new layer with some pre-allocated data segments
 	dl := &diffLayer{
 		parent:      parent,
@@ -177,6 +180,7 @@ func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]s
 		accountData: accounts,
 		storageData: storage,
 		storageList: make(map[common.Hash][]common.Hash),
+		verifiedCh:  verified,
 	}
 	switch parent := parent.(type) {
 	case *diskLayer:
@@ -256,6 +260,32 @@ func (dl *diffLayer) Root() common.Hash {
 	return dl.root
 }
 
+// WaitAndGetVerifyRes will wait until the diff layer been verified and return the verification result
+func (dl *diffLayer) WaitAndGetVerifyRes() bool {
+	if dl.verifiedCh == nil {
+		return true
+	}
+	<-dl.verifiedCh
+	return dl.valid
+}
+
+func (dl *diffLayer) MarkValid() {
+	dl.valid = true
+}
+
+// Represent whether the difflayer is been verified, does not means it is a valid or invalid difflayer
+func (dl *diffLayer) Verified() bool {
+	if dl.verifiedCh == nil {
+		return true
+	}
+	select {
+	case <-dl.verifiedCh:
+		return true
+	default:
+		return false
+	}
+}
+
 // Parent returns the subsequent layer of a diff layer.
 func (dl *diffLayer) Parent() snapshot {
 	return dl.parent
@@ -423,8 +453,8 @@ func (dl *diffLayer) storage(accountHash, storageHash common.Hash, depth int) ([
 
 // Update creates a new layer on top of the existing snapshot diff tree with
 // the specified data items.
-func (dl *diffLayer) Update(blockRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer {
-	return newDiffLayer(dl, blockRoot, destructs, accounts, storage)
+func (dl *diffLayer) Update(blockRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte, verified chan struct{}) *diffLayer {
+	return newDiffLayer(dl, blockRoot, destructs, accounts, storage, verified)
 }
 
 // flatten pushes all data from this point downwards, flattening everything into

+ 18 - 18
core/state/snapshot/difflayer_test.go

@@ -79,11 +79,11 @@ func TestMergeBasics(t *testing.T) {
 		}
 	}
 	// Add some (identical) layers on top
-	parent := newDiffLayer(emptyLayer(), common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage))
-	child := newDiffLayer(parent, common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage))
-	child = newDiffLayer(child, common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage))
-	child = newDiffLayer(child, common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage))
-	child = newDiffLayer(child, common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage))
+	parent := newDiffLayer(emptyLayer(), common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage), nil)
+	child := newDiffLayer(parent, common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage), nil)
+	child = newDiffLayer(child, common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage), nil)
+	child = newDiffLayer(child, common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage), nil)
+	child = newDiffLayer(child, common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage), nil)
 	// And flatten
 	merged := (child.flatten()).(*diffLayer)
 
@@ -151,13 +151,13 @@ func TestMergeDelete(t *testing.T) {
 		}
 	}
 	// Add some flipAccs-flopping layers on top
-	parent := newDiffLayer(emptyLayer(), common.Hash{}, flipDrops(), flipAccs(), storage)
-	child := parent.Update(common.Hash{}, flopDrops(), flopAccs(), storage)
-	child = child.Update(common.Hash{}, flipDrops(), flipAccs(), storage)
-	child = child.Update(common.Hash{}, flopDrops(), flopAccs(), storage)
-	child = child.Update(common.Hash{}, flipDrops(), flipAccs(), storage)
-	child = child.Update(common.Hash{}, flopDrops(), flopAccs(), storage)
-	child = child.Update(common.Hash{}, flipDrops(), flipAccs(), storage)
+	parent := newDiffLayer(emptyLayer(), common.Hash{}, flipDrops(), flipAccs(), storage, nil)
+	child := parent.Update(common.Hash{}, flopDrops(), flopAccs(), storage, nil)
+	child = child.Update(common.Hash{}, flipDrops(), flipAccs(), storage, nil)
+	child = child.Update(common.Hash{}, flopDrops(), flopAccs(), storage, nil)
+	child = child.Update(common.Hash{}, flipDrops(), flipAccs(), storage, nil)
+	child = child.Update(common.Hash{}, flopDrops(), flopAccs(), storage, nil)
+	child = child.Update(common.Hash{}, flipDrops(), flipAccs(), storage, nil)
 
 	if data, _ := child.Account(h1); data == nil {
 		t.Errorf("last diff layer: expected %x account to be non-nil", h1)
@@ -209,7 +209,7 @@ func TestInsertAndMerge(t *testing.T) {
 			accounts  = make(map[common.Hash][]byte)
 			storage   = make(map[common.Hash]map[common.Hash][]byte)
 		)
-		parent = newDiffLayer(emptyLayer(), common.Hash{}, destructs, accounts, storage)
+		parent = newDiffLayer(emptyLayer(), common.Hash{}, destructs, accounts, storage, nil)
 	}
 	{
 		var (
@@ -220,7 +220,7 @@ func TestInsertAndMerge(t *testing.T) {
 		accounts[acc] = randomAccount()
 		storage[acc] = make(map[common.Hash][]byte)
 		storage[acc][slot] = []byte{0x01}
-		child = newDiffLayer(parent, common.Hash{}, destructs, accounts, storage)
+		child = newDiffLayer(parent, common.Hash{}, destructs, accounts, storage, nil)
 	}
 	// And flatten
 	merged := (child.flatten()).(*diffLayer)
@@ -256,7 +256,7 @@ func BenchmarkSearch(b *testing.B) {
 		for i := 0; i < 10000; i++ {
 			accounts[randomHash()] = randomAccount()
 		}
-		return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage)
+		return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage, nil)
 	}
 	var layer snapshot
 	layer = emptyLayer()
@@ -298,7 +298,7 @@ func BenchmarkSearchSlot(b *testing.B) {
 			accStorage[randomHash()] = value
 			storage[accountKey] = accStorage
 		}
-		return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage)
+		return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage, nil)
 	}
 	var layer snapshot
 	layer = emptyLayer()
@@ -336,7 +336,7 @@ func BenchmarkFlatten(b *testing.B) {
 			}
 			storage[accountKey] = accStorage
 		}
-		return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage)
+		return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage, nil)
 	}
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
@@ -386,7 +386,7 @@ func BenchmarkJournal(b *testing.B) {
 			}
 			storage[accountKey] = accStorage
 		}
-		return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage)
+		return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage, nil)
 	}
 	layer := snapshot(new(diskLayer))
 	for i := 1; i < 128; i++ {

+ 12 - 2
core/state/snapshot/disklayer.go

@@ -49,6 +49,16 @@ func (dl *diskLayer) Root() common.Hash {
 	return dl.root
 }
 
+func (dl *diskLayer) WaitAndGetVerifyRes() bool {
+	return true
+}
+
+func (dl *diskLayer) MarkValid() {}
+
+func (dl *diskLayer) Verified() bool {
+	return true
+}
+
 // Parent always returns nil as there's no layer below the disk.
 func (dl *diskLayer) Parent() snapshot {
 	return nil
@@ -161,6 +171,6 @@ func (dl *diskLayer) Storage(accountHash, storageHash common.Hash) ([]byte, erro
 // Update creates a new layer on top of the existing snapshot diff tree with
 // the specified data items. Note, the maps are retained by the method to avoid
 // copying everything.
-func (dl *diskLayer) Update(blockHash common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer {
-	return newDiffLayer(dl, blockHash, destructs, accounts, storage)
+func (dl *diskLayer) Update(blockHash common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte, verified chan struct{}) *diffLayer {
+	return newDiffLayer(dl, blockHash, destructs, accounts, storage, verified)
 }

+ 4 - 4
core/state/snapshot/disklayer_test.go

@@ -134,7 +134,7 @@ func TestDiskMerge(t *testing.T) {
 		conModCache:   {conModCacheSlot: reverse(conModCacheSlot[:])},
 		conDelNoCache: {conDelNoCacheSlot: nil},
 		conDelCache:   {conDelCacheSlot: nil},
-	}); err != nil {
+	}, nil); err != nil {
 		t.Fatalf("failed to update snapshot tree: %v", err)
 	}
 	if err := snaps.Cap(diffRoot, 0); err != nil {
@@ -357,7 +357,7 @@ func TestDiskPartialMerge(t *testing.T) {
 			conModCache:   {conModCacheSlot: reverse(conModCacheSlot[:])},
 			conDelNoCache: {conDelNoCacheSlot: nil},
 			conDelCache:   {conDelCacheSlot: nil},
-		}); err != nil {
+		}, nil); err != nil {
 			t.Fatalf("test %d: failed to update snapshot tree: %v", i, err)
 		}
 		if err := snaps.Cap(diffRoot, 0); err != nil {
@@ -468,7 +468,7 @@ func TestDiskGeneratorPersistence(t *testing.T) {
 	// Modify or delete some accounts, flatten everything onto disk
 	if err := snaps.update(diffRoot, baseRoot, nil, map[common.Hash][]byte{
 		accTwo: accTwo[:],
-	}, nil); err != nil {
+	}, nil, nil); err != nil {
 		t.Fatalf("failed to update snapshot tree: %v", err)
 	}
 	if err := snaps.Cap(diffRoot, 0); err != nil {
@@ -488,7 +488,7 @@ func TestDiskGeneratorPersistence(t *testing.T) {
 		accThree: accThree.Bytes(),
 	}, map[common.Hash]map[common.Hash][]byte{
 		accThree: {accThreeSlot: accThreeSlot.Bytes()},
-	}); err != nil {
+	}, nil); err != nil {
 		t.Fatalf("failed to update snapshot tree: %v", err)
 	}
 	diskLayer := snaps.layers[snaps.diskRoot()].(*diskLayer)

+ 45 - 45
core/state/snapshot/iterator_test.go

@@ -53,7 +53,7 @@ func TestAccountIteratorBasics(t *testing.T) {
 		}
 	}
 	// Add some (identical) layers on top
-	diffLayer := newDiffLayer(emptyLayer(), common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage))
+	diffLayer := newDiffLayer(emptyLayer(), common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage), nil)
 	it := diffLayer.AccountIterator(common.Hash{})
 	verifyIterator(t, 100, it, verifyNothing) // Nil is allowed for single layer iterator
 
@@ -91,7 +91,7 @@ func TestStorageIteratorBasics(t *testing.T) {
 		nilStorage[h] = nilstorage
 	}
 	// Add some (identical) layers on top
-	diffLayer := newDiffLayer(emptyLayer(), common.Hash{}, nil, copyAccounts(accounts), copyStorage(storage))
+	diffLayer := newDiffLayer(emptyLayer(), common.Hash{}, nil, copyAccounts(accounts), copyStorage(storage), nil)
 	for account := range accounts {
 		it, _ := diffLayer.StorageIterator(account, common.Hash{})
 		verifyIterator(t, 100, it, verifyNothing) // Nil is allowed for single layer iterator
@@ -222,13 +222,13 @@ func TestAccountIteratorTraversal(t *testing.T) {
 	}
 	// Stack three diff layers on top with various overlaps
 	snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil,
-		randomAccountSet("0xaa", "0xee", "0xff", "0xf0"), nil)
+		randomAccountSet("0xaa", "0xee", "0xff", "0xf0"), nil, nil)
 
 	snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil,
-		randomAccountSet("0xbb", "0xdd", "0xf0"), nil)
+		randomAccountSet("0xbb", "0xdd", "0xf0"), nil, nil)
 
 	snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil,
-		randomAccountSet("0xcc", "0xf0", "0xff"), nil)
+		randomAccountSet("0xcc", "0xf0", "0xff"), nil, nil)
 
 	// Verify the single and multi-layer iterators
 	head := snaps.Snapshot(common.HexToHash("0x04"))
@@ -269,13 +269,13 @@ func TestStorageIteratorTraversal(t *testing.T) {
 	}
 	// Stack three diff layers on top with various overlaps
 	snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil,
-		randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x02", "0x03"}}, nil))
+		randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x02", "0x03"}}, nil), nil)
 
 	snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil,
-		randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x04", "0x05", "0x06"}}, nil))
+		randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x04", "0x05", "0x06"}}, nil), nil)
 
 	snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil,
-		randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x02", "0x03"}}, nil))
+		randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x02", "0x03"}}, nil), nil)
 
 	// Verify the single and multi-layer iterators
 	head := snaps.Snapshot(common.HexToHash("0x04"))
@@ -353,14 +353,14 @@ func TestAccountIteratorTraversalValues(t *testing.T) {
 		}
 	}
 	// Assemble a stack of snapshots from the account layers
-	snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, a, nil)
-	snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, b, nil)
-	snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, c, nil)
-	snaps.update(common.HexToHash("0x05"), common.HexToHash("0x04"), nil, d, nil)
-	snaps.update(common.HexToHash("0x06"), common.HexToHash("0x05"), nil, e, nil)
-	snaps.update(common.HexToHash("0x07"), common.HexToHash("0x06"), nil, f, nil)
-	snaps.update(common.HexToHash("0x08"), common.HexToHash("0x07"), nil, g, nil)
-	snaps.update(common.HexToHash("0x09"), common.HexToHash("0x08"), nil, h, nil)
+	snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, a, nil, nil)
+	snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, b, nil, nil)
+	snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, c, nil, nil)
+	snaps.update(common.HexToHash("0x05"), common.HexToHash("0x04"), nil, d, nil, nil)
+	snaps.update(common.HexToHash("0x06"), common.HexToHash("0x05"), nil, e, nil, nil)
+	snaps.update(common.HexToHash("0x07"), common.HexToHash("0x06"), nil, f, nil, nil)
+	snaps.update(common.HexToHash("0x08"), common.HexToHash("0x07"), nil, g, nil, nil)
+	snaps.update(common.HexToHash("0x09"), common.HexToHash("0x08"), nil, h, nil, nil)
 
 	it, _ := snaps.AccountIterator(common.HexToHash("0x09"), common.Hash{})
 	head := snaps.Snapshot(common.HexToHash("0x09"))
@@ -452,14 +452,14 @@ func TestStorageIteratorTraversalValues(t *testing.T) {
 		}
 	}
 	// Assemble a stack of snapshots from the account layers
-	snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, randomAccountSet("0xaa"), wrapStorage(a))
-	snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, randomAccountSet("0xaa"), wrapStorage(b))
-	snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, randomAccountSet("0xaa"), wrapStorage(c))
-	snaps.update(common.HexToHash("0x05"), common.HexToHash("0x04"), nil, randomAccountSet("0xaa"), wrapStorage(d))
-	snaps.update(common.HexToHash("0x06"), common.HexToHash("0x05"), nil, randomAccountSet("0xaa"), wrapStorage(e))
-	snaps.update(common.HexToHash("0x07"), common.HexToHash("0x06"), nil, randomAccountSet("0xaa"), wrapStorage(e))
-	snaps.update(common.HexToHash("0x08"), common.HexToHash("0x07"), nil, randomAccountSet("0xaa"), wrapStorage(g))
-	snaps.update(common.HexToHash("0x09"), common.HexToHash("0x08"), nil, randomAccountSet("0xaa"), wrapStorage(h))
+	snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, randomAccountSet("0xaa"), wrapStorage(a), nil)
+	snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, randomAccountSet("0xaa"), wrapStorage(b), nil)
+	snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, randomAccountSet("0xaa"), wrapStorage(c), nil)
+	snaps.update(common.HexToHash("0x05"), common.HexToHash("0x04"), nil, randomAccountSet("0xaa"), wrapStorage(d), nil)
+	snaps.update(common.HexToHash("0x06"), common.HexToHash("0x05"), nil, randomAccountSet("0xaa"), wrapStorage(e), nil)
+	snaps.update(common.HexToHash("0x07"), common.HexToHash("0x06"), nil, randomAccountSet("0xaa"), wrapStorage(e), nil)
+	snaps.update(common.HexToHash("0x08"), common.HexToHash("0x07"), nil, randomAccountSet("0xaa"), wrapStorage(g), nil)
+	snaps.update(common.HexToHash("0x09"), common.HexToHash("0x08"), nil, randomAccountSet("0xaa"), wrapStorage(h), nil)
 
 	it, _ := snaps.StorageIterator(common.HexToHash("0x09"), common.HexToHash("0xaa"), common.Hash{})
 	head := snaps.Snapshot(common.HexToHash("0x09"))
@@ -522,7 +522,7 @@ func TestAccountIteratorLargeTraversal(t *testing.T) {
 		},
 	}
 	for i := 1; i < 128; i++ {
-		snaps.update(common.HexToHash(fmt.Sprintf("0x%02x", i+1)), common.HexToHash(fmt.Sprintf("0x%02x", i)), nil, makeAccounts(200), nil)
+		snaps.update(common.HexToHash(fmt.Sprintf("0x%02x", i+1)), common.HexToHash(fmt.Sprintf("0x%02x", i)), nil, makeAccounts(200), nil, nil)
 	}
 	// Iterate the entire stack and ensure everything is hit only once
 	head := snaps.Snapshot(common.HexToHash("0x80"))
@@ -567,13 +567,13 @@ func TestAccountIteratorFlattening(t *testing.T) {
 	}
 	// Create a stack of diffs on top
 	snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil,
-		randomAccountSet("0xaa", "0xee", "0xff", "0xf0"), nil)
+		randomAccountSet("0xaa", "0xee", "0xff", "0xf0"), nil, nil)
 
 	snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil,
-		randomAccountSet("0xbb", "0xdd", "0xf0"), nil)
+		randomAccountSet("0xbb", "0xdd", "0xf0"), nil, nil)
 
 	snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil,
-		randomAccountSet("0xcc", "0xf0", "0xff"), nil)
+		randomAccountSet("0xcc", "0xf0", "0xff"), nil, nil)
 
 	// Create an iterator and flatten the data from underneath it
 	it, _ := snaps.AccountIterator(common.HexToHash("0x04"), common.Hash{})
@@ -598,13 +598,13 @@ func TestAccountIteratorSeek(t *testing.T) {
 		},
 	}
 	snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil,
-		randomAccountSet("0xaa", "0xee", "0xff", "0xf0"), nil)
+		randomAccountSet("0xaa", "0xee", "0xff", "0xf0"), nil, nil)
 
 	snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil,
-		randomAccountSet("0xbb", "0xdd", "0xf0"), nil)
+		randomAccountSet("0xbb", "0xdd", "0xf0"), nil, nil)
 
 	snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil,
-		randomAccountSet("0xcc", "0xf0", "0xff"), nil)
+		randomAccountSet("0xcc", "0xf0", "0xff"), nil, nil)
 
 	// Account set is now
 	// 02: aa, ee, f0, ff
@@ -662,13 +662,13 @@ func TestStorageIteratorSeek(t *testing.T) {
 	}
 	// Stack three diff layers on top with various overlaps
 	snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil,
-		randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x03", "0x05"}}, nil))
+		randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x03", "0x05"}}, nil), nil)
 
 	snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil,
-		randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x02", "0x05", "0x06"}}, nil))
+		randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x02", "0x05", "0x06"}}, nil), nil)
 
 	snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil,
-		randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x05", "0x08"}}, nil))
+		randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x05", "0x08"}}, nil), nil)
 
 	// Account set is now
 	// 02: 01, 03, 05
@@ -725,17 +725,17 @@ func TestAccountIteratorDeletions(t *testing.T) {
 	}
 	// Stack three diff layers on top with various overlaps
 	snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"),
-		nil, randomAccountSet("0x11", "0x22", "0x33"), nil)
+		nil, randomAccountSet("0x11", "0x22", "0x33"), nil, nil)
 
 	deleted := common.HexToHash("0x22")
 	destructed := map[common.Hash]struct{}{
 		deleted: {},
 	}
 	snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"),
-		destructed, randomAccountSet("0x11", "0x33"), nil)
+		destructed, randomAccountSet("0x11", "0x33"), nil, nil)
 
 	snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"),
-		nil, randomAccountSet("0x33", "0x44", "0x55"), nil)
+		nil, randomAccountSet("0x33", "0x44", "0x55"), nil, nil)
 
 	// The output should be 11,33,44,55
 	it, _ := snaps.AccountIterator(common.HexToHash("0x04"), common.Hash{})
@@ -771,10 +771,10 @@ func TestStorageIteratorDeletions(t *testing.T) {
 	}
 	// Stack three diff layers on top with various overlaps
 	snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil,
-		randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x03", "0x05"}}, nil))
+		randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x03", "0x05"}}, nil), nil)
 
 	snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil,
-		randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x02", "0x04", "0x06"}}, [][]string{{"0x01", "0x03"}}))
+		randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x02", "0x04", "0x06"}}, [][]string{{"0x01", "0x03"}}), nil)
 
 	// The output should be 02,04,05,06
 	it, _ := snaps.StorageIterator(common.HexToHash("0x03"), common.HexToHash("0xaa"), common.Hash{})
@@ -790,7 +790,7 @@ func TestStorageIteratorDeletions(t *testing.T) {
 	destructed := map[common.Hash]struct{}{
 		common.HexToHash("0xaa"): {},
 	}
-	snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), destructed, nil, nil)
+	snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), destructed, nil, nil, nil)
 
 	it, _ = snaps.StorageIterator(common.HexToHash("0x04"), common.HexToHash("0xaa"), common.Hash{})
 	verifyIterator(t, 0, it, verifyStorage)
@@ -798,7 +798,7 @@ func TestStorageIteratorDeletions(t *testing.T) {
 
 	// Re-insert the slots of the same account
 	snaps.update(common.HexToHash("0x05"), common.HexToHash("0x04"), nil,
-		randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x07", "0x08", "0x09"}}, nil))
+		randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x07", "0x08", "0x09"}}, nil), nil)
 
 	// The output should be 07,08,09
 	it, _ = snaps.StorageIterator(common.HexToHash("0x05"), common.HexToHash("0xaa"), common.Hash{})
@@ -806,7 +806,7 @@ func TestStorageIteratorDeletions(t *testing.T) {
 	it.Release()
 
 	// Destruct the whole storage but re-create the account in the same layer
-	snaps.update(common.HexToHash("0x06"), common.HexToHash("0x05"), destructed, randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x11", "0x12"}}, nil))
+	snaps.update(common.HexToHash("0x06"), common.HexToHash("0x05"), destructed, randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x11", "0x12"}}, nil), nil)
 	it, _ = snaps.StorageIterator(common.HexToHash("0x06"), common.HexToHash("0xaa"), common.Hash{})
 	verifyIterator(t, 2, it, verifyStorage) // The output should be 11,12
 	it.Release()
@@ -848,7 +848,7 @@ func BenchmarkAccountIteratorTraversal(b *testing.B) {
 		},
 	}
 	for i := 1; i <= 100; i++ {
-		snaps.update(common.HexToHash(fmt.Sprintf("0x%02x", i+1)), common.HexToHash(fmt.Sprintf("0x%02x", i)), nil, makeAccounts(200), nil)
+		snaps.update(common.HexToHash(fmt.Sprintf("0x%02x", i+1)), common.HexToHash(fmt.Sprintf("0x%02x", i)), nil, makeAccounts(200), nil, nil)
 	}
 	// We call this once before the benchmark, so the creation of
 	// sorted accountlists are not included in the results.
@@ -943,9 +943,9 @@ func BenchmarkAccountIteratorLargeBaselayer(b *testing.B) {
 			base.root: base,
 		},
 	}
-	snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, makeAccounts(2000), nil)
+	snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, makeAccounts(2000), nil, nil)
 	for i := 2; i <= 100; i++ {
-		snaps.update(common.HexToHash(fmt.Sprintf("0x%02x", i+1)), common.HexToHash(fmt.Sprintf("0x%02x", i)), nil, makeAccounts(20), nil)
+		snaps.update(common.HexToHash(fmt.Sprintf("0x%02x", i+1)), common.HexToHash(fmt.Sprintf("0x%02x", i)), nil, makeAccounts(20), nil, nil)
 	}
 	// We call this once before the benchmark, so the creation of
 	// sorted accountlists are not included in the results.

+ 1 - 1
core/state/snapshot/journal.go

@@ -243,7 +243,7 @@ func loadDiffLayer(parent snapshot, r *rlp.Stream) (snapshot, error) {
 		}
 		storageData[entry.Hash] = slots
 	}
-	return loadDiffLayer(newDiffLayer(parent, root, destructSet, accountData, storageData), r)
+	return loadDiffLayer(newDiffLayer(parent, root, destructSet, accountData, storageData, nil), r)
 }
 
 // Journal terminates any in-progress snapshot generation, also implicitly pushing

+ 14 - 5
core/state/snapshot/snapshot.go

@@ -101,6 +101,15 @@ type Snapshot interface {
 	// Root returns the root hash for which this snapshot was made.
 	Root() common.Hash
 
+	// WaitAndGetVerifyRes will wait until the snapshot been verified and return verification result
+	WaitAndGetVerifyRes() bool
+
+	// Verified returns whether the snapshot is verified
+	Verified() bool
+
+	// Store the verification result
+	MarkValid()
+
 	// Account directly retrieves the account associated with a particular hash in
 	// the snapshot slim data format.
 	Account(hash common.Hash) (*Account, error)
@@ -130,7 +139,7 @@ type snapshot interface {
 	// the specified data items.
 	//
 	// Note, the maps are retained by the method to avoid copying everything.
-	Update(blockRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer
+	Update(blockRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte, verified chan struct{}) *diffLayer
 
 	// Journal commits an entire diff hierarchy to disk into a single journal entry.
 	// This is meant to be used during shutdown to persist the snapshot without
@@ -322,14 +331,14 @@ func (t *Tree) Snapshots(root common.Hash, limits int, nodisk bool) []Snapshot {
 	return ret
 }
 
-func (t *Tree) Update(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Address]struct{}, accounts map[common.Address][]byte, storage map[common.Address]map[string][]byte) error {
+func (t *Tree) Update(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Address]struct{}, accounts map[common.Address][]byte, storage map[common.Address]map[string][]byte, verified chan struct{}) error {
 	hashDestructs, hashAccounts, hashStorage := transformSnapData(destructs, accounts, storage)
-	return t.update(blockRoot, parentRoot, hashDestructs, hashAccounts, hashStorage)
+	return t.update(blockRoot, parentRoot, hashDestructs, hashAccounts, hashStorage, verified)
 }
 
 // Update adds a new snapshot into the tree, if that can be linked to an existing
 // old parent. It is disallowed to insert a disk layer (the origin of all).
-func (t *Tree) update(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) error {
+func (t *Tree) update(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte, verified chan struct{}) error {
 	// Reject noop updates to avoid self-loops in the snapshot tree. This is a
 	// special case that can only happen for Clique networks where empty blocks
 	// don't modify the state (0 block subsidy).
@@ -344,7 +353,7 @@ func (t *Tree) update(blockRoot common.Hash, parentRoot common.Hash, destructs m
 	if parent == nil {
 		return fmt.Errorf("parent [%#x] snapshot missing", parentRoot)
 	}
-	snap := parent.(snapshot).Update(blockRoot, destructs, accounts, storage)
+	snap := parent.(snapshot).Update(blockRoot, destructs, accounts, storage, verified)
 
 	// Save the new snapshot for later
 	t.lock.Lock()

+ 12 - 12
core/state/snapshot/snapshot_test.go

@@ -105,7 +105,7 @@ func TestDiskLayerExternalInvalidationFullFlatten(t *testing.T) {
 	accounts := map[common.Hash][]byte{
 		common.HexToHash("0xa1"): randomAccount(),
 	}
-	if err := snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, accounts, nil); err != nil {
+	if err := snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, accounts, nil, nil); err != nil {
 		t.Fatalf("failed to create a diff layer: %v", err)
 	}
 	if n := len(snaps.layers); n != 2 {
@@ -149,10 +149,10 @@ func TestDiskLayerExternalInvalidationPartialFlatten(t *testing.T) {
 	accounts := map[common.Hash][]byte{
 		common.HexToHash("0xa1"): randomAccount(),
 	}
-	if err := snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, accounts, nil); err != nil {
+	if err := snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, accounts, nil, nil); err != nil {
 		t.Fatalf("failed to create a diff layer: %v", err)
 	}
-	if err := snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, accounts, nil); err != nil {
+	if err := snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, accounts, nil, nil); err != nil {
 		t.Fatalf("failed to create a diff layer: %v", err)
 	}
 	if n := len(snaps.layers); n != 3 {
@@ -197,13 +197,13 @@ func TestDiffLayerExternalInvalidationPartialFlatten(t *testing.T) {
 	accounts := map[common.Hash][]byte{
 		common.HexToHash("0xa1"): randomAccount(),
 	}
-	if err := snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, accounts, nil); err != nil {
+	if err := snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, accounts, nil, nil); err != nil {
 		t.Fatalf("failed to create a diff layer: %v", err)
 	}
-	if err := snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, accounts, nil); err != nil {
+	if err := snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, accounts, nil, nil); err != nil {
 		t.Fatalf("failed to create a diff layer: %v", err)
 	}
-	if err := snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, accounts, nil); err != nil {
+	if err := snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, accounts, nil, nil); err != nil {
 		t.Fatalf("failed to create a diff layer: %v", err)
 	}
 	if n := len(snaps.layers); n != 4 {
@@ -257,12 +257,12 @@ func TestPostCapBasicDataAccess(t *testing.T) {
 		},
 	}
 	// The lowest difflayer
-	snaps.update(common.HexToHash("0xa1"), common.HexToHash("0x01"), nil, setAccount("0xa1"), nil)
-	snaps.update(common.HexToHash("0xa2"), common.HexToHash("0xa1"), nil, setAccount("0xa2"), nil)
-	snaps.update(common.HexToHash("0xb2"), common.HexToHash("0xa1"), nil, setAccount("0xb2"), nil)
+	snaps.update(common.HexToHash("0xa1"), common.HexToHash("0x01"), nil, setAccount("0xa1"), nil, nil)
+	snaps.update(common.HexToHash("0xa2"), common.HexToHash("0xa1"), nil, setAccount("0xa2"), nil, nil)
+	snaps.update(common.HexToHash("0xb2"), common.HexToHash("0xa1"), nil, setAccount("0xb2"), nil, nil)
 
-	snaps.update(common.HexToHash("0xa3"), common.HexToHash("0xa2"), nil, setAccount("0xa3"), nil)
-	snaps.update(common.HexToHash("0xb3"), common.HexToHash("0xb2"), nil, setAccount("0xb3"), nil)
+	snaps.update(common.HexToHash("0xa3"), common.HexToHash("0xa2"), nil, setAccount("0xa3"), nil, nil)
+	snaps.update(common.HexToHash("0xb3"), common.HexToHash("0xb2"), nil, setAccount("0xb3"), nil, nil)
 
 	// checkExist verifies if an account exiss in a snapshot
 	checkExist := func(layer *diffLayer, key string) error {
@@ -357,7 +357,7 @@ func TestSnaphots(t *testing.T) {
 	)
 	for i := 0; i < 129; i++ {
 		head = makeRoot(uint64(i + 2))
-		snaps.update(head, last, nil, setAccount(fmt.Sprintf("%d", i+2)), nil)
+		snaps.update(head, last, nil, setAccount(fmt.Sprintf("%d", i+2)), nil, nil)
 		last = head
 		snaps.Cap(head, 128) // 130 layers (128 diffs + 1 accumulator + 1 disk)
 	}

+ 9 - 3
core/state/state_test.go

@@ -54,7 +54,9 @@ func TestDump(t *testing.T) {
 	// write some of them to the trie
 	s.state.updateStateObject(obj1)
 	s.state.updateStateObject(obj2)
-	s.state.Commit(false)
+	s.state.Finalise(false)
+	s.state.AccountsIntermediateRoot()
+	s.state.Commit(nil)
 
 	// check that DumpToCollector contains the state objects that are in trie
 	got := string(s.state.Dump(false, false, true))
@@ -95,7 +97,9 @@ func TestNull(t *testing.T) {
 	var value common.Hash
 
 	s.state.SetState(address, common.Hash{}, value)
-	s.state.Commit(false)
+	s.state.Finalise(false)
+	s.state.AccountsIntermediateRoot()
+	s.state.Commit(nil)
 
 	if value := s.state.GetState(address, common.Hash{}); value != (common.Hash{}) {
 		t.Errorf("expected empty current value, got %x", value)
@@ -167,7 +171,9 @@ func TestSnapshot2(t *testing.T) {
 	so0.deleted = false
 	state.SetStateObject(so0)
 
-	root, _, _ := state.Commit(false)
+	state.Finalise(false)
+	state.AccountsIntermediateRoot()
+	root, _, _ := state.Commit(nil)
 	state, _ = New(root, state.db, state.snaps)
 
 	// and one with deleted == true

+ 194 - 62
core/state/statedb.go

@@ -74,14 +74,20 @@ func (n *proofList) Delete(key []byte) error {
 // * Accounts
 type StateDB struct {
 	db             Database
+	prefetcherLock sync.Mutex
 	prefetcher     *triePrefetcher
 	originalRoot   common.Hash // The pre-state root, before any changes were made
+	expectedRoot   common.Hash // The state root in the block header
+	stateRoot      common.Hash // The calculation result of IntermediateRoot
+
 	trie           Trie
 	hasher         crypto.KeccakState
 	diffLayer      *types.DiffLayer
 	diffTries      map[common.Address]Trie
 	diffCode       map[common.Hash][]byte
 	lightProcessed bool
+	fullProcessed  bool
+	pipeCommit     bool
 
 	snapMux       sync.Mutex
 	snaps         *snapshot.Tree
@@ -154,11 +160,6 @@ func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB,
 		journal:             newJournal(),
 		hasher:              crypto.NewKeccakState(),
 	}
-	tr, err := db.OpenTrie(root)
-	if err != nil {
-		return nil, err
-	}
-	sdb.trie = tr
 	if sdb.snaps != nil {
 		if sdb.snap = sdb.snaps.Snapshot(root); sdb.snap != nil {
 			sdb.snapDestructs = make(map[common.Address]struct{})
@@ -166,6 +167,14 @@ func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB,
 			sdb.snapStorage = make(map[common.Address]map[string][]byte)
 		}
 	}
+
+	snapVerified := sdb.snap != nil && sdb.snap.Verified()
+	tr, err := db.OpenTrie(root)
+	// return error when 1. failed to open trie and 2. the snap is nil or the snap is not nil and done verification
+	if err != nil && (sdb.snap == nil || snapVerified) {
+		return nil, err
+	}
+	sdb.trie = tr
 	return sdb, nil
 }
 
@@ -173,6 +182,8 @@ func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB,
 // state trie concurrently while the state is mutated so that when we reach the
 // commit phase, most of the needed data is already hot.
 func (s *StateDB) StartPrefetcher(namespace string) {
+	s.prefetcherLock.Lock()
+	defer s.prefetcherLock.Unlock()
 	if s.prefetcher != nil {
 		s.prefetcher.close()
 		s.prefetcher = nil
@@ -185,17 +196,36 @@ func (s *StateDB) StartPrefetcher(namespace string) {
 // StopPrefetcher terminates a running prefetcher and reports any leftover stats
 // from the gathered metrics.
 func (s *StateDB) StopPrefetcher() {
+	s.prefetcherLock.Lock()
+	defer s.prefetcherLock.Unlock()
 	if s.prefetcher != nil {
 		s.prefetcher.close()
 		s.prefetcher = nil
 	}
 }
 
+// Mark that the block is processed by diff layer
+func (s *StateDB) SetExpectedStateRoot(root common.Hash) {
+	s.expectedRoot = root
+}
+
 // Mark that the block is processed by diff layer
 func (s *StateDB) MarkLightProcessed() {
 	s.lightProcessed = true
 }
 
+// Enable the pipeline commit function of statedb
+func (s *StateDB) EnablePipeCommit() {
+	if s.snap != nil {
+		s.pipeCommit = true
+	}
+}
+
+// Mark that the block is full processed
+func (s *StateDB) MarkFullProcessed() {
+	s.fullProcessed = true
+}
+
 func (s *StateDB) IsLightProcessed() bool {
 	return s.lightProcessed
 }
@@ -211,8 +241,20 @@ func (s *StateDB) Error() error {
 	return s.dbErr
 }
 
-func (s *StateDB) Trie() Trie {
-	return s.trie
+// Not thread safe
+func (s *StateDB) Trie() (Trie, error) {
+	if s.trie == nil {
+		err := s.WaitPipeVerification()
+		if err != nil {
+			return nil, err
+		}
+		tr, err := s.db.OpenTrie(s.originalRoot)
+		if err != nil {
+			return nil, err
+		}
+		s.trie = tr
+	}
+	return s.trie, nil
 }
 
 func (s *StateDB) SetDiff(diffLayer *types.DiffLayer, diffTries map[common.Address]Trie, diffCode map[common.Hash][]byte) {
@@ -360,6 +402,9 @@ func (s *StateDB) GetProof(addr common.Address) ([][]byte, error) {
 // GetProofByHash returns the Merkle proof for a given account.
 func (s *StateDB) GetProofByHash(addrHash common.Hash) ([][]byte, error) {
 	var proof proofList
+	if _, err := s.Trie(); err != nil {
+		return nil, err
+	}
 	err := s.trie.Prove(addrHash[:], 0, &proof)
 	return proof, err
 }
@@ -904,6 +949,17 @@ func (s *StateDB) GetRefund() uint64 {
 	return s.refund
 }
 
+// GetRefund returns the current value of the refund counter.
+func (s *StateDB) WaitPipeVerification() error {
+	// We need wait for the parent trie to commit
+	if s.snap != nil {
+		if valid := s.snap.WaitAndGetVerifyRes(); !valid {
+			return fmt.Errorf("verification on parent snap failed")
+		}
+	}
+	return nil
+}
+
 // Finalise finalises the state by removing the s destructed objects and clears
 // the journal as well as the refunds. Finalise, however, will not push any updates
 // into the tries just yet. Only IntermediateRoot or Commit will do that.
@@ -963,22 +1019,11 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
 	}
 	// Finalise all the dirty storage states and write them into the tries
 	s.Finalise(deleteEmptyObjects)
+	s.AccountsIntermediateRoot()
+	return s.StateIntermediateRoot()
+}
 
-	// If there was a trie prefetcher operating, it gets aborted and irrevocably
-	// modified after we start retrieving tries. Remove it from the statedb after
-	// this round of use.
-	//
-	// This is weird pre-byzantium since the first tx runs with a prefetcher and
-	// the remainder without, but pre-byzantium even the initial prefetcher is
-	// useless, so no sleep lost.
-	prefetcher := s.prefetcher
-	if s.prefetcher != nil {
-		defer func() {
-			s.prefetcher.close()
-			s.prefetcher = nil
-		}()
-	}
-
+func (s *StateDB) AccountsIntermediateRoot() {
 	tasks := make(chan func())
 	finishCh := make(chan struct{})
 	defer close(finishCh)
@@ -995,6 +1040,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
 			}
 		}()
 	}
+
 	// Although naively it makes sense to retrieve the account trie and then do
 	// the contract storage and account updates sequentially, that short circuits
 	// the account prefetcher. Instead, let's process all the storage updates
@@ -1026,6 +1072,27 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
 		}
 	}
 	wg.Wait()
+}
+
+func (s *StateDB) StateIntermediateRoot() common.Hash {
+	// If there was a trie prefetcher operating, it gets aborted and irrevocably
+	// modified after we start retrieving tries. Remove it from the statedb after
+	// this round of use.
+	//
+	// This is weird pre-byzantium since the first tx runs with a prefetcher and
+	// the remainder without, but pre-byzantium even the initial prefetcher is
+	// useless, so no sleep lost.
+	prefetcher := s.prefetcher
+	defer func() {
+		s.prefetcherLock.Lock()
+		if s.prefetcher != nil {
+			s.prefetcher.close()
+			s.prefetcher = nil
+		}
+		// try not use defer inside defer
+		s.prefetcherLock.Unlock()
+	}()
+
 	// Now we're about to start to write changes to the trie. The trie is so far
 	// _untouched_. We can check with the prefetcher, if it can give us a trie
 	// which has the same root, but also has some content loaded into it.
@@ -1037,7 +1104,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
 	if s.trie == nil {
 		tr, err := s.db.OpenTrie(s.originalRoot)
 		if err != nil {
-			panic("Failed to open trie tree")
+			panic(fmt.Sprintf("Failed to open trie tree %s", s.originalRoot))
 		}
 		s.trie = tr
 	}
@@ -1081,9 +1148,12 @@ func (s *StateDB) clearJournalAndRefund() {
 	s.validRevisions = s.validRevisions[:0] // Snapshots can be created without journal entires
 }
 
-func (s *StateDB) LightCommit(root common.Hash) (common.Hash, *types.DiffLayer, error) {
+func (s *StateDB) LightCommit() (common.Hash, *types.DiffLayer, error) {
 	codeWriter := s.db.TrieDB().DiskDB().NewBatch()
 
+	// light process already verified it, expectedRoot is trustworthy.
+	root := s.expectedRoot
+
 	commitFuncs := []func() error{
 		func() error {
 			for codeHash, code := range s.diffCode {
@@ -1171,7 +1241,8 @@ func (s *StateDB) LightCommit(root common.Hash) (common.Hash, *types.DiffLayer,
 				}
 				// Only update if there's a state transition (skip empty Clique blocks)
 				if parent := s.snap.Root(); parent != root {
-					if err := s.snaps.Update(root, parent, s.snapDestructs, s.snapAccounts, s.snapStorage); err != nil {
+					// for light commit, always do sync commit
+					if err := s.snaps.Update(root, parent, s.snapDestructs, s.snapAccounts, s.snapStorage, nil); err != nil {
 						log.Warn("Failed to update snapshot tree", "from", parent, "to", root, "err", err)
 					}
 					// Keep n diff layers in the memory
@@ -1205,23 +1276,42 @@ func (s *StateDB) LightCommit(root common.Hash) (common.Hash, *types.DiffLayer,
 }
 
 // Commit writes the state to the underlying in-memory trie database.
-func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer, error) {
+func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() error) (common.Hash, *types.DiffLayer, error) {
 	if s.dbErr != nil {
 		return common.Hash{}, nil, fmt.Errorf("commit aborted due to earlier error: %v", s.dbErr)
 	}
 	// Finalize any pending changes and merge everything into the tries
-	root := s.IntermediateRoot(deleteEmptyObjects)
 	if s.lightProcessed {
-		return s.LightCommit(root)
+		root, diff, err := s.LightCommit()
+		if err != nil {
+			return root, diff, err
+		}
+		for _, postFunc := range postCommitFuncs {
+			err = postFunc()
+			if err != nil {
+				return root, diff, err
+			}
+		}
+		return root, diff, nil
 	}
 	var diffLayer *types.DiffLayer
+	var verified chan struct{}
+	var snapUpdated chan struct{}
 	if s.snap != nil {
 		diffLayer = &types.DiffLayer{}
 	}
-	commitFuncs := []func() error{
-		func() error {
-			// Commit objects to the trie, measuring the elapsed time
-			tasks := make(chan func(batch ethdb.KeyValueWriter))
+	if s.pipeCommit {
+		// async commit the MPT
+		verified = make(chan struct{})
+		snapUpdated = make(chan struct{})
+	}
+
+	commmitTrie := func() error {
+		commitErr := func() error {
+			if s.stateRoot = s.StateIntermediateRoot(); s.fullProcessed && s.expectedRoot != s.stateRoot {
+				return fmt.Errorf("invalid merkle root (remote: %x local: %x)", s.expectedRoot, s.stateRoot)
+			}
+			tasks := make(chan func())
 			taskResults := make(chan error, len(s.stateObjectsDirty))
 			tasksNum := 0
 			finishCh := make(chan struct{})
@@ -1232,17 +1322,11 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer
 				wg.Add(1)
 				go func() {
 					defer wg.Done()
-					codeWriter := s.db.TrieDB().DiskDB().NewBatch()
 					for {
 						select {
 						case task := <-tasks:
-							task(codeWriter)
+							task()
 						case <-finishCh:
-							if codeWriter.ValueSize() > 0 {
-								if err := codeWriter.Write(); err != nil {
-									log.Crit("Failed to commit dirty codes", "error", err)
-								}
-							}
 							return
 						}
 					}
@@ -1265,11 +1349,7 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer
 			for addr := range s.stateObjectsDirty {
 				if obj := s.stateObjects[addr]; !obj.deleted {
 					// Write any contract code associated with the state object
-					tasks <- func(codeWriter ethdb.KeyValueWriter) {
-						if obj.code != nil && obj.dirtyCode {
-							rawdb.WriteCode(codeWriter, common.BytesToHash(obj.CodeHash()), obj.code)
-							obj.dirtyCode = false
-						}
+					tasks <- func() {
 						// Write any storage changes in the state object to its storage trie
 						if err := obj.CommitTrie(s.db); err != nil {
 							taskResults <- err
@@ -1289,14 +1369,6 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer
 			}
 			close(finishCh)
 
-			if len(s.stateObjectsDirty) > 0 {
-				s.stateObjectsDirty = make(map[common.Address]struct{}, len(s.stateObjectsDirty)/2)
-			}
-			// Write the account trie changes, measuing the amount of wasted time
-			var start time.Time
-			if metrics.EnabledExpensive {
-				start = time.Now()
-			}
 			// The onleaf func is called _serially_, so we can reuse the same account
 			// for unmarshalling every time.
 			var account Account
@@ -1312,14 +1384,60 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer
 			if err != nil {
 				return err
 			}
-			if metrics.EnabledExpensive {
-				s.AccountCommits += time.Since(start)
-			}
 			if root != emptyRoot {
 				s.db.CacheAccount(root, s.trie)
 			}
+			for _, postFunc := range postCommitFuncs {
+				err = postFunc()
+				if err != nil {
+					return err
+				}
+			}
 			wg.Wait()
 			return nil
+		}()
+
+		if s.pipeCommit {
+			if commitErr == nil {
+				<-snapUpdated
+				s.snaps.Snapshot(s.stateRoot).MarkValid()
+			} else {
+				// The blockchain will do the further rewind if write block not finish yet
+				if failPostCommitFunc != nil {
+					<-snapUpdated
+					failPostCommitFunc()
+				}
+				log.Error("state verification failed", "err", commitErr)
+			}
+			close(verified)
+		}
+		return commitErr
+	}
+
+	commitFuncs := []func() error{
+		func() error {
+			codeWriter := s.db.TrieDB().DiskDB().NewBatch()
+			for addr := range s.stateObjectsDirty {
+				if obj := s.stateObjects[addr]; !obj.deleted {
+					if obj.code != nil && obj.dirtyCode {
+						rawdb.WriteCode(codeWriter, common.BytesToHash(obj.CodeHash()), obj.code)
+						obj.dirtyCode = false
+						if codeWriter.ValueSize() > ethdb.IdealBatchSize {
+							if err := codeWriter.Write(); err != nil {
+								return err
+							}
+							codeWriter.Reset()
+						}
+					}
+				}
+			}
+			if codeWriter.ValueSize() > 0 {
+				if err := codeWriter.Write(); err != nil {
+					log.Crit("Failed to commit dirty codes", "error", err)
+					return err
+				}
+			}
+			return nil
 		},
 		func() error {
 			// If snapshotting is enabled, update the snapshot tree with this new version
@@ -1327,18 +1445,23 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer
 				if metrics.EnabledExpensive {
 					defer func(start time.Time) { s.SnapshotCommits += time.Since(start) }(time.Now())
 				}
+				if s.pipeCommit {
+					defer close(snapUpdated)
+				}
 				// Only update if there's a state transition (skip empty Clique blocks)
-				if parent := s.snap.Root(); parent != root {
-					if err := s.snaps.Update(root, parent, s.snapDestructs, s.snapAccounts, s.snapStorage); err != nil {
-						log.Warn("Failed to update snapshot tree", "from", parent, "to", root, "err", err)
+				if parent := s.snap.Root(); parent != s.expectedRoot {
+					if err := s.snaps.Update(s.expectedRoot, parent, s.snapDestructs, s.snapAccounts, s.snapStorage, verified); err != nil {
+						log.Warn("Failed to update snapshot tree", "from", parent, "to", s.expectedRoot, "err", err)
 					}
 					// Keep n diff layers in the memory
 					// - head layer is paired with HEAD state
 					// - head-1 layer is paired with HEAD-1 state
 					// - head-(n-1) layer(bottom-most diff layer) is paired with HEAD-(n-1)state
-					if err := s.snaps.Cap(root, s.snaps.CapLimit()); err != nil {
-						log.Warn("Failed to cap snapshot tree", "root", root, "layers", s.snaps.CapLimit(), "err", err)
-					}
+					go func() {
+						if err := s.snaps.Cap(s.expectedRoot, s.snaps.CapLimit()); err != nil {
+							log.Warn("Failed to cap snapshot tree", "root", s.expectedRoot, "layers", s.snaps.CapLimit(), "err", err)
+						}
+					}()
 				}
 			}
 			return nil
@@ -1350,6 +1473,11 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer
 			return nil
 		},
 	}
+	if s.pipeCommit {
+		go commmitTrie()
+	} else {
+		commitFuncs = append(commitFuncs, commmitTrie)
+	}
 	commitRes := make(chan error, len(commitFuncs))
 	for _, f := range commitFuncs {
 		tmpFunc := f
@@ -1363,7 +1491,11 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer
 			return common.Hash{}, nil, r
 		}
 	}
-	s.snap, s.snapDestructs, s.snapAccounts, s.snapStorage = nil, nil, nil, nil
+	root := s.stateRoot
+	if s.pipeCommit {
+		root = s.expectedRoot
+	}
+
 	return root, diffLayer, nil
 }
 

+ 26 - 9
core/state/statedb_test.go

@@ -102,7 +102,9 @@ func TestIntermediateLeaks(t *testing.T) {
 	}
 
 	// Commit and cross check the databases.
-	transRoot, _, err := transState.Commit(false)
+	transState.Finalise(false)
+	transState.AccountsIntermediateRoot()
+	transRoot, _, err := transState.Commit(nil)
 	if err != nil {
 		t.Fatalf("failed to commit transition state: %v", err)
 	}
@@ -110,7 +112,9 @@ func TestIntermediateLeaks(t *testing.T) {
 		t.Errorf("can not commit trie %v to persistent database", transRoot.Hex())
 	}
 
-	finalRoot, _, err := finalState.Commit(false)
+	finalState.Finalise(false)
+	finalState.AccountsIntermediateRoot()
+	finalRoot, _, err := finalState.Commit(nil)
 	if err != nil {
 		t.Fatalf("failed to commit final state: %v", err)
 	}
@@ -473,7 +477,7 @@ func (test *snapshotTest) checkEqual(state, checkstate *StateDB) error {
 func TestTouchDelete(t *testing.T) {
 	s := newStateTest()
 	s.state.GetOrNewStateObject(common.Address{})
-	root, _, _ := s.state.Commit(false)
+	root, _, _ := s.state.Commit(nil)
 	s.state, _ = New(root, s.state.db, s.state.snaps)
 
 	snapshot := s.state.Snapshot()
@@ -546,7 +550,9 @@ func TestCopyCommitCopy(t *testing.T) {
 		t.Fatalf("first copy pre-commit committed storage slot mismatch: have %x, want %x", val, common.Hash{})
 	}
 
-	copyOne.Commit(false)
+	copyOne.Finalise(false)
+	copyOne.AccountsIntermediateRoot()
+	copyOne.Commit(nil)
 	if balance := copyOne.GetBalance(addr); balance.Cmp(big.NewInt(42)) != 0 {
 		t.Fatalf("first copy post-commit balance mismatch: have %v, want %v", balance, 42)
 	}
@@ -631,7 +637,10 @@ func TestCopyCopyCommitCopy(t *testing.T) {
 	if val := copyTwo.GetCommittedState(addr, skey); val != (common.Hash{}) {
 		t.Fatalf("second copy pre-commit committed storage slot mismatch: have %x, want %x", val, common.Hash{})
 	}
-	copyTwo.Commit(false)
+
+	copyTwo.Finalise(false)
+	copyTwo.AccountsIntermediateRoot()
+	copyTwo.Commit(nil)
 	if balance := copyTwo.GetBalance(addr); balance.Cmp(big.NewInt(42)) != 0 {
 		t.Fatalf("second copy post-commit balance mismatch: have %v, want %v", balance, 42)
 	}
@@ -675,7 +684,9 @@ func TestDeleteCreateRevert(t *testing.T) {
 	addr := common.BytesToAddress([]byte("so"))
 	state.SetBalance(addr, big.NewInt(1))
 
-	root, _, _ := state.Commit(false)
+	state.Finalise(false)
+	state.AccountsIntermediateRoot()
+	root, _, _ := state.Commit(nil)
 	state, _ = New(root, state.db, state.snaps)
 
 	// Simulate self-destructing in one transaction, then create-reverting in another
@@ -686,8 +697,10 @@ func TestDeleteCreateRevert(t *testing.T) {
 	state.SetBalance(addr, big.NewInt(2))
 	state.RevertToSnapshot(id)
 
+	state.Finalise(true)
+	state.AccountsIntermediateRoot()
 	// Commit the entire state and make sure we don't crash and have the correct state
-	root, _, _ = state.Commit(true)
+	root, _, _ = state.Commit(nil)
 	state, _ = New(root, state.db, state.snaps)
 
 	if state.getStateObject(addr) != nil {
@@ -712,7 +725,9 @@ func TestMissingTrieNodes(t *testing.T) {
 		a2 := common.BytesToAddress([]byte("another"))
 		state.SetBalance(a2, big.NewInt(100))
 		state.SetCode(a2, []byte{1, 2, 4})
-		root, _, _ = state.Commit(false)
+		state.Finalise(false)
+		state.AccountsIntermediateRoot()
+		root, _, _ = state.Commit(nil)
 		t.Logf("root: %x", root)
 		// force-flush
 		state.Database().TrieDB().Cap(0)
@@ -736,7 +751,9 @@ func TestMissingTrieNodes(t *testing.T) {
 	}
 	// Modify the state
 	state.SetBalance(addr, big.NewInt(2))
-	root, _, err := state.Commit(false)
+	state.Finalise(false)
+	state.AccountsIntermediateRoot()
+	root, _, err := state.Commit(nil)
 	if err == nil {
 		t.Fatalf("expected error, got root :%x", root)
 	}

+ 3 - 1
core/state/sync_test.go

@@ -69,7 +69,9 @@ func makeTestState() (Database, common.Hash, []*testAccount) {
 		state.updateStateObject(obj)
 		accounts = append(accounts, acc)
 	}
-	root, _, _ := state.Commit(false)
+	state.Finalise(false)
+	state.AccountsIntermediateRoot()
+	root, _, _ := state.Commit(nil)
 
 	// Return the generated state
 	return db, root, accounts

+ 15 - 7
core/state/trie_prefetcher.go

@@ -20,7 +20,6 @@ import (
 	"sync"
 
 	"github.com/ethereum/go-ethereum/common"
-	"github.com/ethereum/go-ethereum/common/gopool"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/metrics"
 )
@@ -106,7 +105,7 @@ func (p *triePrefetcher) close() {
 	for _, fetcher := range p.fetchers {
 		p.abortChan <- fetcher // safe to do multiple times
 		<-fetcher.term
-		if metrics.Enabled {
+		if metrics.EnabledExpensive {
 			if fetcher.root == p.root {
 				p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
 				p.accountDupMeter.Mark(int64(fetcher.dups))
@@ -257,9 +256,7 @@ func newSubfetcher(db Database, root common.Hash, accountHash common.Hash) *subf
 		seen:        make(map[string]struct{}),
 		accountHash: accountHash,
 	}
-	gopool.Submit(func() {
-		sf.loop()
-	})
+	go sf.loop()
 	return sf
 }
 
@@ -322,8 +319,7 @@ func (sf *subfetcher) loop() {
 		trie, err = sf.db.OpenStorageTrie(sf.accountHash, sf.root)
 	}
 	if err != nil {
-		log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err)
-		return
+		log.Debug("Trie prefetcher failed opening trie", "root", sf.root, "err", err)
 	}
 	sf.trie = trie
 
@@ -332,6 +328,18 @@ func (sf *subfetcher) loop() {
 		select {
 		case <-sf.wake:
 			// Subfetcher was woken up, retrieve any tasks to avoid spinning the lock
+			if sf.trie == nil {
+				if sf.accountHash == emptyAddr {
+					sf.trie, err = sf.db.OpenTrie(sf.root)
+				} else {
+					// address is useless
+					sf.trie, err = sf.db.OpenStorageTrie(sf.accountHash, sf.root)
+				}
+				if err != nil {
+					continue
+				}
+			}
+
 			sf.lock.Lock()
 			tasks := sf.tasks
 			sf.tasks = nil

+ 17 - 22
core/state_prefetcher.go

@@ -17,7 +17,6 @@
 package core
 
 import (
-	"runtime"
 	"sync/atomic"
 
 	"github.com/ethereum/go-ethereum/consensus"
@@ -27,6 +26,8 @@ import (
 	"github.com/ethereum/go-ethereum/params"
 )
 
+const prefetchThread = 2
+
 // statePrefetcher is a basic Prefetcher, which blindly executes a block on top
 // of an arbitrary state with the goal of prefetching potentially useful state
 // data from disk before the main block processor start executing.
@@ -54,25 +55,23 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c
 		signer = types.MakeSigner(p.config, header.Number)
 	)
 	transactions := block.Transactions()
-	threads := runtime.NumCPU()
-	batch := len(transactions) / (threads + 1)
-	if batch == 0 {
-		return
+	sortTransactions := make([][]*types.Transaction, prefetchThread)
+	for i := 0; i < prefetchThread; i++ {
+		sortTransactions[i] = make([]*types.Transaction, 0, len(transactions)/prefetchThread)
+	}
+	for idx := range transactions {
+		threadIdx := idx % prefetchThread
+		sortTransactions[threadIdx] = append(sortTransactions[threadIdx], transactions[idx])
 	}
 	// No need to execute the first batch, since the main processor will do it.
-	for i := 1; i <= threads; i++ {
-		start := i * batch
-		end := (i + 1) * batch
-		if i == threads {
-			end = len(transactions)
-		}
-		go func(start, end int) {
+	for i := 0; i < prefetchThread; i++ {
+		go func(idx int) {
 			newStatedb := statedb.Copy()
 			gaspool := new(GasPool).AddGas(block.GasLimit())
 			blockContext := NewEVMBlockContext(header, p.bc, nil)
 			evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
 			// Iterate over and process the individual transactions
-			for i, tx := range transactions[start:end] {
+			for i, tx := range sortTransactions[idx] {
 				// If block precaching was interrupted, abort
 				if interrupt != nil && atomic.LoadUint32(interrupt) == 1 {
 					return
@@ -82,23 +81,19 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c
 				if err != nil {
 					return // Also invalid block, bail out
 				}
-				newStatedb.Prepare(tx.Hash(), block.Hash(), i)
-				if err := precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm); err != nil {
-					return // Ugh, something went horribly wrong, bail out
-				}
+				newStatedb.Prepare(tx.Hash(), header.Hash(), i)
+				precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm)
 			}
-		}(start, end)
+		}(i)
 	}
-
 }
 
 // precacheTransaction attempts to apply a transaction to the given state database
 // and uses the input parameters for its environment. The goal is not to execute
 // the transaction successfully, rather to warm up touched data slots.
-func precacheTransaction(msg types.Message, config *params.ChainConfig, gaspool *GasPool, statedb *state.StateDB, header *types.Header, evm *vm.EVM) error {
+func precacheTransaction(msg types.Message, config *params.ChainConfig, gaspool *GasPool, statedb *state.StateDB, header *types.Header, evm *vm.EVM) {
 	// Update the evm with the new transaction context.
 	evm.Reset(NewEVMTxContext(msg), statedb)
 	// Add addresses to access list if applicable
-	_, err := ApplyMessage(evm, msg, gaspool)
-	return err
+	ApplyMessage(evm, msg, gaspool)
 }

+ 14 - 5
core/state_processor.go

@@ -123,6 +123,10 @@ func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB
 			statedb.StopPrefetcher()
 			parent := p.bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
 			statedb, err = state.New(parent.Root, p.bc.stateCache, p.bc.snaps)
+			statedb.SetExpectedStateRoot(block.Root())
+			if p.bc.pipeCommit {
+				statedb.EnablePipeCommit()
+			}
 			if err != nil {
 				return statedb, nil, nil, 0, err
 			}
@@ -148,9 +152,12 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty
 	for _, c := range diffLayer.Codes {
 		fullDiffCode[c.Hash] = c.Code
 	}
-
+	stateTrie, err := statedb.Trie()
+	if err != nil {
+		return nil, nil, 0, err
+	}
 	for des := range snapDestructs {
-		statedb.Trie().TryDelete(des[:])
+		stateTrie.TryDelete(des[:])
 	}
 	threads := gopool.Threads(len(snapAccounts))
 
@@ -191,7 +198,7 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty
 				// fetch previous state
 				var previousAccount state.Account
 				stateMux.Lock()
-				enc, err := statedb.Trie().TryGet(diffAccount[:])
+				enc, err := stateTrie.TryGet(diffAccount[:])
 				stateMux.Unlock()
 				if err != nil {
 					errChan <- err
@@ -303,7 +310,7 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty
 					return
 				}
 				stateMux.Lock()
-				err = statedb.Trie().TryUpdate(diffAccount[:], bz)
+				err = stateTrie.TryUpdate(diffAccount[:], bz)
 				stateMux.Unlock()
 				if err != nil {
 					errChan <- err
@@ -330,7 +337,7 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty
 	}
 
 	// Do validate in advance so that we can fall back to full process
-	if err := p.bc.validator.ValidateState(block, statedb, diffLayer.Receipts, gasUsed); err != nil {
+	if err := p.bc.validator.ValidateState(block, statedb, diffLayer.Receipts, gasUsed, false); err != nil {
 		log.Error("validate state failed during diff sync", "error", err)
 		return nil, nil, 0, err
 	}
@@ -378,6 +385,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
 		gp      = new(GasPool).AddGas(block.GasLimit())
 	)
 	signer := types.MakeSigner(p.bc.chainConfig, block.Number())
+	statedb.TryPreload(block, signer)
 	var receipts = make([]*types.Receipt, 0)
 	// Mutate the block and state according to any hard-fork specs
 	if p.config.DAOForkSupport && p.config.DAOForkBlock != nil && p.config.DAOForkBlock.Cmp(block.Number()) == 0 {
@@ -396,6 +404,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
 
 	// initilise bloom processors
 	bloomProcessors := NewAsyncReceiptBloomGenerator(txNum)
+	statedb.MarkFullProcessed()
 
 	// usually do have two tx, one for validator set contract, another for system reward contract.
 	systemTxs := make([]*types.Transaction, 0, 2)

+ 1 - 1
core/types.go

@@ -31,7 +31,7 @@ type Validator interface {
 
 	// ValidateState validates the given statedb and optionally the receipts and
 	// gas used.
-	ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64) error
+	ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64, skipHeavyVerify bool) error
 }
 
 // Prefetcher is an interface for pre-caching transaction signatures and state.

+ 4 - 2
eth/api_test.go

@@ -77,7 +77,9 @@ func TestAccountRange(t *testing.T) {
 			m[addr] = true
 		}
 	}
-	state.Commit(true)
+	state.Finalise(true)
+	state.AccountsIntermediateRoot()
+	state.Commit(nil)
 	root := state.IntermediateRoot(true)
 
 	trie, err := statedb.OpenTrie(root)
@@ -134,7 +136,7 @@ func TestEmptyAccountRange(t *testing.T) {
 		statedb  = state.NewDatabase(rawdb.NewMemoryDatabase())
 		state, _ = state.New(common.Hash{}, statedb, nil)
 	)
-	state.Commit(true)
+	state.Commit(nil)
 	state.IntermediateRoot(true)
 	results := state.IteratorDump(true, true, true, (common.Hash{}).Bytes(), AccountRangeMaxResults)
 	if bytes.Equal(results.Next, (common.Hash{}).Bytes()) {

+ 3 - 0
eth/backend.go

@@ -203,6 +203,9 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
 	if config.DiffSync {
 		bcOps = append(bcOps, core.EnableLightProcessor)
 	}
+	if config.PipeCommit {
+		bcOps = append(bcOps, core.EnablePipelineCommit)
+	}
 	if config.PersistDiff {
 		bcOps = append(bcOps, core.EnablePersistDiff(config.DiffBlock))
 	}

+ 1 - 0
eth/ethconfig/config.go

@@ -137,6 +137,7 @@ type Config struct {
 	DirectBroadcast     bool
 	DisableSnapProtocol bool //Whether disable snap protocol
 	DiffSync            bool // Whether support diff sync
+	PipeCommit          bool
 	RangeLimit          bool
 
 	TxLookupLimit uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved.

+ 3 - 1
eth/state_accessor.go

@@ -138,7 +138,9 @@ func (eth *Ethereum) stateAtBlock(block *types.Block, reexec uint64, base *state
 			return nil, fmt.Errorf("processing block %d failed: %v", current.NumberU64(), err)
 		}
 		// Finalize the state so any modifications are written to the trie
-		root, _, err := statedb.Commit(eth.blockchain.Config().IsEIP158(current.Number()))
+		statedb.Finalise(eth.blockchain.Config().IsEIP158(current.Number()))
+		statedb.AccountsIntermediateRoot()
+		root, _, err := statedb.Commit(nil)
 		if err != nil {
 			return nil, fmt.Errorf("stateAtBlock commit failed, number %d root %v: %w",
 				current.NumberU64(), current.Root().Hex(), err)

+ 3 - 1
eth/tracers/api.go

@@ -556,7 +556,9 @@ func (api *API) IntermediateRoots(ctx context.Context, hash common.Hash, config
 		}
 		// calling IntermediateRoot will internally call Finalize on the state
 		// so any modifications are written to the trie
-		roots = append(roots, statedb.IntermediateRoot(deleteEmptyObjects))
+		root := statedb.IntermediateRoot(deleteEmptyObjects)
+
+		roots = append(roots, root)
 	}
 	return roots, nil
 }

+ 1 - 0
ethclient/ethclient_test.go

@@ -271,6 +271,7 @@ func newTestBackend(t *testing.T) (*node.Node, []*types.Block) {
 	config := &ethconfig.Config{Genesis: genesis}
 	config.Ethash.PowMode = ethash.ModeFake
 	config.SnapshotCache = 256
+	config.TriesInMemory = 128
 	ethservice, err := eth.New(n, config)
 	if err != nil {
 		t.Fatalf("can't create new ethereum service: %v", err)

+ 5 - 0
miner/worker.go

@@ -634,6 +634,7 @@ func (w *worker) resultLoop() {
 				logs = append(logs, receipt.Logs...)
 			}
 			// Commit block and state to database.
+			task.state.SetExpectedStateRoot(block.Root())
 			_, err := w.chain.WriteBlockWithState(block, receipts, logs, task.state, true)
 			if err != nil {
 				log.Error("Failed writing block to chain", "err", err)
@@ -994,6 +995,10 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64)
 // and commits new work if consensus engine is running.
 func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error {
 	s := w.current.state
+	err := s.WaitPipeVerification()
+	if err != nil {
+		return err
+	}
 	block, receipts, err := w.engine.FinalizeAndAssemble(w.chain, types.CopyHeader(w.current.header), s, w.current.txs, uncles, w.current.receipts)
 	if err != nil {
 		return err

+ 0 - 1
params/protocol_params.go

@@ -114,7 +114,6 @@ const (
 
 	// Precompiled contract gas prices
 
-	//TODO need further discussion
 	TendermintHeaderValidateGas uint64 = 3000 // Gas for validate tendermiint consensus state
 	IAVLMerkleProofValidateGas  uint64 = 3000 // Gas for validate merkle proof
 

+ 6 - 2
tests/state_test_util.go

@@ -198,7 +198,9 @@ func (t *StateTest) RunNoVerify(subtest StateSubtest, vmconfig vm.Config, snapsh
 	}
 
 	// Commit block
-	statedb.Commit(config.IsEIP158(block.Number()))
+	statedb.Finalise(config.IsEIP158(block.Number()))
+	statedb.AccountsIntermediateRoot()
+	statedb.Commit(nil)
 	// Add 0-value mining reward. This only makes a difference in the cases
 	// where
 	// - the coinbase suicided, or
@@ -226,7 +228,9 @@ func MakePreState(db ethdb.Database, accounts core.GenesisAlloc, snapshotter boo
 		}
 	}
 	// Commit and re-open to start with a clean state.
-	root, _, _ := statedb.Commit(false)
+	statedb.Finalise(false)
+	statedb.AccountsIntermediateRoot()
+	root, _, _ := statedb.Commit(nil)
 
 	var snaps *snapshot.Tree
 	if snapshotter {

+ 37 - 21
trie/database.go

@@ -605,14 +605,16 @@ func (db *Database) Cap(limit common.StorageSize) error {
 	// outside code doesn't see an inconsistent state (referenced data removed from
 	// memory cache during commit but not yet in persistent storage). This is ensured
 	// by only uncaching existing data when the database write finalizes.
+	db.lock.RLock()
 	nodes, storage, start := len(db.dirties), db.dirtiesSize, time.Now()
-	batch := db.diskdb.NewBatch()
-
 	// db.dirtiesSize only contains the useful data in the cache, but when reporting
 	// the total memory consumption, the maintenance metadata is also needed to be
 	// counted.
 	size := db.dirtiesSize + common.StorageSize((len(db.dirties)-1)*cachedNodeSize)
 	size += db.childrenSize - common.StorageSize(len(db.dirties[common.Hash{}].children)*(common.HashLength+2))
+	db.lock.RUnlock()
+
+	batch := db.diskdb.NewBatch()
 
 	// If the preimage cache got large enough, push to disk. If it's still small
 	// leave for later to deduplicate writes.
@@ -632,27 +634,35 @@ func (db *Database) Cap(limit common.StorageSize) error {
 	}
 	// Keep committing nodes from the flush-list until we're below allowance
 	oldest := db.oldest
-	for size > limit && oldest != (common.Hash{}) {
-		// Fetch the oldest referenced node and push into the batch
-		node := db.dirties[oldest]
-		rawdb.WriteTrieNode(batch, oldest, node.rlp())
-
-		// If we exceeded the ideal batch size, commit and reset
-		if batch.ValueSize() >= ethdb.IdealBatchSize {
-			if err := batch.Write(); err != nil {
-				log.Error("Failed to write flush list to disk", "err", err)
-				return err
+	err := func() error {
+		db.lock.RLock()
+		defer db.lock.RUnlock()
+		for size > limit && oldest != (common.Hash{}) {
+			// Fetch the oldest referenced node and push into the batch
+			node := db.dirties[oldest]
+			rawdb.WriteTrieNode(batch, oldest, node.rlp())
+
+			// If we exceeded the ideal batch size, commit and reset
+			if batch.ValueSize() >= ethdb.IdealBatchSize {
+				if err := batch.Write(); err != nil {
+					log.Error("Failed to write flush list to disk", "err", err)
+					return err
+				}
+				batch.Reset()
 			}
-			batch.Reset()
-		}
-		// Iterate to the next flush item, or abort if the size cap was achieved. Size
-		// is the total size, including the useful cached data (hash -> blob), the
-		// cache item metadata, as well as external children mappings.
-		size -= common.StorageSize(common.HashLength + int(node.size) + cachedNodeSize)
-		if node.children != nil {
-			size -= common.StorageSize(cachedNodeChildrenSize + len(node.children)*(common.HashLength+2))
+			// Iterate to the next flush item, or abort if the size cap was achieved. Size
+			// is the total size, including the useful cached data (hash -> blob), the
+			// cache item metadata, as well as external children mappings.
+			size -= common.StorageSize(common.HashLength + int(node.size) + cachedNodeSize)
+			if node.children != nil {
+				size -= common.StorageSize(cachedNodeChildrenSize + len(node.children)*(common.HashLength+2))
+			}
+			oldest = node.flushNext
 		}
-		oldest = node.flushNext
+		return nil
+	}()
+	if err != nil {
+		return err
 	}
 	// Flush out any remainder data from the last batch
 	if err := batch.Write(); err != nil {
@@ -722,7 +732,9 @@ func (db *Database) Commit(node common.Hash, report bool, callback func(common.H
 		batch.Reset()
 	}
 	// Move the trie itself into the batch, flushing if enough data is accumulated
+	db.lock.RLock()
 	nodes, storage := len(db.dirties), db.dirtiesSize
+	db.lock.RUnlock()
 
 	uncacher := &cleaner{db}
 	if err := db.commit(node, batch, uncacher, callback); err != nil {
@@ -766,10 +778,14 @@ func (db *Database) Commit(node common.Hash, report bool, callback func(common.H
 // commit is the private locked version of Commit.
 func (db *Database) commit(hash common.Hash, batch ethdb.Batch, uncacher *cleaner, callback func(common.Hash)) error {
 	// If the node does not exist, it's a previously committed node
+	db.lock.RLock()
 	node, ok := db.dirties[hash]
 	if !ok {
+		db.lock.RUnlock()
 		return nil
 	}
+	db.lock.RUnlock()
+
 	var err error
 	node.forChilds(func(child common.Hash) {
 		if err == nil {