state_processor.go 17 KB


  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package core
  17. import (
  18. "bytes"
  19. "errors"
  20. "fmt"
  21. "math/big"
  22. "math/rand"
  23. "sync"
  24. "time"
  25. "github.com/ethereum/go-ethereum/common"
  26. "github.com/ethereum/go-ethereum/common/gopool"
  27. "github.com/ethereum/go-ethereum/consensus"
  28. "github.com/ethereum/go-ethereum/consensus/misc"
  29. "github.com/ethereum/go-ethereum/core/rawdb"
  30. "github.com/ethereum/go-ethereum/core/state"
  31. "github.com/ethereum/go-ethereum/core/state/snapshot"
  32. "github.com/ethereum/go-ethereum/core/systemcontracts"
  33. "github.com/ethereum/go-ethereum/core/types"
  34. "github.com/ethereum/go-ethereum/core/vm"
  35. "github.com/ethereum/go-ethereum/crypto"
  36. "github.com/ethereum/go-ethereum/log"
  37. "github.com/ethereum/go-ethereum/params"
  38. "github.com/ethereum/go-ethereum/rlp"
  39. )
  40. const (
  41. fullProcessCheck = 21 // On diff sync mode, will do full process every fullProcessCheck randomly
  42. recentTime = 1024 * 3
  43. recentDiffLayerTimeout = 5
  44. farDiffLayerTimeout = 2
  45. )
  46. // StateProcessor is a basic Processor, which takes care of transitioning
  47. // state from one point to another.
  48. //
  49. // StateProcessor implements Processor.
  50. type StateProcessor struct {
  51. config *params.ChainConfig // Chain configuration options
  52. bc *BlockChain // Canonical block chain
  53. engine consensus.Engine // Consensus engine used for block rewards
  54. }
  55. // NewStateProcessor initialises a new StateProcessor.
  56. func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *StateProcessor {
  57. return &StateProcessor{
  58. config: config,
  59. bc: bc,
  60. engine: engine,
  61. }
  62. }
  63. type LightStateProcessor struct {
  64. check int64
  65. StateProcessor
  66. }
  67. func NewLightStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *LightStateProcessor {
  68. randomGenerator := rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
  69. check := randomGenerator.Int63n(fullProcessCheck)
  70. return &LightStateProcessor{
  71. check: check,
  72. StateProcessor: *NewStateProcessor(config, bc, engine),
  73. }
  74. }
  75. func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (*state.StateDB, types.Receipts, []*types.Log, uint64, error) {
  76. allowLightProcess := true
  77. if posa, ok := p.engine.(consensus.PoSA); ok {
  78. allowLightProcess = posa.AllowLightProcess(p.bc, block.Header())
  79. }
  80. // random fallback to full process
  81. if allowLightProcess && block.NumberU64()%fullProcessCheck != uint64(p.check) && len(block.Transactions()) != 0 {
  82. var pid string
  83. if peer, ok := block.ReceivedFrom.(PeerIDer); ok {
  84. pid = peer.ID()
  85. }
  86. var diffLayer *types.DiffLayer
  87. var diffLayerTimeout = recentDiffLayerTimeout
  88. if time.Now().Unix()-int64(block.Time()) > recentTime {
  89. diffLayerTimeout = farDiffLayerTimeout
  90. }
  91. for tried := 0; tried < diffLayerTimeout; tried++ {
  92. // wait a bit for the diff layer
  93. diffLayer = p.bc.GetUnTrustedDiffLayer(block.Hash(), pid)
  94. if diffLayer != nil {
  95. break
  96. }
  97. time.Sleep(time.Millisecond)
  98. }
  99. if diffLayer != nil {
  100. if err := diffLayer.Receipts.DeriveFields(p.bc.chainConfig, block.Hash(), block.NumberU64(), block.Transactions()); err != nil {
  101. log.Error("Failed to derive block receipts fields", "hash", block.Hash(), "number", block.NumberU64(), "err", err)
  102. // fallback to full process
  103. return p.StateProcessor.Process(block, statedb, cfg)
  104. }
  105. receipts, logs, gasUsed, err := p.LightProcess(diffLayer, block, statedb)
  106. if err == nil {
  107. log.Info("do light process success at block", "num", block.NumberU64())
  108. return statedb, receipts, logs, gasUsed, nil
  109. }
  110. log.Error("do light process err at block", "num", block.NumberU64(), "err", err)
  111. p.bc.removeDiffLayers(diffLayer.DiffHash)
  112. // prepare new statedb
  113. statedb.StopPrefetcher()
  114. parent := p.bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
  115. statedb, err = state.New(parent.Root, p.bc.stateCache, p.bc.snaps)
  116. if err != nil {
  117. return statedb, nil, nil, 0, err
  118. }
  119. // Enable prefetching to pull in trie node paths while processing transactions
  120. statedb.StartPrefetcher("chain")
  121. }
  122. }
  123. // fallback to full process
  124. return p.StateProcessor.Process(block, statedb, cfg)
  125. }
  126. func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *types.Block, statedb *state.StateDB) (types.Receipts, []*types.Log, uint64, error) {
  127. statedb.MarkLightProcessed()
  128. fullDiffCode := make(map[common.Hash][]byte, len(diffLayer.Codes))
  129. diffTries := make(map[common.Address]state.Trie)
  130. diffCode := make(map[common.Hash][]byte)
  131. snapDestructs, snapAccounts, snapStorage, err := statedb.DiffLayerToSnap(diffLayer)
  132. if err != nil {
  133. return nil, nil, 0, err
  134. }
  135. for _, c := range diffLayer.Codes {
  136. fullDiffCode[c.Hash] = c.Code
  137. }
  138. for des := range snapDestructs {
  139. statedb.Trie().TryDelete(des[:])
  140. }
  141. threads := gopool.Threads(len(snapAccounts))
  142. iteAccounts := make([]common.Address, 0, len(snapAccounts))
  143. for diffAccount := range snapAccounts {
  144. iteAccounts = append(iteAccounts, diffAccount)
  145. }
  146. errChan := make(chan error, threads)
  147. exitChan := make(chan struct{})
  148. var snapMux sync.RWMutex
  149. var stateMux, diffMux sync.Mutex
  150. for i := 0; i < threads; i++ {
  151. start := i * len(iteAccounts) / threads
  152. end := (i + 1) * len(iteAccounts) / threads
  153. if i+1 == threads {
  154. end = len(iteAccounts)
  155. }
  156. go func(start, end int) {
  157. for index := start; index < end; index++ {
  158. select {
  159. // fast fail
  160. case <-exitChan:
  161. return
  162. default:
  163. }
  164. diffAccount := iteAccounts[index]
  165. snapMux.RLock()
  166. blob := snapAccounts[diffAccount]
  167. snapMux.RUnlock()
  168. addrHash := crypto.Keccak256Hash(diffAccount[:])
  169. latestAccount, err := snapshot.FullAccount(blob)
  170. if err != nil {
  171. errChan <- err
  172. return
  173. }
  174. // fetch previous state
  175. var previousAccount state.Account
  176. stateMux.Lock()
  177. enc, err := statedb.Trie().TryGet(diffAccount[:])
  178. stateMux.Unlock()
  179. if err != nil {
  180. errChan <- err
  181. return
  182. }
  183. if len(enc) != 0 {
  184. if err := rlp.DecodeBytes(enc, &previousAccount); err != nil {
  185. errChan <- err
  186. return
  187. }
  188. }
  189. if latestAccount.Balance == nil {
  190. latestAccount.Balance = new(big.Int)
  191. }
  192. if previousAccount.Balance == nil {
  193. previousAccount.Balance = new(big.Int)
  194. }
  195. if previousAccount.Root == (common.Hash{}) {
  196. previousAccount.Root = types.EmptyRootHash
  197. }
  198. if len(previousAccount.CodeHash) == 0 {
  199. previousAccount.CodeHash = types.EmptyCodeHash
  200. }
  201. // skip no change account
  202. if previousAccount.Nonce == latestAccount.Nonce &&
  203. bytes.Equal(previousAccount.CodeHash, latestAccount.CodeHash) &&
  204. previousAccount.Balance.Cmp(latestAccount.Balance) == 0 &&
  205. previousAccount.Root == common.BytesToHash(latestAccount.Root) {
  206. // It is normal to receive redundant message since the collected message is redundant.
  207. log.Debug("receive redundant account change in diff layer", "account", diffAccount, "num", block.NumberU64())
  208. snapMux.Lock()
  209. delete(snapAccounts, diffAccount)
  210. delete(snapStorage, diffAccount)
  211. snapMux.Unlock()
  212. continue
  213. }
  214. // update code
  215. codeHash := common.BytesToHash(latestAccount.CodeHash)
  216. if !bytes.Equal(latestAccount.CodeHash, previousAccount.CodeHash) &&
  217. !bytes.Equal(latestAccount.CodeHash, types.EmptyCodeHash) {
  218. if code, exist := fullDiffCode[codeHash]; exist {
  219. if crypto.Keccak256Hash(code) != codeHash {
  220. errChan <- fmt.Errorf("code and code hash mismatch, account %s", diffAccount.String())
  221. return
  222. }
  223. diffMux.Lock()
  224. diffCode[codeHash] = code
  225. diffMux.Unlock()
  226. } else {
  227. rawCode := rawdb.ReadCode(p.bc.db, codeHash)
  228. if len(rawCode) == 0 {
  229. errChan <- fmt.Errorf("missing code, account %s", diffAccount.String())
  230. return
  231. }
  232. }
  233. }
  234. //update storage
  235. latestRoot := common.BytesToHash(latestAccount.Root)
  236. if latestRoot != previousAccount.Root {
  237. accountTrie, err := statedb.Database().OpenStorageTrie(addrHash, previousAccount.Root)
  238. if err != nil {
  239. errChan <- err
  240. return
  241. }
  242. snapMux.RLock()
  243. storageChange, exist := snapStorage[diffAccount]
  244. snapMux.RUnlock()
  245. if !exist {
  246. errChan <- errors.New("missing storage change in difflayer")
  247. return
  248. }
  249. for k, v := range storageChange {
  250. if len(v) != 0 {
  251. accountTrie.TryUpdate([]byte(k), v)
  252. } else {
  253. accountTrie.TryDelete([]byte(k))
  254. }
  255. }
  256. // check storage root
  257. accountRootHash := accountTrie.Hash()
  258. if latestRoot != accountRootHash {
  259. errChan <- errors.New("account storage root mismatch")
  260. return
  261. }
  262. diffMux.Lock()
  263. diffTries[diffAccount] = accountTrie
  264. diffMux.Unlock()
  265. } else {
  266. snapMux.Lock()
  267. delete(snapStorage, diffAccount)
  268. snapMux.Unlock()
  269. }
  270. // can't trust the blob, need encode by our-self.
  271. latestStateAccount := state.Account{
  272. Nonce: latestAccount.Nonce,
  273. Balance: latestAccount.Balance,
  274. Root: common.BytesToHash(latestAccount.Root),
  275. CodeHash: latestAccount.CodeHash,
  276. }
  277. bz, err := rlp.EncodeToBytes(&latestStateAccount)
  278. if err != nil {
  279. errChan <- err
  280. return
  281. }
  282. stateMux.Lock()
  283. err = statedb.Trie().TryUpdate(diffAccount[:], bz)
  284. stateMux.Unlock()
  285. if err != nil {
  286. errChan <- err
  287. return
  288. }
  289. }
  290. errChan <- nil
  291. }(start, end)
  292. }
  293. for i := 0; i < threads; i++ {
  294. err := <-errChan
  295. if err != nil {
  296. close(exitChan)
  297. return nil, nil, 0, err
  298. }
  299. }
  300. var allLogs []*types.Log
  301. var gasUsed uint64
  302. for _, receipt := range diffLayer.Receipts {
  303. allLogs = append(allLogs, receipt.Logs...)
  304. gasUsed += receipt.GasUsed
  305. }
  306. // Do validate in advance so that we can fall back to full process
  307. if err := p.bc.validator.ValidateState(block, statedb, diffLayer.Receipts, gasUsed); err != nil {
  308. log.Error("validate state failed during diff sync", "error", err)
  309. return nil, nil, 0, err
  310. }
  311. // remove redundant storage change
  312. for account := range snapStorage {
  313. if _, exist := snapAccounts[account]; !exist {
  314. log.Warn("receive redundant storage change in diff layer")
  315. delete(snapStorage, account)
  316. }
  317. }
  318. // remove redundant code
  319. if len(fullDiffCode) != len(diffLayer.Codes) {
  320. diffLayer.Codes = make([]types.DiffCode, 0, len(diffCode))
  321. for hash, code := range diffCode {
  322. diffLayer.Codes = append(diffLayer.Codes, types.DiffCode{
  323. Hash: hash,
  324. Code: code,
  325. })
  326. }
  327. }
  328. statedb.SetSnapData(snapDestructs, snapAccounts, snapStorage)
  329. if len(snapAccounts) != len(diffLayer.Accounts) || len(snapStorage) != len(diffLayer.Storages) {
  330. diffLayer.Destructs, diffLayer.Accounts, diffLayer.Storages = statedb.SnapToDiffLayer()
  331. }
  332. statedb.SetDiff(diffLayer, diffTries, diffCode)
  333. return diffLayer.Receipts, allLogs, gasUsed, nil
  334. }
  335. // Process processes the state changes according to the Ethereum rules by running
  336. // the transaction messages using the statedb and applying any rewards to both
  337. // the processor (coinbase) and any included uncles.
  338. //
  339. // Process returns the receipts and logs accumulated during the process and
  340. // returns the amount of gas that was used in the process. If any of the
  341. // transactions failed to execute due to insufficient gas it will return an error.
  342. func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (*state.StateDB, types.Receipts, []*types.Log, uint64, error) {
  343. var (
  344. usedGas = new(uint64)
  345. header = block.Header()
  346. allLogs []*types.Log
  347. gp = new(GasPool).AddGas(block.GasLimit())
  348. )
  349. signer := types.MakeSigner(p.bc.chainConfig, block.Number())
  350. var receipts = make([]*types.Receipt, 0)
  351. // Mutate the block and state according to any hard-fork specs
  352. if p.config.DAOForkSupport && p.config.DAOForkBlock != nil && p.config.DAOForkBlock.Cmp(block.Number()) == 0 {
  353. misc.ApplyDAOHardFork(statedb)
  354. }
  355. // Handle upgrade build-in system contract code
  356. systemcontracts.UpgradeBuildInSystemContract(p.config, block.Number(), statedb)
  357. blockContext := NewEVMBlockContext(header, p.bc, nil)
  358. vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
  359. txNum := len(block.Transactions())
  360. // Iterate over and process the individual transactions
  361. posa, isPoSA := p.engine.(consensus.PoSA)
  362. commonTxs := make([]*types.Transaction, 0, txNum)
  363. // initilise bloom processors
  364. bloomProcessors := NewAsyncReceiptBloomGenerator(txNum)
  365. // usually do have two tx, one for validator set contract, another for system reward contract.
  366. systemTxs := make([]*types.Transaction, 0, 2)
  367. for i, tx := range block.Transactions() {
  368. if isPoSA {
  369. if isSystemTx, err := posa.IsSystemTransaction(tx, block.Header()); err != nil {
  370. return statedb, nil, nil, 0, err
  371. } else if isSystemTx {
  372. systemTxs = append(systemTxs, tx)
  373. continue
  374. }
  375. }
  376. msg, err := tx.AsMessage(signer)
  377. if err != nil {
  378. return statedb, nil, nil, 0, err
  379. }
  380. statedb.Prepare(tx.Hash(), block.Hash(), i)
  381. receipt, err := applyTransaction(msg, p.config, p.bc, nil, gp, statedb, header, tx, usedGas, vmenv, bloomProcessors)
  382. if err != nil {
  383. return statedb, nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
  384. }
  385. commonTxs = append(commonTxs, tx)
  386. receipts = append(receipts, receipt)
  387. }
  388. bloomProcessors.Close()
  389. // Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
  390. err := p.engine.Finalize(p.bc, header, statedb, &commonTxs, block.Uncles(), &receipts, &systemTxs, usedGas)
  391. if err != nil {
  392. return statedb, receipts, allLogs, *usedGas, err
  393. }
  394. for _, receipt := range receipts {
  395. allLogs = append(allLogs, receipt.Logs...)
  396. }
  397. return statedb, receipts, allLogs, *usedGas, nil
  398. }
  399. func applyTransaction(msg types.Message, config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, evm *vm.EVM, receiptProcessors ...ReceiptProcessor) (*types.Receipt, error) {
  400. // Create a new context to be used in the EVM environment.
  401. txContext := NewEVMTxContext(msg)
  402. evm.Reset(txContext, statedb)
  403. // Apply the transaction to the current state (included in the env).
  404. result, err := ApplyMessage(evm, msg, gp)
  405. if err != nil {
  406. return nil, err
  407. }
  408. // Update the state with pending changes.
  409. var root []byte
  410. if config.IsByzantium(header.Number) {
  411. statedb.Finalise(true)
  412. } else {
  413. root = statedb.IntermediateRoot(config.IsEIP158(header.Number)).Bytes()
  414. }
  415. *usedGas += result.UsedGas
  416. // Create a new receipt for the transaction, storing the intermediate root and gas used
  417. // by the tx.
  418. receipt := &types.Receipt{Type: tx.Type(), PostState: root, CumulativeGasUsed: *usedGas}
  419. if result.Failed() {
  420. receipt.Status = types.ReceiptStatusFailed
  421. } else {
  422. receipt.Status = types.ReceiptStatusSuccessful
  423. }
  424. receipt.TxHash = tx.Hash()
  425. receipt.GasUsed = result.UsedGas
  426. // If the transaction created a contract, store the creation address in the receipt.
  427. if msg.To() == nil {
  428. receipt.ContractAddress = crypto.CreateAddress(evm.TxContext.Origin, tx.Nonce())
  429. }
  430. // Set the receipt logs and create the bloom filter.
  431. receipt.Logs = statedb.GetLogs(tx.Hash())
  432. receipt.BlockHash = statedb.BlockHash()
  433. receipt.BlockNumber = header.Number
  434. receipt.TransactionIndex = uint(statedb.TxIndex())
  435. for _, receiptProcessor := range receiptProcessors {
  436. receiptProcessor.Apply(receipt)
  437. }
  438. return receipt, err
  439. }
  440. // ApplyTransaction attempts to apply a transaction to the given state database
  441. // and uses the input parameters for its environment. It returns the receipt
  442. // for the transaction, gas used and an error if the transaction failed,
  443. // indicating the block was invalid.
  444. func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, cfg vm.Config, receiptProcessors ...ReceiptProcessor) (*types.Receipt, error) {
  445. msg, err := tx.AsMessage(types.MakeSigner(config, header.Number))
  446. if err != nil {
  447. return nil, err
  448. }
  449. // Create a new context to be used in the EVM environment
  450. blockContext := NewEVMBlockContext(header, bc, author)
  451. vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, config, cfg)
  452. defer func() {
  453. ite := vmenv.Interpreter()
  454. vm.EVMInterpreterPool.Put(ite)
  455. vm.EvmPool.Put(vmenv)
  456. }()
  457. return applyTransaction(msg, config, bc, author, gp, statedb, header, tx, usedGas, vmenv, receiptProcessors...)
  458. }