state_processor.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package core
  17. import (
  18. "bytes"
  19. "errors"
  20. "fmt"
  21. "math/big"
  22. "math/rand"
  23. "sync"
  24. "time"
  25. "github.com/ethereum/go-ethereum/common"
  26. "github.com/ethereum/go-ethereum/common/gopool"
  27. "github.com/ethereum/go-ethereum/consensus"
  28. "github.com/ethereum/go-ethereum/consensus/misc"
  29. "github.com/ethereum/go-ethereum/core/rawdb"
  30. "github.com/ethereum/go-ethereum/core/state"
  31. "github.com/ethereum/go-ethereum/core/state/snapshot"
  32. "github.com/ethereum/go-ethereum/core/systemcontracts"
  33. "github.com/ethereum/go-ethereum/core/types"
  34. "github.com/ethereum/go-ethereum/core/vm"
  35. "github.com/ethereum/go-ethereum/crypto"
  36. "github.com/ethereum/go-ethereum/log"
  37. "github.com/ethereum/go-ethereum/params"
  38. "github.com/ethereum/go-ethereum/rlp"
  39. )
  // Tuning values for diff-sync ("light") block processing.
  const (
  	// fullProcessCheck is the modulus of the random full-process fallback: the
  	// light processor draws a check value in [0, fullProcessCheck) and always
  	// fully processes blocks whose number modulo fullProcessCheck equals it.
  	fullProcessCheck = 21

  	// recentTime is the threshold, compared against a difference of Unix
  	// timestamps, above which a block is no longer considered recent.
  	recentTime = 3 * 1024

  	// recentDiffLayerTimeout and farDiffLayerTimeout are the maximum numbers of
  	// polling attempts made while waiting for a diff layer, for recent blocks
  	// and for older blocks respectively.
  	recentDiffLayerTimeout = 5
  	farDiffLayerTimeout    = 2
  )
  46. // StateProcessor is a basic Processor, which takes care of transitioning
  47. // state from one point to another.
  48. //
  49. // StateProcessor implements Processor.
  50. type StateProcessor struct {
  51. config *params.ChainConfig // Chain configuration options
  52. bc *BlockChain // Canonical block chain
  53. engine consensus.Engine // Consensus engine used for block rewards
  54. }
  55. // NewStateProcessor initialises a new StateProcessor.
  56. func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *StateProcessor {
  57. return &StateProcessor{
  58. config: config,
  59. bc: bc,
  60. engine: engine,
  61. }
  62. }
  63. type LightStateProcessor struct {
  64. check int64
  65. StateProcessor
  66. }
  67. func NewLightStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *LightStateProcessor {
  68. randomGenerator := rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
  69. check := randomGenerator.Int63n(fullProcessCheck)
  70. return &LightStateProcessor{
  71. check: check,
  72. StateProcessor: *NewStateProcessor(config, bc, engine),
  73. }
  74. }
  75. func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (*state.StateDB, types.Receipts, []*types.Log, uint64, error) {
  76. allowLightProcess := true
  77. if posa, ok := p.engine.(consensus.PoSA); ok {
  78. allowLightProcess = posa.AllowLightProcess(p.bc, block.Header())
  79. }
  80. // random fallback to full process
  81. if allowLightProcess && block.NumberU64()%fullProcessCheck != uint64(p.check) && len(block.Transactions()) != 0 {
  82. var pid string
  83. if peer, ok := block.ReceivedFrom.(PeerIDer); ok {
  84. pid = peer.ID()
  85. }
  86. var diffLayer *types.DiffLayer
  87. var diffLayerTimeout = recentDiffLayerTimeout
  88. if time.Now().Unix()-int64(block.Time()) > recentTime {
  89. diffLayerTimeout = farDiffLayerTimeout
  90. }
  91. for tried := 0; tried < diffLayerTimeout; tried++ {
  92. // wait a bit for the diff layer
  93. diffLayer = p.bc.GetUnTrustedDiffLayer(block.Hash(), pid)
  94. if diffLayer != nil {
  95. break
  96. }
  97. time.Sleep(time.Millisecond)
  98. }
  99. if diffLayer != nil {
  100. if err := diffLayer.Receipts.DeriveFields(p.bc.chainConfig, block.Hash(), block.NumberU64(), block.Transactions()); err != nil {
  101. log.Error("Failed to derive block receipts fields", "hash", block.Hash(), "number", block.NumberU64(), "err", err)
  102. // fallback to full process
  103. return p.StateProcessor.Process(block, statedb, cfg)
  104. }
  105. receipts, logs, gasUsed, err := p.LightProcess(diffLayer, block, statedb)
  106. if err == nil {
  107. log.Info("do light process success at block", "num", block.NumberU64())
  108. return statedb, receipts, logs, gasUsed, nil
  109. }
  110. log.Error("do light process err at block", "num", block.NumberU64(), "err", err)
  111. p.bc.removeDiffLayers(diffLayer.DiffHash)
  112. // prepare new statedb
  113. statedb.StopPrefetcher()
  114. parent := p.bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
  115. statedb, err = state.New(parent.Root, p.bc.stateCache, p.bc.snaps)
  116. statedb.SetExpectedStateRoot(block.Root())
  117. if p.bc.pipeCommit {
  118. statedb.EnablePipeCommit()
  119. }
  120. if err != nil {
  121. return statedb, nil, nil, 0, err
  122. }
  123. // Enable prefetching to pull in trie node paths while processing transactions
  124. statedb.StartPrefetcher("chain")
  125. }
  126. }
  127. // fallback to full process
  128. return p.StateProcessor.Process(block, statedb, cfg)
  129. }
  130. func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *types.Block, statedb *state.StateDB) (types.Receipts, []*types.Log, uint64, error) {
  131. statedb.MarkLightProcessed()
  132. fullDiffCode := make(map[common.Hash][]byte, len(diffLayer.Codes))
  133. diffTries := make(map[common.Address]state.Trie)
  134. diffCode := make(map[common.Hash][]byte)
  135. snapDestructs, snapAccounts, snapStorage, err := statedb.DiffLayerToSnap(diffLayer)
  136. if err != nil {
  137. return nil, nil, 0, err
  138. }
  139. for _, c := range diffLayer.Codes {
  140. fullDiffCode[c.Hash] = c.Code
  141. }
  142. stateTrie, err := statedb.Trie()
  143. if err != nil {
  144. return nil, nil, 0, err
  145. }
  146. for des := range snapDestructs {
  147. stateTrie.TryDelete(des[:])
  148. }
  149. threads := gopool.Threads(len(snapAccounts))
  150. iteAccounts := make([]common.Address, 0, len(snapAccounts))
  151. for diffAccount := range snapAccounts {
  152. iteAccounts = append(iteAccounts, diffAccount)
  153. }
  154. errChan := make(chan error, threads)
  155. exitChan := make(chan struct{})
  156. var snapMux sync.RWMutex
  157. var stateMux, diffMux sync.Mutex
  158. for i := 0; i < threads; i++ {
  159. start := i * len(iteAccounts) / threads
  160. end := (i + 1) * len(iteAccounts) / threads
  161. if i+1 == threads {
  162. end = len(iteAccounts)
  163. }
  164. go func(start, end int) {
  165. for index := start; index < end; index++ {
  166. select {
  167. // fast fail
  168. case <-exitChan:
  169. return
  170. default:
  171. }
  172. diffAccount := iteAccounts[index]
  173. snapMux.RLock()
  174. blob := snapAccounts[diffAccount]
  175. snapMux.RUnlock()
  176. addrHash := crypto.Keccak256Hash(diffAccount[:])
  177. latestAccount, err := snapshot.FullAccount(blob)
  178. if err != nil {
  179. errChan <- err
  180. return
  181. }
  182. // fetch previous state
  183. var previousAccount state.Account
  184. stateMux.Lock()
  185. enc, err := stateTrie.TryGet(diffAccount[:])
  186. stateMux.Unlock()
  187. if err != nil {
  188. errChan <- err
  189. return
  190. }
  191. if len(enc) != 0 {
  192. if err := rlp.DecodeBytes(enc, &previousAccount); err != nil {
  193. errChan <- err
  194. return
  195. }
  196. }
  197. if latestAccount.Balance == nil {
  198. latestAccount.Balance = new(big.Int)
  199. }
  200. if previousAccount.Balance == nil {
  201. previousAccount.Balance = new(big.Int)
  202. }
  203. if previousAccount.Root == (common.Hash{}) {
  204. previousAccount.Root = types.EmptyRootHash
  205. }
  206. if len(previousAccount.CodeHash) == 0 {
  207. previousAccount.CodeHash = types.EmptyCodeHash
  208. }
  209. // skip no change account
  210. if previousAccount.Nonce == latestAccount.Nonce &&
  211. bytes.Equal(previousAccount.CodeHash, latestAccount.CodeHash) &&
  212. previousAccount.Balance.Cmp(latestAccount.Balance) == 0 &&
  213. previousAccount.Root == common.BytesToHash(latestAccount.Root) {
  214. // It is normal to receive redundant message since the collected message is redundant.
  215. log.Debug("receive redundant account change in diff layer", "account", diffAccount, "num", block.NumberU64())
  216. snapMux.Lock()
  217. delete(snapAccounts, diffAccount)
  218. delete(snapStorage, diffAccount)
  219. snapMux.Unlock()
  220. continue
  221. }
  222. // update code
  223. codeHash := common.BytesToHash(latestAccount.CodeHash)
  224. if !bytes.Equal(latestAccount.CodeHash, previousAccount.CodeHash) &&
  225. !bytes.Equal(latestAccount.CodeHash, types.EmptyCodeHash) {
  226. if code, exist := fullDiffCode[codeHash]; exist {
  227. if crypto.Keccak256Hash(code) != codeHash {
  228. errChan <- fmt.Errorf("code and code hash mismatch, account %s", diffAccount.String())
  229. return
  230. }
  231. diffMux.Lock()
  232. diffCode[codeHash] = code
  233. diffMux.Unlock()
  234. } else {
  235. rawCode := rawdb.ReadCode(p.bc.db, codeHash)
  236. if len(rawCode) == 0 {
  237. errChan <- fmt.Errorf("missing code, account %s", diffAccount.String())
  238. return
  239. }
  240. }
  241. }
  242. //update storage
  243. latestRoot := common.BytesToHash(latestAccount.Root)
  244. if latestRoot != previousAccount.Root {
  245. accountTrie, err := statedb.Database().OpenStorageTrie(addrHash, previousAccount.Root)
  246. if err != nil {
  247. errChan <- err
  248. return
  249. }
  250. snapMux.RLock()
  251. storageChange, exist := snapStorage[diffAccount]
  252. snapMux.RUnlock()
  253. if !exist {
  254. errChan <- errors.New("missing storage change in difflayer")
  255. return
  256. }
  257. for k, v := range storageChange {
  258. if len(v) != 0 {
  259. accountTrie.TryUpdate([]byte(k), v)
  260. } else {
  261. accountTrie.TryDelete([]byte(k))
  262. }
  263. }
  264. // check storage root
  265. accountRootHash := accountTrie.Hash()
  266. if latestRoot != accountRootHash {
  267. errChan <- errors.New("account storage root mismatch")
  268. return
  269. }
  270. diffMux.Lock()
  271. diffTries[diffAccount] = accountTrie
  272. diffMux.Unlock()
  273. } else {
  274. snapMux.Lock()
  275. delete(snapStorage, diffAccount)
  276. snapMux.Unlock()
  277. }
  278. // can't trust the blob, need encode by our-self.
  279. latestStateAccount := state.Account{
  280. Nonce: latestAccount.Nonce,
  281. Balance: latestAccount.Balance,
  282. Root: common.BytesToHash(latestAccount.Root),
  283. CodeHash: latestAccount.CodeHash,
  284. }
  285. bz, err := rlp.EncodeToBytes(&latestStateAccount)
  286. if err != nil {
  287. errChan <- err
  288. return
  289. }
  290. stateMux.Lock()
  291. err = stateTrie.TryUpdate(diffAccount[:], bz)
  292. stateMux.Unlock()
  293. if err != nil {
  294. errChan <- err
  295. return
  296. }
  297. }
  298. errChan <- nil
  299. }(start, end)
  300. }
  301. for i := 0; i < threads; i++ {
  302. err := <-errChan
  303. if err != nil {
  304. close(exitChan)
  305. return nil, nil, 0, err
  306. }
  307. }
  308. var allLogs []*types.Log
  309. var gasUsed uint64
  310. for _, receipt := range diffLayer.Receipts {
  311. allLogs = append(allLogs, receipt.Logs...)
  312. gasUsed += receipt.GasUsed
  313. }
  314. // Do validate in advance so that we can fall back to full process
  315. if err := p.bc.validator.ValidateState(block, statedb, diffLayer.Receipts, gasUsed, false); err != nil {
  316. log.Error("validate state failed during diff sync", "error", err)
  317. return nil, nil, 0, err
  318. }
  319. // remove redundant storage change
  320. for account := range snapStorage {
  321. if _, exist := snapAccounts[account]; !exist {
  322. log.Warn("receive redundant storage change in diff layer")
  323. delete(snapStorage, account)
  324. }
  325. }
  326. // remove redundant code
  327. if len(fullDiffCode) != len(diffLayer.Codes) {
  328. diffLayer.Codes = make([]types.DiffCode, 0, len(diffCode))
  329. for hash, code := range diffCode {
  330. diffLayer.Codes = append(diffLayer.Codes, types.DiffCode{
  331. Hash: hash,
  332. Code: code,
  333. })
  334. }
  335. }
  336. statedb.SetSnapData(snapDestructs, snapAccounts, snapStorage)
  337. if len(snapAccounts) != len(diffLayer.Accounts) || len(snapStorage) != len(diffLayer.Storages) {
  338. diffLayer.Destructs, diffLayer.Accounts, diffLayer.Storages = statedb.SnapToDiffLayer()
  339. }
  340. statedb.SetDiff(diffLayer, diffTries, diffCode)
  341. return diffLayer.Receipts, allLogs, gasUsed, nil
  342. }
  343. // Process processes the state changes according to the Ethereum rules by running
  344. // the transaction messages using the statedb and applying any rewards to both
  345. // the processor (coinbase) and any included uncles.
  346. //
  347. // Process returns the receipts and logs accumulated during the process and
  348. // returns the amount of gas that was used in the process. If any of the
  349. // transactions failed to execute due to insufficient gas it will return an error.
  350. func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (*state.StateDB, types.Receipts, []*types.Log, uint64, error) {
  351. var (
  352. usedGas = new(uint64)
  353. header = block.Header()
  354. allLogs []*types.Log
  355. gp = new(GasPool).AddGas(block.GasLimit())
  356. )
  357. signer := types.MakeSigner(p.bc.chainConfig, block.Number())
  358. statedb.TryPreload(block, signer)
  359. var receipts = make([]*types.Receipt, 0)
  360. // Mutate the block and state according to any hard-fork specs
  361. if p.config.DAOForkSupport && p.config.DAOForkBlock != nil && p.config.DAOForkBlock.Cmp(block.Number()) == 0 {
  362. misc.ApplyDAOHardFork(statedb)
  363. }
  364. // Handle upgrade build-in system contract code
  365. systemcontracts.UpgradeBuildInSystemContract(p.config, block.Number(), statedb)
  366. blockContext := NewEVMBlockContext(header, p.bc, nil)
  367. vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
  368. txNum := len(block.Transactions())
  369. // Iterate over and process the individual transactions
  370. posa, isPoSA := p.engine.(consensus.PoSA)
  371. commonTxs := make([]*types.Transaction, 0, txNum)
  372. // initilise bloom processors
  373. bloomProcessors := NewAsyncReceiptBloomGenerator(txNum)
  374. statedb.MarkFullProcessed()
  375. // usually do have two tx, one for validator set contract, another for system reward contract.
  376. systemTxs := make([]*types.Transaction, 0, 2)
  377. for _, tx := range block.Transactions() {
  378. if isPoSA {
  379. if isSystemTx, err := posa.IsSystemTransaction(tx, block.Header()); err != nil {
  380. return statedb, nil, nil, 0, err
  381. } else if isSystemTx {
  382. systemTxs = append(systemTxs, tx)
  383. continue
  384. }
  385. }
  386. }
  387. err := p.engine.BeforeValidateTx(p.bc, header, statedb, &commonTxs, block.Uncles(), &receipts, &systemTxs, usedGas)
  388. if err != nil {
  389. return statedb, receipts, allLogs, *usedGas, err
  390. }
  391. for i, tx := range block.Transactions() {
  392. if isPoSA {
  393. if isSystemTx, _ := posa.IsSystemTransaction(tx, block.Header()); isSystemTx {
  394. continue
  395. }
  396. }
  397. msg, err := tx.AsMessage(signer)
  398. if err != nil {
  399. return statedb, nil, nil, 0, err
  400. }
  401. statedb.Prepare(tx.Hash(), block.Hash(), i)
  402. receipt, err := applyTransaction(msg, p.config, p.bc, nil, gp, statedb, header, tx, usedGas, vmenv, bloomProcessors)
  403. if err != nil {
  404. return statedb, nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
  405. }
  406. commonTxs = append(commonTxs, tx)
  407. receipts = append(receipts, receipt)
  408. }
  409. bloomProcessors.Close()
  410. // Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
  411. err = p.engine.Finalize(p.bc, header, statedb, &commonTxs, block.Uncles(), &receipts, &systemTxs, usedGas)
  412. if err != nil {
  413. return statedb, receipts, allLogs, *usedGas, err
  414. }
  415. for _, receipt := range receipts {
  416. allLogs = append(allLogs, receipt.Logs...)
  417. }
  418. return statedb, receipts, allLogs, *usedGas, nil
  419. }
  420. func applyTransaction(msg types.Message, config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, evm *vm.EVM, receiptProcessors ...ReceiptProcessor) (*types.Receipt, error) {
  421. // Create a new context to be used in the EVM environment.
  422. txContext := NewEVMTxContext(msg)
  423. evm.Reset(txContext, statedb)
  424. // Apply the transaction to the current state (included in the env).
  425. result, err := ApplyMessage(evm, msg, gp)
  426. if err != nil {
  427. return nil, err
  428. }
  429. // Update the state with pending changes.
  430. var root []byte
  431. if config.IsByzantium(header.Number) {
  432. statedb.Finalise(true)
  433. } else {
  434. root = statedb.IntermediateRoot(config.IsEIP158(header.Number)).Bytes()
  435. }
  436. *usedGas += result.UsedGas
  437. // Create a new receipt for the transaction, storing the intermediate root and gas used
  438. // by the tx.
  439. receipt := &types.Receipt{Type: tx.Type(), PostState: root, CumulativeGasUsed: *usedGas}
  440. if result.Failed() {
  441. receipt.Status = types.ReceiptStatusFailed
  442. } else {
  443. receipt.Status = types.ReceiptStatusSuccessful
  444. }
  445. receipt.TxHash = tx.Hash()
  446. receipt.GasUsed = result.UsedGas
  447. // If the transaction created a contract, store the creation address in the receipt.
  448. if msg.To() == nil {
  449. receipt.ContractAddress = crypto.CreateAddress(evm.TxContext.Origin, tx.Nonce())
  450. }
  451. // Set the receipt logs and create the bloom filter.
  452. receipt.Logs = statedb.GetLogs(tx.Hash())
  453. receipt.BlockHash = statedb.BlockHash()
  454. receipt.BlockNumber = header.Number
  455. receipt.TransactionIndex = uint(statedb.TxIndex())
  456. for _, receiptProcessor := range receiptProcessors {
  457. receiptProcessor.Apply(receipt)
  458. }
  459. return receipt, err
  460. }
  461. // ApplyTransaction attempts to apply a transaction to the given state database
  462. // and uses the input parameters for its environment. It returns the receipt
  463. // for the transaction, gas used and an error if the transaction failed,
  464. // indicating the block was invalid.
  465. func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, cfg vm.Config, receiptProcessors ...ReceiptProcessor) (*types.Receipt, error) {
  466. msg, err := tx.AsMessage(types.MakeSigner(config, header.Number))
  467. if err != nil {
  468. return nil, err
  469. }
  470. // Create a new context to be used in the EVM environment
  471. blockContext := NewEVMBlockContext(header, bc, author)
  472. vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, config, cfg)
  473. defer func() {
  474. ite := vmenv.Interpreter()
  475. vm.EVMInterpreterPool.Put(ite)
  476. vm.EvmPool.Put(vmenv)
  477. }()
  478. return applyTransaction(msg, config, bc, author, gp, statedb, header, tx, usedGas, vmenv, receiptProcessors...)
  479. }