worker.go 41 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package miner
  17. import (
  18. "errors"
  19. "fmt"
  20. "math/big"
  21. "sync"
  22. "sync/atomic"
  23. "time"
  24. mapset "github.com/deckarep/golang-set"
  25. "github.com/ethereum/go-ethereum/common"
  26. "github.com/ethereum/go-ethereum/consensus"
  27. "github.com/ethereum/go-ethereum/consensus/misc"
  28. "github.com/ethereum/go-ethereum/core"
  29. "github.com/ethereum/go-ethereum/core/state"
  30. "github.com/ethereum/go-ethereum/core/types"
  31. "github.com/ethereum/go-ethereum/event"
  32. "github.com/ethereum/go-ethereum/log"
  33. "github.com/ethereum/go-ethereum/params"
  34. "github.com/ethereum/go-ethereum/trie"
  35. )
  36. const (
  37. // resultQueueSize is the size of channel listening to sealing result.
  38. resultQueueSize = 10
  39. // txChanSize is the size of channel listening to NewTxsEvent.
  40. // The number is referenced from the size of tx pool.
  41. txChanSize = 4096
  42. // chainHeadChanSize is the size of channel listening to ChainHeadEvent.
  43. chainHeadChanSize = 10
  44. // chainSideChanSize is the size of channel listening to ChainSideEvent.
  45. chainSideChanSize = 10
  46. // resubmitAdjustChanSize is the size of resubmitting interval adjustment channel.
  47. resubmitAdjustChanSize = 10
  48. // sealingLogAtDepth is the number of confirmations before logging successful sealing.
  49. sealingLogAtDepth = 7
  50. // minRecommitInterval is the minimal time interval to recreate the sealing block with
  51. // any newly arrived transactions.
  52. minRecommitInterval = 1 * time.Second
  53. // maxRecommitInterval is the maximum time interval to recreate the sealing block with
  54. // any newly arrived transactions.
  55. maxRecommitInterval = 15 * time.Second
  56. // intervalAdjustRatio is the impact a single interval adjustment has on sealing work
  57. // resubmitting interval.
  58. intervalAdjustRatio = 0.1
  59. // intervalAdjustBias is applied during the new resubmit interval calculation in favor of
  60. // increasing upper limit or decreasing lower limit so that the limit can be reachable.
  61. intervalAdjustBias = 200 * 1000.0 * 1000.0
  62. // staleThreshold is the maximum depth of the acceptable stale block.
  63. staleThreshold = 7
  64. )
  65. var (
  66. errBlockInterruptedByNewHead = errors.New("new head arrived while building block")
  67. errBlockInterruptedByRecommit = errors.New("recommit interrupt while building block")
  68. )
  69. // environment is the worker's current environment and holds all
  70. // information of the sealing block generation.
  71. type environment struct {
  72. signer types.Signer
  73. state *state.StateDB // apply state changes here
  74. ancestors mapset.Set // ancestor set (used for checking uncle parent validity)
  75. family mapset.Set // family set (used for checking uncle invalidity)
  76. tcount int // tx count in cycle
  77. gasPool *core.GasPool // available gas used to pack transactions
  78. coinbase common.Address
  79. header *types.Header
  80. txs []*types.Transaction
  81. receipts []*types.Receipt
  82. uncles map[common.Hash]*types.Header
  83. }
  84. // copy creates a deep copy of environment.
  85. func (env *environment) copy() *environment {
  86. cpy := &environment{
  87. signer: env.signer,
  88. state: env.state.Copy(),
  89. ancestors: env.ancestors.Clone(),
  90. family: env.family.Clone(),
  91. tcount: env.tcount,
  92. coinbase: env.coinbase,
  93. header: types.CopyHeader(env.header),
  94. receipts: copyReceipts(env.receipts),
  95. }
  96. if env.gasPool != nil {
  97. gasPool := *env.gasPool
  98. cpy.gasPool = &gasPool
  99. }
  100. // The content of txs and uncles are immutable, unnecessary
  101. // to do the expensive deep copy for them.
  102. cpy.txs = make([]*types.Transaction, len(env.txs))
  103. copy(cpy.txs, env.txs)
  104. cpy.uncles = make(map[common.Hash]*types.Header)
  105. for hash, uncle := range env.uncles {
  106. cpy.uncles[hash] = uncle
  107. }
  108. return cpy
  109. }
  110. // unclelist returns the contained uncles as the list format.
  111. func (env *environment) unclelist() []*types.Header {
  112. var uncles []*types.Header
  113. for _, uncle := range env.uncles {
  114. uncles = append(uncles, uncle)
  115. }
  116. return uncles
  117. }
  118. // discard terminates the background prefetcher go-routine. It should
  119. // always be called for all created environment instances otherwise
  120. // the go-routine leak can happen.
  121. func (env *environment) discard() {
  122. if env.state == nil {
  123. return
  124. }
  125. env.state.StopPrefetcher()
  126. }
  127. // task contains all information for consensus engine sealing and result submitting.
  128. type task struct {
  129. receipts []*types.Receipt
  130. state *state.StateDB
  131. block *types.Block
  132. createdAt time.Time
  133. }
  134. const (
  135. commitInterruptNone int32 = iota
  136. commitInterruptNewHead
  137. commitInterruptResubmit
  138. )
  139. // newWorkReq represents a request for new sealing work submitting with relative interrupt notifier.
  140. type newWorkReq struct {
  141. interrupt *int32
  142. noempty bool
  143. timestamp int64
  144. }
  145. // getWorkReq represents a request for getting a new sealing work with provided parameters.
  146. type getWorkReq struct {
  147. params *generateParams
  148. result chan *types.Block // non-blocking channel
  149. err chan error
  150. }
  151. // intervalAdjust represents a resubmitting interval adjustment.
  152. type intervalAdjust struct {
  153. ratio float64
  154. inc bool
  155. }
  156. // worker is the main object which takes care of submitting new work to consensus engine
  157. // and gathering the sealing result.
  158. type worker struct {
  159. config *Config
  160. chainConfig *params.ChainConfig
  161. engine consensus.Engine
  162. eth Backend
  163. chain *core.BlockChain
  164. // Feeds
  165. pendingLogsFeed event.Feed
  166. // Subscriptions
  167. mux *event.TypeMux
  168. txsCh chan core.NewTxsEvent
  169. txsSub event.Subscription
  170. chainHeadCh chan core.ChainHeadEvent
  171. chainHeadSub event.Subscription
  172. chainSideCh chan core.ChainSideEvent
  173. chainSideSub event.Subscription
  174. // Channels
  175. newWorkCh chan *newWorkReq
  176. getWorkCh chan *getWorkReq
  177. taskCh chan *task
  178. resultCh chan *types.Block
  179. startCh chan struct{}
  180. exitCh chan struct{}
  181. resubmitIntervalCh chan time.Duration
  182. resubmitAdjustCh chan *intervalAdjust
  183. wg sync.WaitGroup
  184. current *environment // An environment for current running cycle.
  185. localUncles map[common.Hash]*types.Block // A set of side blocks generated locally as the possible uncle blocks.
  186. remoteUncles map[common.Hash]*types.Block // A set of side blocks as the possible uncle blocks.
  187. unconfirmed *unconfirmedBlocks // A set of locally mined blocks pending canonicalness confirmations.
  188. mu sync.RWMutex // The lock used to protect the coinbase and extra fields
  189. coinbase common.Address
  190. extra []byte
  191. pendingMu sync.RWMutex
  192. pendingTasks map[common.Hash]*task
  193. snapshotMu sync.RWMutex // The lock used to protect the snapshots below
  194. snapshotBlock *types.Block
  195. snapshotReceipts types.Receipts
  196. snapshotState *state.StateDB
  197. // atomic status counters
  198. running int32 // The indicator whether the consensus engine is running or not.
  199. newTxs int32 // New arrival transaction count since last sealing work submitting.
  200. // noempty is the flag used to control whether the feature of pre-seal empty
  201. // block is enabled. The default value is false(pre-seal is enabled by default).
  202. // But in some special scenario the consensus engine will seal blocks instantaneously,
  203. // in this case this feature will add all empty blocks into canonical chain
  204. // non-stop and no real transaction will be included.
  205. noempty uint32
  206. // External functions
  207. isLocalBlock func(header *types.Header) bool // Function used to determine whether the specified block is mined by local miner.
  208. // Test hooks
  209. newTaskHook func(*task) // Method to call upon receiving a new sealing task.
  210. skipSealHook func(*task) bool // Method to decide whether skipping the sealing.
  211. fullTaskHook func() // Method to call before pushing the full sealing task.
  212. resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval.
  213. }
  214. func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(header *types.Header) bool, init bool) *worker {
  215. worker := &worker{
  216. config: config,
  217. chainConfig: chainConfig,
  218. engine: engine,
  219. eth: eth,
  220. mux: mux,
  221. chain: eth.BlockChain(),
  222. isLocalBlock: isLocalBlock,
  223. localUncles: make(map[common.Hash]*types.Block),
  224. remoteUncles: make(map[common.Hash]*types.Block),
  225. unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), sealingLogAtDepth),
  226. pendingTasks: make(map[common.Hash]*task),
  227. txsCh: make(chan core.NewTxsEvent, txChanSize),
  228. chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
  229. chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
  230. newWorkCh: make(chan *newWorkReq),
  231. getWorkCh: make(chan *getWorkReq),
  232. taskCh: make(chan *task),
  233. resultCh: make(chan *types.Block, resultQueueSize),
  234. exitCh: make(chan struct{}),
  235. startCh: make(chan struct{}, 1),
  236. resubmitIntervalCh: make(chan time.Duration),
  237. resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
  238. }
  239. // Subscribe NewTxsEvent for tx pool
  240. worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
  241. // Subscribe events for blockchain
  242. worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
  243. worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)
  244. // Sanitize recommit interval if the user-specified one is too short.
  245. recommit := worker.config.Recommit
  246. if recommit < minRecommitInterval {
  247. log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval)
  248. recommit = minRecommitInterval
  249. }
  250. worker.wg.Add(4)
  251. go worker.mainLoop()
  252. go worker.newWorkLoop(recommit)
  253. go worker.resultLoop()
  254. go worker.taskLoop()
  255. // Submit first work to initialize pending state.
  256. if init {
  257. worker.startCh <- struct{}{}
  258. }
  259. return worker
  260. }
  261. // setEtherbase sets the etherbase used to initialize the block coinbase field.
  262. func (w *worker) setEtherbase(addr common.Address) {
  263. w.mu.Lock()
  264. defer w.mu.Unlock()
  265. w.coinbase = addr
  266. }
  267. func (w *worker) setGasCeil(ceil uint64) {
  268. w.mu.Lock()
  269. defer w.mu.Unlock()
  270. w.config.GasCeil = ceil
  271. }
  272. // setExtra sets the content used to initialize the block extra field.
  273. func (w *worker) setExtra(extra []byte) {
  274. w.mu.Lock()
  275. defer w.mu.Unlock()
  276. w.extra = extra
  277. }
  278. // setRecommitInterval updates the interval for miner sealing work recommitting.
  279. func (w *worker) setRecommitInterval(interval time.Duration) {
  280. select {
  281. case w.resubmitIntervalCh <- interval:
  282. case <-w.exitCh:
  283. }
  284. }
  285. // disablePreseal disables pre-sealing feature
  286. func (w *worker) disablePreseal() {
  287. atomic.StoreUint32(&w.noempty, 1)
  288. }
  289. // enablePreseal enables pre-sealing feature
  290. func (w *worker) enablePreseal() {
  291. atomic.StoreUint32(&w.noempty, 0)
  292. }
  293. // pending returns the pending state and corresponding block.
  294. func (w *worker) pending() (*types.Block, *state.StateDB) {
  295. // return a snapshot to avoid contention on currentMu mutex
  296. w.snapshotMu.RLock()
  297. defer w.snapshotMu.RUnlock()
  298. if w.snapshotState == nil {
  299. return nil, nil
  300. }
  301. return w.snapshotBlock, w.snapshotState.Copy()
  302. }
  303. // pendingBlock returns pending block.
  304. func (w *worker) pendingBlock() *types.Block {
  305. // return a snapshot to avoid contention on currentMu mutex
  306. w.snapshotMu.RLock()
  307. defer w.snapshotMu.RUnlock()
  308. return w.snapshotBlock
  309. }
  310. // pendingBlockAndReceipts returns pending block and corresponding receipts.
  311. func (w *worker) pendingBlockAndReceipts() (*types.Block, types.Receipts) {
  312. // return a snapshot to avoid contention on currentMu mutex
  313. w.snapshotMu.RLock()
  314. defer w.snapshotMu.RUnlock()
  315. return w.snapshotBlock, w.snapshotReceipts
  316. }
  317. // start sets the running status as 1 and triggers new work submitting.
  318. func (w *worker) start() {
  319. atomic.StoreInt32(&w.running, 1)
  320. w.startCh <- struct{}{}
  321. }
  322. // stop sets the running status as 0.
  323. func (w *worker) stop() {
  324. atomic.StoreInt32(&w.running, 0)
  325. }
  326. // isRunning returns an indicator whether worker is running or not.
  327. func (w *worker) isRunning() bool {
  328. return atomic.LoadInt32(&w.running) == 1
  329. }
  330. // close terminates all background threads maintained by the worker.
  331. // Note the worker does not support being closed multiple times.
  332. func (w *worker) close() {
  333. atomic.StoreInt32(&w.running, 0)
  334. close(w.exitCh)
  335. w.wg.Wait()
  336. }
  337. // recalcRecommit recalculates the resubmitting interval upon feedback.
  338. func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) time.Duration {
  339. var (
  340. prevF = float64(prev.Nanoseconds())
  341. next float64
  342. )
  343. if inc {
  344. next = prevF*(1-intervalAdjustRatio) + intervalAdjustRatio*(target+intervalAdjustBias)
  345. max := float64(maxRecommitInterval.Nanoseconds())
  346. if next > max {
  347. next = max
  348. }
  349. } else {
  350. next = prevF*(1-intervalAdjustRatio) + intervalAdjustRatio*(target-intervalAdjustBias)
  351. min := float64(minRecommit.Nanoseconds())
  352. if next < min {
  353. next = min
  354. }
  355. }
  356. return time.Duration(int64(next))
  357. }
  358. // newWorkLoop is a standalone goroutine to submit new sealing work upon received events.
  359. func (w *worker) newWorkLoop(recommit time.Duration) {
  360. defer w.wg.Done()
  361. var (
  362. interrupt *int32
  363. minRecommit = recommit // minimal resubmit interval specified by user.
  364. timestamp int64 // timestamp for each round of sealing.
  365. )
  366. timer := time.NewTimer(0)
  367. defer timer.Stop()
  368. <-timer.C // discard the initial tick
  369. // commit aborts in-flight transaction execution with given signal and resubmits a new one.
  370. commit := func(noempty bool, s int32) {
  371. if interrupt != nil {
  372. atomic.StoreInt32(interrupt, s)
  373. }
  374. interrupt = new(int32)
  375. select {
  376. case w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}:
  377. case <-w.exitCh:
  378. return
  379. }
  380. timer.Reset(recommit)
  381. atomic.StoreInt32(&w.newTxs, 0)
  382. }
  383. // clearPending cleans the stale pending tasks.
  384. clearPending := func(number uint64) {
  385. w.pendingMu.Lock()
  386. for h, t := range w.pendingTasks {
  387. if t.block.NumberU64()+staleThreshold <= number {
  388. delete(w.pendingTasks, h)
  389. }
  390. }
  391. w.pendingMu.Unlock()
  392. }
  393. for {
  394. select {
  395. case <-w.startCh:
  396. clearPending(w.chain.CurrentBlock().NumberU64())
  397. timestamp = time.Now().Unix()
  398. commit(false, commitInterruptNewHead)
  399. case head := <-w.chainHeadCh:
  400. clearPending(head.Block.NumberU64())
  401. timestamp = time.Now().Unix()
  402. commit(false, commitInterruptNewHead)
  403. case <-timer.C:
  404. // If sealing is running resubmit a new work cycle periodically to pull in
  405. // higher priced transactions. Disable this overhead for pending blocks.
  406. if w.isRunning() && (w.chainConfig.Clique == nil || w.chainConfig.Clique.Period > 0) {
  407. // Short circuit if no new transaction arrives.
  408. if atomic.LoadInt32(&w.newTxs) == 0 {
  409. timer.Reset(recommit)
  410. continue
  411. }
  412. commit(true, commitInterruptResubmit)
  413. }
  414. case interval := <-w.resubmitIntervalCh:
  415. // Adjust resubmit interval explicitly by user.
  416. if interval < minRecommitInterval {
  417. log.Warn("Sanitizing miner recommit interval", "provided", interval, "updated", minRecommitInterval)
  418. interval = minRecommitInterval
  419. }
  420. log.Info("Miner recommit interval update", "from", minRecommit, "to", interval)
  421. minRecommit, recommit = interval, interval
  422. if w.resubmitHook != nil {
  423. w.resubmitHook(minRecommit, recommit)
  424. }
  425. case adjust := <-w.resubmitAdjustCh:
  426. // Adjust resubmit interval by feedback.
  427. if adjust.inc {
  428. before := recommit
  429. target := float64(recommit.Nanoseconds()) / adjust.ratio
  430. recommit = recalcRecommit(minRecommit, recommit, target, true)
  431. log.Trace("Increase miner recommit interval", "from", before, "to", recommit)
  432. } else {
  433. before := recommit
  434. recommit = recalcRecommit(minRecommit, recommit, float64(minRecommit.Nanoseconds()), false)
  435. log.Trace("Decrease miner recommit interval", "from", before, "to", recommit)
  436. }
  437. if w.resubmitHook != nil {
  438. w.resubmitHook(minRecommit, recommit)
  439. }
  440. case <-w.exitCh:
  441. return
  442. }
  443. }
  444. }
  445. // mainLoop is responsible for generating and submitting sealing work based on
  446. // the received event. It can support two modes: automatically generate task and
  447. // submit it or return task according to given parameters for various proposes.
  448. func (w *worker) mainLoop() {
  449. defer w.wg.Done()
  450. defer w.txsSub.Unsubscribe()
  451. defer w.chainHeadSub.Unsubscribe()
  452. defer w.chainSideSub.Unsubscribe()
  453. defer func() {
  454. if w.current != nil {
  455. w.current.discard()
  456. }
  457. }()
  458. cleanTicker := time.NewTicker(time.Second * 10)
  459. defer cleanTicker.Stop()
  460. for {
  461. select {
  462. case req := <-w.newWorkCh:
  463. w.commitWork(req.interrupt, req.noempty, req.timestamp)
  464. case req := <-w.getWorkCh:
  465. block, err := w.generateWork(req.params)
  466. if err != nil {
  467. req.err <- err
  468. req.result <- nil
  469. } else {
  470. req.err <- nil
  471. req.result <- block
  472. }
  473. case ev := <-w.chainSideCh:
  474. // Short circuit for duplicate side blocks
  475. if _, exist := w.localUncles[ev.Block.Hash()]; exist {
  476. continue
  477. }
  478. if _, exist := w.remoteUncles[ev.Block.Hash()]; exist {
  479. continue
  480. }
  481. // Add side block to possible uncle block set depending on the author.
  482. if w.isLocalBlock != nil && w.isLocalBlock(ev.Block.Header()) {
  483. w.localUncles[ev.Block.Hash()] = ev.Block
  484. } else {
  485. w.remoteUncles[ev.Block.Hash()] = ev.Block
  486. }
  487. // If our sealing block contains less than 2 uncle blocks,
  488. // add the new uncle block if valid and regenerate a new
  489. // sealing block for higher profit.
  490. if w.isRunning() && w.current != nil && len(w.current.uncles) < 2 {
  491. start := time.Now()
  492. if err := w.commitUncle(w.current, ev.Block.Header()); err == nil {
  493. w.commit(w.current.copy(), nil, true, start)
  494. }
  495. }
  496. case <-cleanTicker.C:
  497. chainHead := w.chain.CurrentBlock()
  498. for hash, uncle := range w.localUncles {
  499. if uncle.NumberU64()+staleThreshold <= chainHead.NumberU64() {
  500. delete(w.localUncles, hash)
  501. }
  502. }
  503. for hash, uncle := range w.remoteUncles {
  504. if uncle.NumberU64()+staleThreshold <= chainHead.NumberU64() {
  505. delete(w.remoteUncles, hash)
  506. }
  507. }
  508. case ev := <-w.txsCh:
  509. // Apply transactions to the pending state if we're not sealing
  510. //
  511. // Note all transactions received may not be continuous with transactions
  512. // already included in the current sealing block. These transactions will
  513. // be automatically eliminated.
  514. if !w.isRunning() && w.current != nil {
  515. // If block is already full, abort
  516. if gp := w.current.gasPool; gp != nil && gp.Gas() < params.TxGas {
  517. continue
  518. }
  519. txs := make(map[common.Address]types.Transactions)
  520. for _, tx := range ev.Txs {
  521. acc, _ := types.Sender(w.current.signer, tx)
  522. txs[acc] = append(txs[acc], tx)
  523. }
  524. txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee)
  525. tcount := w.current.tcount
  526. w.commitTransactions(w.current, txset, nil)
  527. // Only update the snapshot if any new transactions were added
  528. // to the pending block
  529. if tcount != w.current.tcount {
  530. w.updateSnapshot(w.current)
  531. }
  532. } else {
  533. // Special case, if the consensus engine is 0 period clique(dev mode),
  534. // submit sealing work here since all empty submission will be rejected
  535. // by clique. Of course the advance sealing(empty submission) is disabled.
  536. if w.chainConfig.Clique != nil && w.chainConfig.Clique.Period == 0 {
  537. w.commitWork(nil, true, time.Now().Unix())
  538. }
  539. }
  540. atomic.AddInt32(&w.newTxs, int32(len(ev.Txs)))
  541. // System stopped
  542. case <-w.exitCh:
  543. return
  544. case <-w.txsSub.Err():
  545. return
  546. case <-w.chainHeadSub.Err():
  547. return
  548. case <-w.chainSideSub.Err():
  549. return
  550. }
  551. }
  552. }
  553. // taskLoop is a standalone goroutine to fetch sealing task from the generator and
  554. // push them to consensus engine.
  555. func (w *worker) taskLoop() {
  556. defer w.wg.Done()
  557. var (
  558. stopCh chan struct{}
  559. prev common.Hash
  560. )
  561. // interrupt aborts the in-flight sealing task.
  562. interrupt := func() {
  563. if stopCh != nil {
  564. close(stopCh)
  565. stopCh = nil
  566. }
  567. }
  568. for {
  569. select {
  570. case task := <-w.taskCh:
  571. if w.newTaskHook != nil {
  572. w.newTaskHook(task)
  573. }
  574. // Reject duplicate sealing work due to resubmitting.
  575. sealHash := w.engine.SealHash(task.block.Header())
  576. if sealHash == prev {
  577. continue
  578. }
  579. // Interrupt previous sealing operation
  580. interrupt()
  581. stopCh, prev = make(chan struct{}), sealHash
  582. if w.skipSealHook != nil && w.skipSealHook(task) {
  583. continue
  584. }
  585. w.pendingMu.Lock()
  586. w.pendingTasks[sealHash] = task
  587. w.pendingMu.Unlock()
  588. if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil {
  589. log.Warn("Block sealing failed", "err", err)
  590. w.pendingMu.Lock()
  591. delete(w.pendingTasks, sealHash)
  592. w.pendingMu.Unlock()
  593. }
  594. case <-w.exitCh:
  595. interrupt()
  596. return
  597. }
  598. }
  599. }
  600. // resultLoop is a standalone goroutine to handle sealing result submitting
  601. // and flush relative data to the database.
  602. func (w *worker) resultLoop() {
  603. defer w.wg.Done()
  604. for {
  605. select {
  606. case block := <-w.resultCh:
  607. // Short circuit when receiving empty result.
  608. if block == nil {
  609. continue
  610. }
  611. // Short circuit when receiving duplicate result caused by resubmitting.
  612. if w.chain.HasBlock(block.Hash(), block.NumberU64()) {
  613. continue
  614. }
  615. var (
  616. sealhash = w.engine.SealHash(block.Header())
  617. hash = block.Hash()
  618. )
  619. w.pendingMu.RLock()
  620. task, exist := w.pendingTasks[sealhash]
  621. w.pendingMu.RUnlock()
  622. if !exist {
  623. log.Error("Block found but no relative pending task", "number", block.Number(), "sealhash", sealhash, "hash", hash)
  624. continue
  625. }
  626. // Different block could share same sealhash, deep copy here to prevent write-write conflict.
  627. var (
  628. receipts = make([]*types.Receipt, len(task.receipts))
  629. logs []*types.Log
  630. )
  631. for i, taskReceipt := range task.receipts {
  632. receipt := new(types.Receipt)
  633. receipts[i] = receipt
  634. *receipt = *taskReceipt
  635. // add block location fields
  636. receipt.BlockHash = hash
  637. receipt.BlockNumber = block.Number()
  638. receipt.TransactionIndex = uint(i)
  639. // Update the block hash in all logs since it is now available and not when the
  640. // receipt/log of individual transactions were created.
  641. receipt.Logs = make([]*types.Log, len(taskReceipt.Logs))
  642. for i, taskLog := range taskReceipt.Logs {
  643. log := new(types.Log)
  644. receipt.Logs[i] = log
  645. *log = *taskLog
  646. log.BlockHash = hash
  647. }
  648. logs = append(logs, receipt.Logs...)
  649. }
  650. // Commit block and state to database.
  651. _, err := w.chain.WriteBlockAndSetHead(block, receipts, logs, task.state, true)
  652. if err != nil {
  653. log.Error("Failed writing block to chain", "err", err)
  654. continue
  655. }
  656. log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", hash,
  657. "elapsed", common.PrettyDuration(time.Since(task.createdAt)))
  658. // Broadcast the block and announce chain insertion event
  659. w.mux.Post(core.NewMinedBlockEvent{Block: block})
  660. // Insert the block into the set of pending ones to resultLoop for confirmations
  661. w.unconfirmed.Insert(block.NumberU64(), block.Hash())
  662. case <-w.exitCh:
  663. return
  664. }
  665. }
  666. }
  667. // makeEnv creates a new environment for the sealing block.
  668. func (w *worker) makeEnv(parent *types.Block, header *types.Header, coinbase common.Address) (*environment, error) {
  669. // Retrieve the parent state to execute on top and start a prefetcher for
  670. // the miner to speed block sealing up a bit.
  671. state, err := w.chain.StateAt(parent.Root())
  672. if err != nil {
  673. return nil, err
  674. }
  675. state.StartPrefetcher("miner")
  676. // Note the passed coinbase may be different with header.Coinbase.
  677. env := &environment{
  678. signer: types.MakeSigner(w.chainConfig, header.Number),
  679. state: state,
  680. coinbase: coinbase,
  681. ancestors: mapset.NewSet(),
  682. family: mapset.NewSet(),
  683. header: header,
  684. uncles: make(map[common.Hash]*types.Header),
  685. }
  686. // when 08 is processed ancestors contain 07 (quick block)
  687. for _, ancestor := range w.chain.GetBlocksFromHash(parent.Hash(), 7) {
  688. for _, uncle := range ancestor.Uncles() {
  689. env.family.Add(uncle.Hash())
  690. }
  691. env.family.Add(ancestor.Hash())
  692. env.ancestors.Add(ancestor.Hash())
  693. }
  694. // Keep track of transactions which return errors so they can be removed
  695. env.tcount = 0
  696. return env, nil
  697. }
  698. // commitUncle adds the given block to uncle block set, returns error if failed to add.
  699. func (w *worker) commitUncle(env *environment, uncle *types.Header) error {
  700. if w.isTTDReached(env.header) {
  701. return errors.New("ignore uncle for beacon block")
  702. }
  703. hash := uncle.Hash()
  704. if _, exist := env.uncles[hash]; exist {
  705. return errors.New("uncle not unique")
  706. }
  707. if env.header.ParentHash == uncle.ParentHash {
  708. return errors.New("uncle is sibling")
  709. }
  710. if !env.ancestors.Contains(uncle.ParentHash) {
  711. return errors.New("uncle's parent unknown")
  712. }
  713. if env.family.Contains(hash) {
  714. return errors.New("uncle already included")
  715. }
  716. env.uncles[hash] = uncle
  717. return nil
  718. }
  719. // updateSnapshot updates pending snapshot block, receipts and state.
  720. func (w *worker) updateSnapshot(env *environment) {
  721. w.snapshotMu.Lock()
  722. defer w.snapshotMu.Unlock()
  723. w.snapshotBlock = types.NewBlock(
  724. env.header,
  725. env.txs,
  726. env.unclelist(),
  727. env.receipts,
  728. trie.NewStackTrie(nil),
  729. )
  730. w.snapshotReceipts = copyReceipts(env.receipts)
  731. w.snapshotState = env.state.Copy()
  732. }
  733. func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]*types.Log, error) {
  734. snap := env.state.Snapshot()
  735. receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &env.coinbase, env.gasPool, env.state, env.header, tx, &env.header.GasUsed, *w.chain.GetVMConfig())
  736. if err != nil {
  737. env.state.RevertToSnapshot(snap)
  738. return nil, err
  739. }
  740. env.txs = append(env.txs, tx)
  741. env.receipts = append(env.receipts, receipt)
  742. return receipt.Logs, nil
  743. }
  744. func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) error {
  745. gasLimit := env.header.GasLimit
  746. if env.gasPool == nil {
  747. env.gasPool = new(core.GasPool).AddGas(gasLimit)
  748. }
  749. var coalescedLogs []*types.Log
  750. for {
  751. // In the following three cases, we will interrupt the execution of the transaction.
  752. // (1) new head block event arrival, the interrupt signal is 1
  753. // (2) worker start or restart, the interrupt signal is 1
  754. // (3) worker recreate the sealing block with any newly arrived transactions, the interrupt signal is 2.
  755. // For the first two cases, the semi-finished work will be discarded.
  756. // For the third case, the semi-finished work will be submitted to the consensus engine.
  757. if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
  758. // Notify resubmit loop to increase resubmitting interval due to too frequent commits.
  759. if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
  760. ratio := float64(gasLimit-env.gasPool.Gas()) / float64(gasLimit)
  761. if ratio < 0.1 {
  762. ratio = 0.1
  763. }
  764. w.resubmitAdjustCh <- &intervalAdjust{
  765. ratio: ratio,
  766. inc: true,
  767. }
  768. return errBlockInterruptedByRecommit
  769. }
  770. return errBlockInterruptedByNewHead
  771. }
  772. // If we don't have enough gas for any further transactions then we're done
  773. if env.gasPool.Gas() < params.TxGas {
  774. log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas)
  775. break
  776. }
  777. // Retrieve the next transaction and abort if all done
  778. tx := txs.Peek()
  779. if tx == nil {
  780. break
  781. }
  782. // Error may be ignored here. The error has already been checked
  783. // during transaction acceptance is the transaction pool.
  784. //
  785. // We use the eip155 signer regardless of the current hf.
  786. from, _ := types.Sender(env.signer, tx)
  787. // Check whether the tx is replay protected. If we're not in the EIP155 hf
  788. // phase, start ignoring the sender until we do.
  789. // add IsEthPoWFork check sign with chainid protected force check
  790. if w.chainConfig.IsEthPoWFork(env.header.Number) && !tx.Protected() {
  791. log.Trace("Ignoring not protected transaction", "hash", tx.Hash(), "eipEthPoW", w.chainConfig.EthPoWForkBlock)
  792. txs.Pop()
  793. continue
  794. }
  795. if tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) {
  796. log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block)
  797. txs.Pop()
  798. continue
  799. }
  800. // Start executing the transaction
  801. env.state.Prepare(tx.Hash(), env.tcount)
  802. logs, err := w.commitTransaction(env, tx)
  803. switch {
  804. case errors.Is(err, core.ErrGasLimitReached):
  805. // Pop the current out-of-gas transaction without shifting in the next from the account
  806. log.Trace("Gas limit exceeded for current block", "sender", from)
  807. txs.Pop()
  808. case errors.Is(err, core.ErrNonceTooLow):
  809. // New head notification data race between the transaction pool and miner, shift
  810. log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce())
  811. txs.Shift()
  812. case errors.Is(err, core.ErrNonceTooHigh):
  813. // Reorg notification data race between the transaction pool and miner, skip account =
  814. log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce())
  815. txs.Pop()
  816. case errors.Is(err, nil):
  817. // Everything ok, collect the logs and shift in the next transaction from the same account
  818. coalescedLogs = append(coalescedLogs, logs...)
  819. env.tcount++
  820. txs.Shift()
  821. case errors.Is(err, core.ErrTxTypeNotSupported):
  822. // Pop the unsupported transaction without shifting in the next from the account
  823. log.Trace("Skipping unsupported transaction type", "sender", from, "type", tx.Type())
  824. txs.Pop()
  825. default:
  826. // Strange error, discard the transaction and get the next in line (note, the
  827. // nonce-too-high clause will prevent us from executing in vain).
  828. log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
  829. txs.Shift()
  830. }
  831. }
  832. if !w.isRunning() && len(coalescedLogs) > 0 {
  833. // We don't push the pendingLogsEvent while we are sealing. The reason is that
  834. // when we are sealing, the worker will regenerate a sealing block every 3 seconds.
  835. // In order to avoid pushing the repeated pendingLog, we disable the pending log pushing.
  836. // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined
  837. // logs by filling in the block hash when the block was mined by the local miner. This can
  838. // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed.
  839. cpy := make([]*types.Log, len(coalescedLogs))
  840. for i, l := range coalescedLogs {
  841. cpy[i] = new(types.Log)
  842. *cpy[i] = *l
  843. }
  844. w.pendingLogsFeed.Send(cpy)
  845. }
  846. // Notify resubmit loop to decrease resubmitting interval if current interval is larger
  847. // than the user-specified one.
  848. if interrupt != nil {
  849. w.resubmitAdjustCh <- &intervalAdjust{inc: false}
  850. }
  851. return nil
  852. }
  853. // generateParams wraps various of settings for generating sealing task.
  854. type generateParams struct {
  855. timestamp uint64 // The timstamp for sealing task
  856. forceTime bool // Flag whether the given timestamp is immutable or not
  857. parentHash common.Hash // Parent block hash, empty means the latest chain head
  858. coinbase common.Address // The fee recipient address for including transaction
  859. random common.Hash // The randomness generated by beacon chain, empty before the merge
  860. noUncle bool // Flag whether the uncle block inclusion is allowed
  861. noExtra bool // Flag whether the extra field assignment is allowed
  862. noTxs bool // Flag whether an empty block without any transaction is expected
  863. }
  864. // prepareWork constructs the sealing task according to the given parameters,
  865. // either based on the last chain head or specified parent. In this function
  866. // the pending transactions are not filled yet, only the empty task returned.
  867. func (w *worker) prepareWork(genParams *generateParams) (*environment, error) {
  868. w.mu.RLock()
  869. defer w.mu.RUnlock()
  870. // Find the parent block for sealing task
  871. parent := w.chain.CurrentBlock()
  872. if genParams.parentHash != (common.Hash{}) {
  873. parent = w.chain.GetBlockByHash(genParams.parentHash)
  874. }
  875. if parent == nil {
  876. return nil, fmt.Errorf("missing parent")
  877. }
  878. // Sanity check the timestamp correctness, recap the timestamp
  879. // to parent+1 if the mutation is allowed.
  880. timestamp := genParams.timestamp
  881. if parent.Time() >= timestamp {
  882. if genParams.forceTime {
  883. return nil, fmt.Errorf("invalid timestamp, parent %d given %d", parent.Time(), timestamp)
  884. }
  885. timestamp = parent.Time() + 1
  886. }
  887. // Construct the sealing block header, set the extra field if it's allowed
  888. num := parent.Number()
  889. header := &types.Header{
  890. ParentHash: parent.Hash(),
  891. Number: num.Add(num, common.Big1),
  892. GasLimit: core.CalcGasLimit(parent.GasLimit(), w.config.GasCeil),
  893. Time: timestamp,
  894. Coinbase: genParams.coinbase,
  895. }
  896. if !genParams.noExtra && len(w.extra) != 0 {
  897. header.Extra = w.extra
  898. }
  899. // Set the randomness field from the beacon chain if it's available.
  900. if genParams.random != (common.Hash{}) {
  901. header.MixDigest = genParams.random
  902. }
  903. // Set baseFee and GasLimit if we are on an EIP-1559 chain
  904. if w.chainConfig.IsLondon(header.Number) {
  905. header.BaseFee = misc.CalcBaseFee(w.chainConfig, parent.Header())
  906. if !w.chainConfig.IsLondon(parent.Number()) {
  907. parentGasLimit := parent.GasLimit() * params.ElasticityMultiplier
  908. header.GasLimit = core.CalcGasLimit(parentGasLimit, w.config.GasCeil)
  909. }
  910. }
  911. // Run the consensus preparation with the default or customized consensus engine.
  912. if err := w.engine.Prepare(w.chain, header); err != nil {
  913. log.Error("Failed to prepare header for sealing", "err", err)
  914. return nil, err
  915. }
  916. // Could potentially happen if starting to mine in an odd state.
  917. // Note genParams.coinbase can be different with header.Coinbase
  918. // since clique algorithm can modify the coinbase field in header.
  919. env, err := w.makeEnv(parent, header, genParams.coinbase)
  920. if err != nil {
  921. log.Error("Failed to create sealing context", "err", err)
  922. return nil, err
  923. }
  924. // Accumulate the uncles for the sealing work only if it's allowed.
  925. if !genParams.noUncle {
  926. commitUncles := func(blocks map[common.Hash]*types.Block) {
  927. for hash, uncle := range blocks {
  928. if len(env.uncles) == 2 {
  929. break
  930. }
  931. if err := w.commitUncle(env, uncle.Header()); err != nil {
  932. log.Trace("Possible uncle rejected", "hash", hash, "reason", err)
  933. } else {
  934. log.Debug("Committing new uncle to block", "hash", hash)
  935. }
  936. }
  937. }
  938. // Prefer to locally generated uncle
  939. commitUncles(w.localUncles)
  940. commitUncles(w.remoteUncles)
  941. }
  942. return env, nil
  943. }
  944. // fillTransactions retrieves the pending transactions from the txpool and fills them
  945. // into the given sealing block. The transaction selection and ordering strategy can
  946. // be customized with the plugin in the future.
  947. func (w *worker) fillTransactions(interrupt *int32, env *environment) error {
  948. // Split the pending transactions into locals and remotes
  949. // Fill the block with all available pending transactions.
  950. pending := w.eth.TxPool().Pending(true)
  951. localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending
  952. for _, account := range w.eth.TxPool().Locals() {
  953. if txs := remoteTxs[account]; len(txs) > 0 {
  954. delete(remoteTxs, account)
  955. localTxs[account] = txs
  956. }
  957. }
  958. if len(localTxs) > 0 {
  959. txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
  960. if err := w.commitTransactions(env, txs, interrupt); err != nil {
  961. return err
  962. }
  963. }
  964. if len(remoteTxs) > 0 {
  965. txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee)
  966. if err := w.commitTransactions(env, txs, interrupt); err != nil {
  967. return err
  968. }
  969. }
  970. return nil
  971. }
  972. // generateWork generates a sealing block based on the given parameters.
  973. func (w *worker) generateWork(params *generateParams) (*types.Block, error) {
  974. work, err := w.prepareWork(params)
  975. if err != nil {
  976. return nil, err
  977. }
  978. defer work.discard()
  979. if !params.noTxs {
  980. w.fillTransactions(nil, work)
  981. }
  982. return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts)
  983. }
  984. // commitWork generates several new sealing tasks based on the parent block
  985. // and submit them to the sealer.
  986. func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) {
  987. start := time.Now()
  988. // Set the coinbase if the worker is running or it's required
  989. var coinbase common.Address
  990. if w.isRunning() {
  991. if w.coinbase == (common.Address{}) {
  992. log.Error("Refusing to mine without etherbase")
  993. return
  994. }
  995. coinbase = w.coinbase // Use the preset address as the fee recipient
  996. }
  997. work, err := w.prepareWork(&generateParams{
  998. timestamp: uint64(timestamp),
  999. coinbase: coinbase,
  1000. })
  1001. if err != nil {
  1002. return
  1003. }
  1004. // Create an empty block based on temporary copied state for
  1005. // sealing in advance without waiting block execution finished.
  1006. if !noempty && atomic.LoadUint32(&w.noempty) == 0 {
  1007. w.commit(work.copy(), nil, false, start)
  1008. }
  1009. // Fill pending transactions from the txpool
  1010. err = w.fillTransactions(interrupt, work)
  1011. if errors.Is(err, errBlockInterruptedByNewHead) {
  1012. work.discard()
  1013. return
  1014. }
  1015. w.commit(work.copy(), w.fullTaskHook, true, start)
  1016. // Swap out the old work with the new one, terminating any leftover
  1017. // prefetcher processes in the mean time and starting a new one.
  1018. if w.current != nil {
  1019. w.current.discard()
  1020. }
  1021. w.current = work
  1022. }
  1023. // commit runs any post-transaction state modifications, assembles the final block
  1024. // and commits new work if consensus engine is running.
  1025. // Note the assumption is held that the mutation is allowed to the passed env, do
  1026. // the deep copy first.
  1027. func (w *worker) commit(env *environment, interval func(), update bool, start time.Time) error {
  1028. if w.isRunning() {
  1029. if interval != nil {
  1030. interval()
  1031. }
  1032. // Create a local environment copy, avoid the data race with snapshot state.
  1033. // https://github.com/ethereum/go-ethereum/issues/24299
  1034. env := env.copy()
  1035. block, err := w.engine.FinalizeAndAssemble(w.chain, env.header, env.state, env.txs, env.unclelist(), env.receipts)
  1036. if err != nil {
  1037. return err
  1038. }
  1039. // If we're post merge, just ignore
  1040. if !w.isTTDReached(block.Header()) {
  1041. select {
  1042. case w.taskCh <- &task{receipts: env.receipts, state: env.state, block: block, createdAt: time.Now()}:
  1043. w.unconfirmed.Shift(block.NumberU64() - 1)
  1044. log.Info("Commit new sealing work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()),
  1045. "uncles", len(env.uncles), "txs", env.tcount,
  1046. "gas", block.GasUsed(), "fees", totalFees(block, env.receipts),
  1047. "elapsed", common.PrettyDuration(time.Since(start)))
  1048. case <-w.exitCh:
  1049. log.Info("Worker has exited")
  1050. }
  1051. }
  1052. }
  1053. if update {
  1054. w.updateSnapshot(env)
  1055. }
  1056. return nil
  1057. }
  1058. // getSealingBlock generates the sealing block based on the given parameters.
  1059. // The generation result will be passed back via the given channel no matter
  1060. // the generation itself succeeds or not.
  1061. func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (chan *types.Block, chan error, error) {
  1062. var (
  1063. resCh = make(chan *types.Block, 1)
  1064. errCh = make(chan error, 1)
  1065. )
  1066. req := &getWorkReq{
  1067. params: &generateParams{
  1068. timestamp: timestamp,
  1069. forceTime: true,
  1070. parentHash: parent,
  1071. coinbase: coinbase,
  1072. random: random,
  1073. noUncle: true,
  1074. noExtra: true,
  1075. noTxs: noTxs,
  1076. },
  1077. result: resCh,
  1078. err: errCh,
  1079. }
  1080. select {
  1081. case w.getWorkCh <- req:
  1082. return resCh, errCh, nil
  1083. case <-w.exitCh:
  1084. return nil, nil, errors.New("miner closed")
  1085. }
  1086. }
  1087. // isTTDReached returns the indicator if the given block has reached the total
  1088. // terminal difficulty for The Merge transition.
  1089. func (w *worker) isTTDReached(header *types.Header) bool {
  1090. if w.chain.Config().EthPoWForkSupport {
  1091. return false
  1092. }
  1093. td, ttd := w.chain.GetTd(header.ParentHash, header.Number.Uint64()-1), w.chain.Config().TerminalTotalDifficulty
  1094. return td != nil && ttd != nil && td.Cmp(ttd) >= 0
  1095. }
  1096. // copyReceipts makes a deep copy of the given receipts.
  1097. func copyReceipts(receipts []*types.Receipt) []*types.Receipt {
  1098. result := make([]*types.Receipt, len(receipts))
  1099. for i, l := range receipts {
  1100. cpy := *l
  1101. result[i] = &cpy
  1102. }
  1103. return result
  1104. }
  1105. // postSideBlock fires a side chain event, only use it for testing.
  1106. func (w *worker) postSideBlock(event core.ChainSideEvent) {
  1107. select {
  1108. case w.chainSideCh <- event:
  1109. case <-w.exitCh:
  1110. }
  1111. }
  1112. // totalFees computes total consumed miner fees in ETH. Block transactions and receipts have to have the same order.
  1113. func totalFees(block *types.Block, receipts []*types.Receipt) *big.Float {
  1114. feesWei := new(big.Int)
  1115. for i, tx := range block.Transactions() {
  1116. minerFee, _ := tx.EffectiveGasTip(block.BaseFee())
  1117. feesWei.Add(feesWei, new(big.Int).Mul(new(big.Int).SetUint64(receipts[i].GasUsed), minerFee))
  1118. }
  1119. return new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether)))
  1120. }