state_processor.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package core
  17. import (
  18. "bytes"
  19. "errors"
  20. "fmt"
  21. "math/big"
  22. "math/rand"
  23. "sync"
  24. "time"
  25. "github.com/ethereum/go-ethereum/common"
  26. "github.com/ethereum/go-ethereum/common/gopool"
  27. "github.com/ethereum/go-ethereum/consensus"
  28. "github.com/ethereum/go-ethereum/consensus/misc"
  29. "github.com/ethereum/go-ethereum/core/rawdb"
  30. "github.com/ethereum/go-ethereum/core/state"
  31. "github.com/ethereum/go-ethereum/core/state/snapshot"
  32. "github.com/ethereum/go-ethereum/core/systemcontracts"
  33. "github.com/ethereum/go-ethereum/core/types"
  34. "github.com/ethereum/go-ethereum/core/vm"
  35. "github.com/ethereum/go-ethereum/crypto"
  36. "github.com/ethereum/go-ethereum/log"
  37. "github.com/ethereum/go-ethereum/params"
  38. "github.com/ethereum/go-ethereum/rlp"
  39. )
  40. const (
  41. fullProcessCheck = 21 // On diff sync mode, will do full process every fullProcessCheck randomly
  42. recentTime = 2048 * 3
  43. recentDiffLayerTimeout = 20
  44. farDiffLayerTimeout = 2
  45. )
  46. // StateProcessor is a basic Processor, which takes care of transitioning
  47. // state from one point to another.
  48. //
  49. // StateProcessor implements Processor.
  50. type StateProcessor struct {
  51. config *params.ChainConfig // Chain configuration options
  52. bc *BlockChain // Canonical block chain
  53. engine consensus.Engine // Consensus engine used for block rewards
  54. }
  55. // NewStateProcessor initialises a new StateProcessor.
  56. func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *StateProcessor {
  57. return &StateProcessor{
  58. config: config,
  59. bc: bc,
  60. engine: engine,
  61. }
  62. }
  63. type LightStateProcessor struct {
  64. randomGenerator *rand.Rand
  65. StateProcessor
  66. }
  67. func NewLightStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *LightStateProcessor {
  68. randomGenerator := rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
  69. return &LightStateProcessor{
  70. randomGenerator: randomGenerator,
  71. StateProcessor: *NewStateProcessor(config, bc, engine),
  72. }
  73. }
  74. func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (*state.StateDB, types.Receipts, []*types.Log, uint64, error) {
  75. allowLightProcess := true
  76. if posa, ok := p.engine.(consensus.PoSA); ok {
  77. allowLightProcess = posa.AllowLightProcess(p.bc, block.Header())
  78. }
  79. // random fallback to full process
  80. if check := p.randomGenerator.Int63n(fullProcessCheck); allowLightProcess && check != 0 && len(block.Transactions()) != 0 {
  81. var pid string
  82. if peer, ok := block.ReceivedFrom.(PeerIDer); ok {
  83. pid = peer.ID()
  84. }
  85. var diffLayer *types.DiffLayer
  86. var diffLayerTimeout = recentDiffLayerTimeout
  87. if time.Now().Unix()-int64(block.Time()) > recentTime {
  88. diffLayerTimeout = farDiffLayerTimeout
  89. }
  90. for tried := 0; tried < diffLayerTimeout; tried++ {
  91. // wait a bit for the diff layer
  92. diffLayer = p.bc.GetUnTrustedDiffLayer(block.Hash(), pid)
  93. if diffLayer != nil {
  94. break
  95. }
  96. time.Sleep(time.Millisecond)
  97. }
  98. if diffLayer != nil {
  99. if err := diffLayer.Receipts.DeriveFields(p.bc.chainConfig, block.Hash(), block.NumberU64(), block.Transactions()); err != nil {
  100. log.Error("Failed to derive block receipts fields", "hash", block.Hash(), "number", block.NumberU64(), "err", err)
  101. // fallback to full process
  102. return p.StateProcessor.Process(block, statedb, cfg)
  103. }
  104. receipts, logs, gasUsed, err := p.LightProcess(diffLayer, block, statedb)
  105. if err == nil {
  106. log.Info("do light process success at block", "num", block.NumberU64())
  107. return statedb, receipts, logs, gasUsed, nil
  108. }
  109. log.Error("do light process err at block", "num", block.NumberU64(), "err", err)
  110. p.bc.removeDiffLayers(diffLayer.DiffHash)
  111. // prepare new statedb
  112. statedb.StopPrefetcher()
  113. parent := p.bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
  114. statedb, err = state.New(parent.Root, p.bc.stateCache, p.bc.snaps)
  115. if err != nil {
  116. return statedb, nil, nil, 0, err
  117. }
  118. // Enable prefetching to pull in trie node paths while processing transactions
  119. statedb.StartPrefetcher("chain")
  120. }
  121. }
  122. // fallback to full process
  123. return p.StateProcessor.Process(block, statedb, cfg)
  124. }
  125. func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *types.Block, statedb *state.StateDB) (types.Receipts, []*types.Log, uint64, error) {
  126. statedb.MarkLightProcessed()
  127. fullDiffCode := make(map[common.Hash][]byte, len(diffLayer.Codes))
  128. diffTries := make(map[common.Address]state.Trie)
  129. diffCode := make(map[common.Hash][]byte)
  130. snapDestructs, snapAccounts, snapStorage, err := statedb.DiffLayerToSnap(diffLayer)
  131. if err != nil {
  132. return nil, nil, 0, err
  133. }
  134. for _, c := range diffLayer.Codes {
  135. fullDiffCode[c.Hash] = c.Code
  136. }
  137. for des := range snapDestructs {
  138. statedb.Trie().TryDelete(des[:])
  139. }
  140. threads := gopool.Threads(len(snapAccounts))
  141. iteAccounts := make([]common.Address, 0, len(snapAccounts))
  142. for diffAccount := range snapAccounts {
  143. iteAccounts = append(iteAccounts, diffAccount)
  144. }
  145. errChan := make(chan error, threads)
  146. exitChan := make(chan struct{})
  147. var snapMux sync.RWMutex
  148. var stateMux, diffMux sync.Mutex
  149. for i := 0; i < threads; i++ {
  150. start := i * len(iteAccounts) / threads
  151. end := (i + 1) * len(iteAccounts) / threads
  152. if i+1 == threads {
  153. end = len(iteAccounts)
  154. }
  155. go func(start, end int) {
  156. for index := start; index < end; index++ {
  157. select {
  158. // fast fail
  159. case <-exitChan:
  160. return
  161. default:
  162. }
  163. diffAccount := iteAccounts[index]
  164. snapMux.RLock()
  165. blob := snapAccounts[diffAccount]
  166. snapMux.RUnlock()
  167. addrHash := crypto.Keccak256Hash(diffAccount[:])
  168. latestAccount, err := snapshot.FullAccount(blob)
  169. if err != nil {
  170. errChan <- err
  171. return
  172. }
  173. // fetch previous state
  174. var previousAccount state.Account
  175. stateMux.Lock()
  176. enc, err := statedb.Trie().TryGet(diffAccount[:])
  177. stateMux.Unlock()
  178. if err != nil {
  179. errChan <- err
  180. return
  181. }
  182. if len(enc) != 0 {
  183. if err := rlp.DecodeBytes(enc, &previousAccount); err != nil {
  184. errChan <- err
  185. return
  186. }
  187. }
  188. if latestAccount.Balance == nil {
  189. latestAccount.Balance = new(big.Int)
  190. }
  191. if previousAccount.Balance == nil {
  192. previousAccount.Balance = new(big.Int)
  193. }
  194. if previousAccount.Root == (common.Hash{}) {
  195. previousAccount.Root = types.EmptyRootHash
  196. }
  197. if len(previousAccount.CodeHash) == 0 {
  198. previousAccount.CodeHash = types.EmptyCodeHash
  199. }
  200. // skip no change account
  201. if previousAccount.Nonce == latestAccount.Nonce &&
  202. bytes.Equal(previousAccount.CodeHash, latestAccount.CodeHash) &&
  203. previousAccount.Balance.Cmp(latestAccount.Balance) == 0 &&
  204. previousAccount.Root == common.BytesToHash(latestAccount.Root) {
  205. // It is normal to receive redundant message since the collected message is redundant.
  206. log.Debug("receive redundant account change in diff layer", "account", diffAccount, "num", block.NumberU64())
  207. snapMux.Lock()
  208. delete(snapAccounts, diffAccount)
  209. delete(snapStorage, diffAccount)
  210. snapMux.Unlock()
  211. continue
  212. }
  213. // update code
  214. codeHash := common.BytesToHash(latestAccount.CodeHash)
  215. if !bytes.Equal(latestAccount.CodeHash, previousAccount.CodeHash) &&
  216. !bytes.Equal(latestAccount.CodeHash, types.EmptyCodeHash) {
  217. if code, exist := fullDiffCode[codeHash]; exist {
  218. if crypto.Keccak256Hash(code) != codeHash {
  219. errChan <- fmt.Errorf("code and code hash mismatch, account %s", diffAccount.String())
  220. return
  221. }
  222. diffMux.Lock()
  223. diffCode[codeHash] = code
  224. diffMux.Unlock()
  225. } else {
  226. rawCode := rawdb.ReadCode(p.bc.db, codeHash)
  227. if len(rawCode) == 0 {
  228. errChan <- fmt.Errorf("missing code, account %s", diffAccount.String())
  229. return
  230. }
  231. }
  232. }
  233. //update storage
  234. latestRoot := common.BytesToHash(latestAccount.Root)
  235. if latestRoot != previousAccount.Root && latestRoot != types.EmptyRootHash {
  236. accountTrie, err := statedb.Database().OpenStorageTrie(addrHash, previousAccount.Root)
  237. if err != nil {
  238. errChan <- err
  239. return
  240. }
  241. snapMux.RLock()
  242. storageChange, exist := snapStorage[diffAccount]
  243. snapMux.RUnlock()
  244. if !exist {
  245. errChan <- errors.New("missing storage change in difflayer")
  246. return
  247. }
  248. for k, v := range storageChange {
  249. if len(v) != 0 {
  250. accountTrie.TryUpdate([]byte(k), v)
  251. } else {
  252. accountTrie.TryDelete([]byte(k))
  253. }
  254. }
  255. // check storage root
  256. accountRootHash := accountTrie.Hash()
  257. if latestRoot != accountRootHash {
  258. errChan <- errors.New("account storage root mismatch")
  259. return
  260. }
  261. diffMux.Lock()
  262. diffTries[diffAccount] = accountTrie
  263. diffMux.Unlock()
  264. } else {
  265. snapMux.Lock()
  266. delete(snapStorage, diffAccount)
  267. snapMux.Unlock()
  268. }
  269. // can't trust the blob, need encode by our-self.
  270. latestStateAccount := state.Account{
  271. Nonce: latestAccount.Nonce,
  272. Balance: latestAccount.Balance,
  273. Root: common.BytesToHash(latestAccount.Root),
  274. CodeHash: latestAccount.CodeHash,
  275. }
  276. bz, err := rlp.EncodeToBytes(&latestStateAccount)
  277. if err != nil {
  278. errChan <- err
  279. return
  280. }
  281. stateMux.Lock()
  282. err = statedb.Trie().TryUpdate(diffAccount[:], bz)
  283. stateMux.Unlock()
  284. if err != nil {
  285. errChan <- err
  286. return
  287. }
  288. }
  289. errChan <- nil
  290. }(start, end)
  291. }
  292. for i := 0; i < threads; i++ {
  293. err := <-errChan
  294. if err != nil {
  295. close(exitChan)
  296. return nil, nil, 0, err
  297. }
  298. }
  299. var allLogs []*types.Log
  300. var gasUsed uint64
  301. for _, receipt := range diffLayer.Receipts {
  302. allLogs = append(allLogs, receipt.Logs...)
  303. gasUsed += receipt.GasUsed
  304. }
  305. // Do validate in advance so that we can fall back to full process
  306. if err := p.bc.validator.ValidateState(block, statedb, diffLayer.Receipts, gasUsed); err != nil {
  307. log.Error("validate state failed during diff sync", "error", err)
  308. return nil, nil, 0, err
  309. }
  310. // remove redundant storage change
  311. for account := range snapStorage {
  312. if _, exist := snapAccounts[account]; !exist {
  313. log.Warn("receive redundant storage change in diff layer")
  314. delete(snapStorage, account)
  315. }
  316. }
  317. // remove redundant code
  318. if len(fullDiffCode) != len(diffLayer.Codes) {
  319. diffLayer.Codes = make([]types.DiffCode, 0, len(diffCode))
  320. for hash, code := range diffCode {
  321. diffLayer.Codes = append(diffLayer.Codes, types.DiffCode{
  322. Hash: hash,
  323. Code: code,
  324. })
  325. }
  326. }
  327. statedb.SetSnapData(snapDestructs, snapAccounts, snapStorage)
  328. if len(snapAccounts) != len(diffLayer.Accounts) || len(snapStorage) != len(diffLayer.Storages) {
  329. diffLayer.Destructs, diffLayer.Accounts, diffLayer.Storages = statedb.SnapToDiffLayer()
  330. }
  331. statedb.SetDiff(diffLayer, diffTries, diffCode)
  332. return diffLayer.Receipts, allLogs, gasUsed, nil
  333. }
  334. // Process processes the state changes according to the Ethereum rules by running
  335. // the transaction messages using the statedb and applying any rewards to both
  336. // the processor (coinbase) and any included uncles.
  337. //
  338. // Process returns the receipts and logs accumulated during the process and
  339. // returns the amount of gas that was used in the process. If any of the
  340. // transactions failed to execute due to insufficient gas it will return an error.
  341. func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (*state.StateDB, types.Receipts, []*types.Log, uint64, error) {
  342. var (
  343. usedGas = new(uint64)
  344. header = block.Header()
  345. allLogs []*types.Log
  346. gp = new(GasPool).AddGas(block.GasLimit())
  347. )
  348. signer := types.MakeSigner(p.bc.chainConfig, block.Number())
  349. statedb.TryPreload(block, signer)
  350. var receipts = make([]*types.Receipt, 0)
  351. // Mutate the block and state according to any hard-fork specs
  352. if p.config.DAOForkSupport && p.config.DAOForkBlock != nil && p.config.DAOForkBlock.Cmp(block.Number()) == 0 {
  353. misc.ApplyDAOHardFork(statedb)
  354. }
  355. // Handle upgrade build-in system contract code
  356. systemcontracts.UpgradeBuildInSystemContract(p.config, block.Number(), statedb)
  357. blockContext := NewEVMBlockContext(header, p.bc, nil)
  358. vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
  359. txNum := len(block.Transactions())
  360. // Iterate over and process the individual transactions
  361. posa, isPoSA := p.engine.(consensus.PoSA)
  362. commonTxs := make([]*types.Transaction, 0, txNum)
  363. // initilise bloom processors
  364. bloomProcessors := NewAsyncReceiptBloomGenerator(txNum)
  365. // usually do have two tx, one for validator set contract, another for system reward contract.
  366. systemTxs := make([]*types.Transaction, 0, 2)
  367. for i, tx := range block.Transactions() {
  368. if isPoSA {
  369. if isSystemTx, err := posa.IsSystemTransaction(tx, block.Header()); err != nil {
  370. return statedb, nil, nil, 0, err
  371. } else if isSystemTx {
  372. systemTxs = append(systemTxs, tx)
  373. continue
  374. }
  375. }
  376. msg, err := tx.AsMessage(signer)
  377. if err != nil {
  378. return statedb, nil, nil, 0, err
  379. }
  380. statedb.Prepare(tx.Hash(), block.Hash(), i)
  381. receipt, err := applyTransaction(msg, p.config, p.bc, nil, gp, statedb, header, tx, usedGas, vmenv, bloomProcessors)
  382. if err != nil {
  383. return statedb, nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
  384. }
  385. commonTxs = append(commonTxs, tx)
  386. receipts = append(receipts, receipt)
  387. }
  388. bloomProcessors.Close()
  389. // Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
  390. err := p.engine.Finalize(p.bc, header, statedb, &commonTxs, block.Uncles(), &receipts, &systemTxs, usedGas)
  391. if err != nil {
  392. return statedb, receipts, allLogs, *usedGas, err
  393. }
  394. for _, receipt := range receipts {
  395. allLogs = append(allLogs, receipt.Logs...)
  396. }
  397. return statedb, receipts, allLogs, *usedGas, nil
  398. }
  399. func applyTransaction(msg types.Message, config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, evm *vm.EVM, receiptProcessors ...ReceiptProcessor) (*types.Receipt, error) {
  400. // Create a new context to be used in the EVM environment.
  401. txContext := NewEVMTxContext(msg)
  402. evm.Reset(txContext, statedb)
  403. // Apply the transaction to the current state (included in the env).
  404. result, err := ApplyMessage(evm, msg, gp)
  405. if err != nil {
  406. return nil, err
  407. }
  408. // Update the state with pending changes.
  409. var root []byte
  410. if config.IsByzantium(header.Number) {
  411. statedb.Finalise(true)
  412. } else {
  413. root = statedb.IntermediateRoot(config.IsEIP158(header.Number)).Bytes()
  414. }
  415. *usedGas += result.UsedGas
  416. // Create a new receipt for the transaction, storing the intermediate root and gas used
  417. // by the tx.
  418. receipt := &types.Receipt{Type: tx.Type(), PostState: root, CumulativeGasUsed: *usedGas}
  419. if result.Failed() {
  420. receipt.Status = types.ReceiptStatusFailed
  421. } else {
  422. receipt.Status = types.ReceiptStatusSuccessful
  423. }
  424. receipt.TxHash = tx.Hash()
  425. receipt.GasUsed = result.UsedGas
  426. // If the transaction created a contract, store the creation address in the receipt.
  427. if msg.To() == nil {
  428. receipt.ContractAddress = crypto.CreateAddress(evm.TxContext.Origin, tx.Nonce())
  429. }
  430. // Set the receipt logs and create the bloom filter.
  431. receipt.Logs = statedb.GetLogs(tx.Hash())
  432. receipt.BlockHash = statedb.BlockHash()
  433. receipt.BlockNumber = header.Number
  434. receipt.TransactionIndex = uint(statedb.TxIndex())
  435. for _, receiptProcessor := range receiptProcessors {
  436. receiptProcessor.Apply(receipt)
  437. }
  438. return receipt, err
  439. }
  440. // ApplyTransaction attempts to apply a transaction to the given state database
  441. // and uses the input parameters for its environment. It returns the receipt
  442. // for the transaction, gas used and an error if the transaction failed,
  443. // indicating the block was invalid.
  444. func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, cfg vm.Config, receiptProcessors ...ReceiptProcessor) (*types.Receipt, error) {
  445. msg, err := tx.AsMessage(types.MakeSigner(config, header.Number))
  446. if err != nil {
  447. return nil, err
  448. }
  449. // Create a new context to be used in the EVM environment
  450. blockContext := NewEVMBlockContext(header, bc, author)
  451. vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, config, cfg)
  452. defer func() {
  453. ite := vmenv.Interpreter()
  454. vm.EVMInterpreterPool.Put(ite)
  455. vm.EvmPool.Put(vmenv)
  456. }()
  457. return applyTransaction(msg, config, bc, author, gp, statedb, header, tx, usedGas, vmenv, receiptProcessors...)
  458. }