| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497 |
- // Copyright 2015 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package core
- import (
- "bytes"
- "errors"
- "fmt"
- "math/big"
- "math/rand"
- "sync"
- "time"
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/common/gopool"
- "github.com/ethereum/go-ethereum/consensus"
- "github.com/ethereum/go-ethereum/consensus/misc"
- "github.com/ethereum/go-ethereum/core/rawdb"
- "github.com/ethereum/go-ethereum/core/state"
- "github.com/ethereum/go-ethereum/core/state/snapshot"
- "github.com/ethereum/go-ethereum/core/systemcontracts"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/core/vm"
- "github.com/ethereum/go-ethereum/crypto"
- "github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/params"
- "github.com/ethereum/go-ethereum/rlp"
- )
- const (
- fullProcessCheck = 21 // On diff sync mode, will do full process every fullProcessCheck randomly
- minNumberOfAccountPerTask = 5
- recentTime = 2048 * 3
- recentDiffLayerTimeout = 20
- farDiffLayerTimeout = 2
- )
- // StateProcessor is a basic Processor, which takes care of transitioning
- // state from one point to another.
- //
- // StateProcessor implements Processor.
- type StateProcessor struct {
- config *params.ChainConfig // Chain configuration options
- bc *BlockChain // Canonical block chain
- engine consensus.Engine // Consensus engine used for block rewards
- }
- // NewStateProcessor initialises a new StateProcessor.
- func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *StateProcessor {
- return &StateProcessor{
- config: config,
- bc: bc,
- engine: engine,
- }
- }
- type LightStateProcessor struct {
- randomGenerator *rand.Rand
- StateProcessor
- }
- func NewLightStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *LightStateProcessor {
- randomGenerator := rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
- return &LightStateProcessor{
- randomGenerator: randomGenerator,
- StateProcessor: *NewStateProcessor(config, bc, engine),
- }
- }
- func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (*state.StateDB, types.Receipts, []*types.Log, uint64, error) {
- allowLightProcess := true
- if posa, ok := p.engine.(consensus.PoSA); ok {
- allowLightProcess = posa.AllowLightProcess(p.bc, block.Header())
- }
- // random fallback to full process
- if check := p.randomGenerator.Int63n(fullProcessCheck); allowLightProcess && check != 0 && len(block.Transactions()) != 0 {
- var pid string
- if peer, ok := block.ReceivedFrom.(PeerIDer); ok {
- pid = peer.ID()
- }
- var diffLayer *types.DiffLayer
- var diffLayerTimeout = recentDiffLayerTimeout
- if time.Now().Unix()-int64(block.Time()) > recentTime {
- diffLayerTimeout = farDiffLayerTimeout
- }
- for tried := 0; tried < diffLayerTimeout; tried++ {
- // wait a bit for the diff layer
- diffLayer = p.bc.GetUnTrustedDiffLayer(block.Hash(), pid)
- if diffLayer != nil {
- break
- }
- time.Sleep(time.Millisecond)
- }
- if diffLayer != nil {
- if err := diffLayer.Receipts.DeriveFields(p.bc.chainConfig, block.Hash(), block.NumberU64(), block.Transactions()); err != nil {
- log.Error("Failed to derive block receipts fields", "hash", block.Hash(), "number", block.NumberU64(), "err", err)
- // fallback to full process
- return p.StateProcessor.Process(block, statedb, cfg)
- }
- receipts, logs, gasUsed, err := p.LightProcess(diffLayer, block, statedb)
- if err == nil {
- log.Info("do light process success at block", "num", block.NumberU64())
- return statedb, receipts, logs, gasUsed, nil
- }
- log.Error("do light process err at block", "num", block.NumberU64(), "err", err)
- p.bc.removeDiffLayers(diffLayer.DiffHash)
- // prepare new statedb
- statedb.StopPrefetcher()
- parent := p.bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
- statedb, err = state.New(parent.Root, p.bc.stateCache, p.bc.snaps)
- if err != nil {
- return statedb, nil, nil, 0, err
- }
- // Enable prefetching to pull in trie node paths while processing transactions
- statedb.StartPrefetcher("chain")
- }
- }
- // fallback to full process
- return p.StateProcessor.Process(block, statedb, cfg)
- }
- func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *types.Block, statedb *state.StateDB) (types.Receipts, []*types.Log, uint64, error) {
- statedb.MarkLightProcessed()
- fullDiffCode := make(map[common.Hash][]byte, len(diffLayer.Codes))
- diffTries := make(map[common.Address]state.Trie)
- diffCode := make(map[common.Hash][]byte)
- snapDestructs, snapAccounts, snapStorage, err := statedb.DiffLayerToSnap(diffLayer)
- if err != nil {
- return nil, nil, 0, err
- }
- for _, c := range diffLayer.Codes {
- fullDiffCode[c.Hash] = c.Code
- }
- for des := range snapDestructs {
- statedb.Trie().TryDelete(des[:])
- }
- threads := gopool.Threads(len(snapAccounts))
- iteAccounts := make([]common.Address, 0, len(snapAccounts))
- for diffAccount := range snapAccounts {
- iteAccounts = append(iteAccounts, diffAccount)
- }
- errChan := make(chan error, threads)
- exitChan := make(chan struct{})
- var snapMux sync.RWMutex
- var stateMux, diffMux sync.Mutex
- for i := 0; i < threads; i++ {
- start := i * len(iteAccounts) / threads
- end := (i + 1) * len(iteAccounts) / threads
- if i+1 == threads {
- end = len(iteAccounts)
- }
- go func(start, end int) {
- for index := start; index < end; index++ {
- select {
- // fast fail
- case <-exitChan:
- return
- default:
- }
- diffAccount := iteAccounts[index]
- snapMux.RLock()
- blob := snapAccounts[diffAccount]
- snapMux.RUnlock()
- addrHash := crypto.Keccak256Hash(diffAccount[:])
- latestAccount, err := snapshot.FullAccount(blob)
- if err != nil {
- errChan <- err
- return
- }
- // fetch previous state
- var previousAccount state.Account
- stateMux.Lock()
- enc, err := statedb.Trie().TryGet(diffAccount[:])
- stateMux.Unlock()
- if err != nil {
- errChan <- err
- return
- }
- if len(enc) != 0 {
- if err := rlp.DecodeBytes(enc, &previousAccount); err != nil {
- errChan <- err
- return
- }
- }
- if latestAccount.Balance == nil {
- latestAccount.Balance = new(big.Int)
- }
- if previousAccount.Balance == nil {
- previousAccount.Balance = new(big.Int)
- }
- if previousAccount.Root == (common.Hash{}) {
- previousAccount.Root = types.EmptyRootHash
- }
- if len(previousAccount.CodeHash) == 0 {
- previousAccount.CodeHash = types.EmptyCodeHash
- }
- // skip no change account
- if previousAccount.Nonce == latestAccount.Nonce &&
- bytes.Equal(previousAccount.CodeHash, latestAccount.CodeHash) &&
- previousAccount.Balance.Cmp(latestAccount.Balance) == 0 &&
- previousAccount.Root == common.BytesToHash(latestAccount.Root) {
- // It is normal to receive redundant message since the collected message is redundant.
- log.Debug("receive redundant account change in diff layer", "account", diffAccount, "num", block.NumberU64())
- snapMux.Lock()
- delete(snapAccounts, diffAccount)
- delete(snapStorage, diffAccount)
- snapMux.Unlock()
- continue
- }
- // update code
- codeHash := common.BytesToHash(latestAccount.CodeHash)
- if !bytes.Equal(latestAccount.CodeHash, previousAccount.CodeHash) &&
- !bytes.Equal(latestAccount.CodeHash, types.EmptyCodeHash) {
- if code, exist := fullDiffCode[codeHash]; exist {
- if crypto.Keccak256Hash(code) != codeHash {
- errChan <- fmt.Errorf("code and code hash mismatch, account %s", diffAccount.String())
- return
- }
- diffMux.Lock()
- diffCode[codeHash] = code
- diffMux.Unlock()
- } else {
- rawCode := rawdb.ReadCode(p.bc.db, codeHash)
- if len(rawCode) == 0 {
- errChan <- fmt.Errorf("missing code, account %s", diffAccount.String())
- return
- }
- }
- }
- //update storage
- latestRoot := common.BytesToHash(latestAccount.Root)
- if latestRoot != previousAccount.Root && latestRoot != types.EmptyRootHash {
- accountTrie, err := statedb.Database().OpenStorageTrie(addrHash, previousAccount.Root)
- if err != nil {
- errChan <- err
- return
- }
- snapMux.RLock()
- storageChange, exist := snapStorage[diffAccount]
- snapMux.RUnlock()
- if !exist {
- errChan <- errors.New("missing storage change in difflayer")
- return
- }
- for k, v := range storageChange {
- if len(v) != 0 {
- accountTrie.TryUpdate([]byte(k), v)
- } else {
- accountTrie.TryDelete([]byte(k))
- }
- }
- // check storage root
- accountRootHash := accountTrie.Hash()
- if latestRoot != accountRootHash {
- errChan <- errors.New("account storage root mismatch")
- return
- }
- diffMux.Lock()
- diffTries[diffAccount] = accountTrie
- diffMux.Unlock()
- } else {
- snapMux.Lock()
- delete(snapStorage, diffAccount)
- snapMux.Unlock()
- }
- // can't trust the blob, need encode by our-self.
- latestStateAccount := state.Account{
- Nonce: latestAccount.Nonce,
- Balance: latestAccount.Balance,
- Root: common.BytesToHash(latestAccount.Root),
- CodeHash: latestAccount.CodeHash,
- }
- bz, err := rlp.EncodeToBytes(&latestStateAccount)
- if err != nil {
- errChan <- err
- return
- }
- stateMux.Lock()
- err = statedb.Trie().TryUpdate(diffAccount[:], bz)
- stateMux.Unlock()
- if err != nil {
- errChan <- err
- return
- }
- }
- errChan <- nil
- }(start, end)
- }
- for i := 0; i < threads; i++ {
- err := <-errChan
- if err != nil {
- close(exitChan)
- return nil, nil, 0, err
- }
- }
- var allLogs []*types.Log
- var gasUsed uint64
- for _, receipt := range diffLayer.Receipts {
- allLogs = append(allLogs, receipt.Logs...)
- gasUsed += receipt.GasUsed
- }
- // Do validate in advance so that we can fall back to full process
- if err := p.bc.validator.ValidateState(block, statedb, diffLayer.Receipts, gasUsed); err != nil {
- log.Error("validate state failed during diff sync", "error", err)
- return nil, nil, 0, err
- }
- // remove redundant storage change
- for account := range snapStorage {
- if _, exist := snapAccounts[account]; !exist {
- log.Warn("receive redundant storage change in diff layer")
- delete(snapStorage, account)
- }
- }
- // remove redundant code
- if len(fullDiffCode) != len(diffLayer.Codes) {
- diffLayer.Codes = make([]types.DiffCode, 0, len(diffCode))
- for hash, code := range diffCode {
- diffLayer.Codes = append(diffLayer.Codes, types.DiffCode{
- Hash: hash,
- Code: code,
- })
- }
- }
- statedb.SetSnapData(snapDestructs, snapAccounts, snapStorage)
- if len(snapAccounts) != len(diffLayer.Accounts) || len(snapStorage) != len(diffLayer.Storages) {
- diffLayer.Destructs, diffLayer.Accounts, diffLayer.Storages = statedb.SnapToDiffLayer()
- }
- statedb.SetDiff(diffLayer, diffTries, diffCode)
- return diffLayer.Receipts, allLogs, gasUsed, nil
- }
- // Process processes the state changes according to the Ethereum rules by running
- // the transaction messages using the statedb and applying any rewards to both
- // the processor (coinbase) and any included uncles.
- //
- // Process returns the receipts and logs accumulated during the process and
- // returns the amount of gas that was used in the process. If any of the
- // transactions failed to execute due to insufficient gas it will return an error.
- func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (*state.StateDB, types.Receipts, []*types.Log, uint64, error) {
- var (
- usedGas = new(uint64)
- header = block.Header()
- allLogs []*types.Log
- gp = new(GasPool).AddGas(block.GasLimit())
- )
- signer := types.MakeSigner(p.bc.chainConfig, block.Number())
- statedb.TryPreload(block, signer)
- var receipts = make([]*types.Receipt, 0)
- // Mutate the block and state according to any hard-fork specs
- if p.config.DAOForkSupport && p.config.DAOForkBlock != nil && p.config.DAOForkBlock.Cmp(block.Number()) == 0 {
- misc.ApplyDAOHardFork(statedb)
- }
- // Handle upgrade build-in system contract code
- systemcontracts.UpgradeBuildInSystemContract(p.config, block.Number(), statedb)
- blockContext := NewEVMBlockContext(header, p.bc, nil)
- vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
- // Iterate over and process the individual transactions
- posa, isPoSA := p.engine.(consensus.PoSA)
- commonTxs := make([]*types.Transaction, 0, len(block.Transactions()))
- // usually do have two tx, one for validator set contract, another for system reward contract.
- systemTxs := make([]*types.Transaction, 0, 2)
- for i, tx := range block.Transactions() {
- if isPoSA {
- if isSystemTx, err := posa.IsSystemTransaction(tx, block.Header()); err != nil {
- return statedb, nil, nil, 0, err
- } else if isSystemTx {
- systemTxs = append(systemTxs, tx)
- continue
- }
- }
- msg, err := tx.AsMessage(signer)
- if err != nil {
- return statedb, nil, nil, 0, err
- }
- statedb.Prepare(tx.Hash(), block.Hash(), i)
- receipt, err := applyTransaction(msg, p.config, p.bc, nil, gp, statedb, header, tx, usedGas, vmenv)
- if err != nil {
- return statedb, nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
- }
- commonTxs = append(commonTxs, tx)
- receipts = append(receipts, receipt)
- }
- // Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
- err := p.engine.Finalize(p.bc, header, statedb, &commonTxs, block.Uncles(), &receipts, &systemTxs, usedGas)
- if err != nil {
- return statedb, receipts, allLogs, *usedGas, err
- }
- for _, receipt := range receipts {
- allLogs = append(allLogs, receipt.Logs...)
- }
- return statedb, receipts, allLogs, *usedGas, nil
- }
- func applyTransaction(msg types.Message, config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, evm *vm.EVM) (*types.Receipt, error) {
- // Create a new context to be used in the EVM environment.
- txContext := NewEVMTxContext(msg)
- evm.Reset(txContext, statedb)
- // Apply the transaction to the current state (included in the env).
- result, err := ApplyMessage(evm, msg, gp)
- if err != nil {
- return nil, err
- }
- // Update the state with pending changes.
- var root []byte
- if config.IsByzantium(header.Number) {
- statedb.Finalise(true)
- } else {
- root = statedb.IntermediateRoot(config.IsEIP158(header.Number)).Bytes()
- }
- *usedGas += result.UsedGas
- // Create a new receipt for the transaction, storing the intermediate root and gas used
- // by the tx.
- receipt := &types.Receipt{Type: tx.Type(), PostState: root, CumulativeGasUsed: *usedGas}
- if result.Failed() {
- receipt.Status = types.ReceiptStatusFailed
- } else {
- receipt.Status = types.ReceiptStatusSuccessful
- }
- receipt.TxHash = tx.Hash()
- receipt.GasUsed = result.UsedGas
- // If the transaction created a contract, store the creation address in the receipt.
- if msg.To() == nil {
- receipt.ContractAddress = crypto.CreateAddress(evm.TxContext.Origin, tx.Nonce())
- }
- // Set the receipt logs and create the bloom filter.
- receipt.Logs = statedb.GetLogs(tx.Hash())
- receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
- receipt.BlockHash = statedb.BlockHash()
- receipt.BlockNumber = header.Number
- receipt.TransactionIndex = uint(statedb.TxIndex())
- return receipt, err
- }
- // ApplyTransaction attempts to apply a transaction to the given state database
- // and uses the input parameters for its environment. It returns the receipt
- // for the transaction, gas used and an error if the transaction failed,
- // indicating the block was invalid.
- func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, cfg vm.Config) (*types.Receipt, error) {
- msg, err := tx.AsMessage(types.MakeSigner(config, header.Number))
- if err != nil {
- return nil, err
- }
- // Create a new context to be used in the EVM environment
- blockContext := NewEVMBlockContext(header, bc, author)
- vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, config, cfg)
- defer func() {
- ite := vmenv.Interpreter()
- vm.EVMInterpreterPool.Put(ite)
- vm.EvmPool.Put(vmenv)
- }()
- return applyTransaction(msg, config, bc, author, gp, statedb, header, tx, usedGas, vmenv)
- }
|