worker.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package miner
  17. import (
  18. "bytes"
  19. "errors"
  20. "math/big"
  21. "sync"
  22. "sync/atomic"
  23. "time"
  24. mapset "github.com/deckarep/golang-set"
  25. "github.com/ethereum/go-ethereum/common"
  26. "github.com/ethereum/go-ethereum/consensus"
  27. "github.com/ethereum/go-ethereum/consensus/misc"
  28. "github.com/ethereum/go-ethereum/consensus/satoshi"
  29. "github.com/ethereum/go-ethereum/core"
  30. "github.com/ethereum/go-ethereum/core/state"
  31. "github.com/ethereum/go-ethereum/core/systemcontracts"
  32. "github.com/ethereum/go-ethereum/core/types"
  33. "github.com/ethereum/go-ethereum/event"
  34. "github.com/ethereum/go-ethereum/log"
  35. "github.com/ethereum/go-ethereum/metrics"
  36. "github.com/ethereum/go-ethereum/params"
  37. "github.com/ethereum/go-ethereum/trie"
  38. )
  39. const (
  40. // resultQueueSize is the size of channel listening to sealing result.
  41. resultQueueSize = 10
  42. // txChanSize is the size of channel listening to NewTxsEvent.
  43. // The number is referenced from the size of tx pool.
  44. txChanSize = 4096
  45. // chainHeadChanSize is the size of channel listening to ChainHeadEvent.
  46. chainHeadChanSize = 10
  47. // chainSideChanSize is the size of channel listening to ChainSideEvent.
  48. chainSideChanSize = 10
  49. // resubmitAdjustChanSize is the size of resubmitting interval adjustment channel.
  50. resubmitAdjustChanSize = 10
  51. // miningLogAtDepth is the number of confirmations before logging successful mining.
  52. miningLogAtDepth = 11
  53. // minRecommitInterval is the minimal time interval to recreate the mining block with
  54. // any newly arrived transactions.
  55. minRecommitInterval = 1 * time.Second
  56. // maxRecommitInterval is the maximum time interval to recreate the mining block with
  57. // any newly arrived transactions.
  58. maxRecommitInterval = 15 * time.Second
  59. // intervalAdjustRatio is the impact a single interval adjustment has on sealing work
  60. // resubmitting interval.
  61. intervalAdjustRatio = 0.1
  62. // intervalAdjustBias is applied during the new resubmit interval calculation in favor of
  63. // increasing upper limit or decreasing lower limit so that the limit can be reachable.
  64. intervalAdjustBias = 200 * 1000.0 * 1000.0
  65. // staleThreshold is the maximum depth of the acceptable stale block.
  66. staleThreshold = 11
  67. )
  68. var (
  69. commitTxsTimer = metrics.NewRegisteredTimer("worker/committxs", nil)
  70. )
// environment is the worker's current environment and holds all of the current state information.
// A new instance is created by makeCurrent for every work cycle and stored in worker.current.
type environment struct {
	signer types.Signer // signer built from the chain config and the header number of this cycle

	state     *state.StateDB // apply state changes here
	ancestors mapset.Set     // ancestor set (used for checking uncle parent validity)
	family    mapset.Set     // family set (used for checking uncle invalidity)
	uncles    mapset.Set     // uncle set
	tcount    int            // tx count in cycle
	gasPool   *core.GasPool  // available gas used to pack transactions

	header   *types.Header        // header of the block being assembled
	txs      []*types.Transaction // transactions applied so far (appended by commitTransaction)
	receipts []*types.Receipt     // receipts of the applied transactions, in the same order as txs
}
// task contains all information for consensus engine sealing and result submitting.
type task struct {
	receipts  []*types.Receipt // receipts belonging to block; copied by resultLoop before being written
	state     *state.StateDB   // state handed to the chain together with the sealed block
	block     *types.Block     // block passed to the consensus engine for sealing
	createdAt time.Time        // creation time, used to report the elapsed sealing time
}
// Interrupt signals stored in newWorkReq.interrupt and polled while
// transactions are being committed.
const (
	// commitInterruptNone (0) means no interruption was requested.
	commitInterruptNone int32 = iota
	// commitInterruptNewHead (1) is raised on worker start and on a new chain head.
	commitInterruptNewHead
	// commitInterruptResubmit (2) is raised when the recommit timer fires and
	// new transactions have arrived.
	commitInterruptResubmit
)
// newWorkReq represents a request for new sealing work submitting with relative interrupt notifier.
// Requests are created by newWorkLoop and consumed by mainLoop.
type newWorkReq struct {
	interrupt *int32 // set to one of the commitInterrupt* signals to abort the work started by this request
	noempty   bool   // forwarded unchanged to commitNewWork
	timestamp int64  // Unix time (seconds) captured when the mining round was triggered
}
// intervalAdjust represents a resubmitting interval adjustment.
type intervalAdjust struct {
	ratio float64 // divisor applied to the current interval to derive the target when inc is true
	inc   bool    // true to increase the resubmit interval, false to decrease it
}
// worker is the main object which takes care of submitting new work to consensus engine
// and gathering the sealing result.
type worker struct {
	config      *Config
	chainConfig *params.ChainConfig
	engine      consensus.Engine
	eth         Backend
	chain       *core.BlockChain

	// Feeds
	pendingLogsFeed event.Feed

	// Subscriptions
	mux          *event.TypeMux           // mux on which NewMinedBlockEvent is posted after a block is written
	txsCh        chan core.NewTxsEvent    // receives NewTxsEvent from the tx pool subscription
	txsSub       event.Subscription       // subscription feeding txsCh
	chainHeadCh  chan core.ChainHeadEvent // receives ChainHeadEvent from the blockchain subscription
	chainHeadSub event.Subscription       // subscription feeding chainHeadCh
	chainSideCh  chan core.ChainSideEvent // receives ChainSideEvent from the blockchain subscription
	chainSideSub event.Subscription       // subscription feeding chainSideCh

	// Channels
	newWorkCh          chan *newWorkReq     // newWorkLoop -> mainLoop: requests to build new sealing work
	taskCh             chan *task           // sealing tasks consumed by taskLoop
	resultCh           chan *types.Block    // sealed blocks delivered by the engine, consumed by resultLoop
	startCh            chan struct{}        // signalled by start (and by newWorker when init is set)
	exitCh             chan struct{}        // closed by close to terminate the background loops
	resubmitIntervalCh chan time.Duration   // explicit interval updates sent by setRecommitInterval
	resubmitAdjustCh   chan *intervalAdjust // feedback-driven interval adjustments

	current      *environment                 // An environment for current running cycle.
	localUncles  map[common.Hash]*types.Block // A set of side blocks generated locally as the possible uncle blocks.
	remoteUncles map[common.Hash]*types.Block // A set of side blocks as the possible uncle blocks.
	unconfirmed  *unconfirmedBlocks           // A set of locally mined blocks pending canonicalness confirmations.

	mu       sync.RWMutex // The lock used to protect the coinbase and extra fields
	coinbase common.Address
	extra    []byte

	pendingMu    sync.RWMutex          // The lock used to protect pendingTasks
	pendingTasks map[common.Hash]*task // Tasks handed to the engine, keyed by seal hash

	snapshotMu    sync.RWMutex // The lock used to protect the block snapshot and state snapshot
	snapshotBlock *types.Block
	snapshotState *state.StateDB

	// atomic status counters
	running int32 // The indicator whether the consensus engine is running or not.
	newTxs  int32 // New arrival transaction count since last sealing work submitting.

	// noempty is the flag used to control whether the feature of pre-seal empty
	// block is enabled. The default value is false(pre-seal is enabled by default).
	// But in some special scenario the consensus engine will seal blocks instantaneously,
	// in this case this feature will add all empty blocks into canonical chain
	// non-stop and no real transaction will be included.
	noempty uint32

	// External functions
	isLocalBlock func(block *types.Block) bool // Function used to determine whether the specified block is mined by local miner.

	// Test hooks
	newTaskHook  func(*task)                        // Method to call upon receiving a new sealing task.
	skipSealHook func(*task) bool                   // Method to decide whether skipping the sealing.
	fullTaskHook func()                             // Method to call before pushing the full sealing task.
	resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval.
}
  162. func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool) *worker {
  163. worker := &worker{
  164. config: config,
  165. chainConfig: chainConfig,
  166. engine: engine,
  167. eth: eth,
  168. mux: mux,
  169. chain: eth.BlockChain(),
  170. isLocalBlock: isLocalBlock,
  171. localUncles: make(map[common.Hash]*types.Block),
  172. remoteUncles: make(map[common.Hash]*types.Block),
  173. unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
  174. pendingTasks: make(map[common.Hash]*task),
  175. txsCh: make(chan core.NewTxsEvent, txChanSize),
  176. chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
  177. chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
  178. newWorkCh: make(chan *newWorkReq),
  179. taskCh: make(chan *task),
  180. resultCh: make(chan *types.Block, resultQueueSize),
  181. exitCh: make(chan struct{}),
  182. startCh: make(chan struct{}, 1),
  183. resubmitIntervalCh: make(chan time.Duration),
  184. resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
  185. }
  186. // Subscribe NewTxsEvent for tx pool
  187. worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
  188. // Subscribe events for blockchain
  189. worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
  190. worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)
  191. // Sanitize recommit interval if the user-specified one is too short.
  192. recommit := worker.config.Recommit
  193. if recommit < minRecommitInterval {
  194. log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval)
  195. recommit = minRecommitInterval
  196. }
  197. go worker.mainLoop()
  198. go worker.newWorkLoop(recommit)
  199. go worker.resultLoop()
  200. go worker.taskLoop()
  201. // Submit first work to initialize pending state.
  202. if init {
  203. worker.startCh <- struct{}{}
  204. }
  205. return worker
  206. }
  207. // setEtherbase sets the etherbase used to initialize the block coinbase field.
  208. func (w *worker) setEtherbase(addr common.Address) {
  209. w.mu.Lock()
  210. defer w.mu.Unlock()
  211. w.coinbase = addr
  212. }
  213. // setExtra sets the content used to initialize the block extra field.
  214. func (w *worker) setExtra(extra []byte) {
  215. w.mu.Lock()
  216. defer w.mu.Unlock()
  217. w.extra = extra
  218. }
  219. // setRecommitInterval updates the interval for miner sealing work recommitting.
  220. func (w *worker) setRecommitInterval(interval time.Duration) {
  221. w.resubmitIntervalCh <- interval
  222. }
  223. // disablePreseal disables pre-sealing mining feature
  224. func (w *worker) disablePreseal() {
  225. atomic.StoreUint32(&w.noempty, 1)
  226. }
  227. // enablePreseal enables pre-sealing mining feature
  228. func (w *worker) enablePreseal() {
  229. atomic.StoreUint32(&w.noempty, 0)
  230. }
  231. // pending returns the pending state and corresponding block.
  232. func (w *worker) pending() (*types.Block, *state.StateDB) {
  233. // return a snapshot to avoid contention on currentMu mutex
  234. w.snapshotMu.RLock()
  235. defer w.snapshotMu.RUnlock()
  236. if w.snapshotState == nil {
  237. return nil, nil
  238. }
  239. return w.snapshotBlock, w.snapshotState.Copy()
  240. }
  241. // pendingBlock returns pending block.
  242. func (w *worker) pendingBlock() *types.Block {
  243. // return a snapshot to avoid contention on currentMu mutex
  244. w.snapshotMu.RLock()
  245. defer w.snapshotMu.RUnlock()
  246. return w.snapshotBlock
  247. }
  248. // start sets the running status as 1 and triggers new work submitting.
  249. func (w *worker) start() {
  250. atomic.StoreInt32(&w.running, 1)
  251. w.startCh <- struct{}{}
  252. }
  253. // stop sets the running status as 0.
  254. func (w *worker) stop() {
  255. atomic.StoreInt32(&w.running, 0)
  256. }
  257. // isRunning returns an indicator whether worker is running or not.
  258. func (w *worker) isRunning() bool {
  259. return atomic.LoadInt32(&w.running) == 1
  260. }
  261. // close terminates all background threads maintained by the worker.
  262. // Note the worker does not support being closed multiple times.
  263. func (w *worker) close() {
  264. if w.current != nil && w.current.state != nil {
  265. w.current.state.StopPrefetcher()
  266. }
  267. atomic.StoreInt32(&w.running, 0)
  268. close(w.exitCh)
  269. }
  270. // recalcRecommit recalculates the resubmitting interval upon feedback.
  271. func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) time.Duration {
  272. var (
  273. prevF = float64(prev.Nanoseconds())
  274. next float64
  275. )
  276. if inc {
  277. next = prevF*(1-intervalAdjustRatio) + intervalAdjustRatio*(target+intervalAdjustBias)
  278. max := float64(maxRecommitInterval.Nanoseconds())
  279. if next > max {
  280. next = max
  281. }
  282. } else {
  283. next = prevF*(1-intervalAdjustRatio) + intervalAdjustRatio*(target-intervalAdjustBias)
  284. min := float64(minRecommit.Nanoseconds())
  285. if next < min {
  286. next = min
  287. }
  288. }
  289. return time.Duration(int64(next))
  290. }
// newWorkLoop is a standalone goroutine to submit new mining work upon received events.
//
// It reacts to start signals, new chain heads, the recommit timer, explicit
// recommit interval updates and feedback-driven interval adjustments, and
// returns once exitCh is closed. recommit is the initial resubmit interval.
func (w *worker) newWorkLoop(recommit time.Duration) {
	var (
		interrupt   *int32     // interrupt flag of the most recently submitted work request.
		minRecommit = recommit // minimal resubmit interval specified by user.
		timestamp   int64      // timestamp for each round of mining.
	)

	timer := time.NewTimer(0)
	defer timer.Stop()
	<-timer.C // discard the initial tick

	// commit aborts in-flight transaction execution with given signal and resubmits a new one.
	// After the request has been handed to mainLoop the recommit timer is re-armed and the
	// new-transaction counter is reset. If the worker is shutting down, nothing is submitted.
	commit := func(noempty bool, s int32) {
		if interrupt != nil {
			atomic.StoreInt32(interrupt, s)
		}
		// Every request gets its own flag so that a later signal cannot affect older work.
		interrupt = new(int32)
		select {
		case w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}:
		case <-w.exitCh:
			return
		}
		timer.Reset(recommit)
		atomic.StoreInt32(&w.newTxs, 0)
	}
	// clearPending cleans the stale pending tasks, i.e. those whose block number
	// is at least staleThreshold below the given number.
	clearPending := func(number uint64) {
		w.pendingMu.Lock()
		for h, t := range w.pendingTasks {
			if t.block.NumberU64()+staleThreshold <= number {
				delete(w.pendingTasks, h)
			}
		}
		w.pendingMu.Unlock()
	}

	for {
		select {
		case <-w.startCh:
			// Start signal: prune stale tasks relative to the current head and submit work.
			clearPending(w.chain.CurrentBlock().NumberU64())
			timestamp = time.Now().Unix()
			commit(true, commitInterruptNewHead)

		case head := <-w.chainHeadCh:
			// New chain heads are ignored while the worker is not running.
			if !w.isRunning() {
				continue
			}
			clearPending(head.Block.NumberU64())
			timestamp = time.Now().Unix()
			// With the Satoshi engine, work is only submitted if this node has not
			// signed recently. When the check fails or reports a recent signature,
			// work is still submitted if the head is at a round end.
			if p, ok := w.engine.(*satoshi.Satoshi); ok {
				signedRecent, err := p.SignRecently(w.chain, head.Block.Header())
				if err != nil {
					log.Info("Not allowed to propose block", "err", err)
					if p.IsRoundEnd(w.chain, head.Block.Header()) {
						commit(true, commitInterruptNewHead)
					}
					continue
				}
				if signedRecent {
					log.Info("Signed recently, must wait")
					if p.IsRoundEnd(w.chain, head.Block.Header()) {
						commit(true, commitInterruptNewHead)
					}
					continue
				}
			}
			commit(true, commitInterruptNewHead)

		case <-timer.C:
			// If mining is running resubmit a new work cycle periodically to pull in
			// higher priced transactions. Disable this overhead for pending blocks.
			// Note the timer is not re-armed here when the condition below is false.
			if w.isRunning() && ((w.chainConfig.Ethash != nil) || (w.chainConfig.Clique != nil &&
				w.chainConfig.Clique.Period > 0) || (w.chainConfig.Satoshi != nil && w.chainConfig.Satoshi.Period > 0)) {
				// Short circuit if no new transaction arrives.
				if atomic.LoadInt32(&w.newTxs) == 0 {
					timer.Reset(recommit)
					continue
				}
				commit(true, commitInterruptResubmit)
			}

		case interval := <-w.resubmitIntervalCh:
			// Adjust resubmit interval explicitly by user.
			if interval < minRecommitInterval {
				log.Warn("Sanitizing miner recommit interval", "provided", interval, "updated", minRecommitInterval)
				interval = minRecommitInterval
			}
			log.Info("Miner recommit interval update", "from", minRecommit, "to", interval)
			// The user supplied value becomes both the floor and the current interval.
			minRecommit, recommit = interval, interval

			if w.resubmitHook != nil {
				w.resubmitHook(minRecommit, recommit)
			}

		case adjust := <-w.resubmitAdjustCh:
			// Adjust resubmit interval by feedback.
			if adjust.inc {
				before := recommit
				// The target is the current interval scaled up by the reported ratio.
				target := float64(recommit.Nanoseconds()) / adjust.ratio
				recommit = recalcRecommit(minRecommit, recommit, target, true)
				log.Trace("Increase miner recommit interval", "from", before, "to", recommit)
			} else {
				before := recommit
				// When decreasing, the interval is pulled towards the user-specified minimum.
				recommit = recalcRecommit(minRecommit, recommit, float64(minRecommit.Nanoseconds()), false)
				log.Trace("Decrease miner recommit interval", "from", before, "to", recommit)
			}

			if w.resubmitHook != nil {
				w.resubmitHook(minRecommit, recommit)
			}

		case <-w.exitCh:
			return
		}
	}
}
// mainLoop is a standalone goroutine to regenerate the sealing task based on the received event.
//
// It handles new work requests, side-chain blocks and new transactions, and
// returns when exitCh is closed or any of the event subscriptions reports an
// error. All three subscriptions are released on return.
func (w *worker) mainLoop() {
	defer w.txsSub.Unsubscribe()
	defer w.chainHeadSub.Unsubscribe()
	defer w.chainSideSub.Unsubscribe()

	for {
		select {
		case req := <-w.newWorkCh:
			// Build new sealing work with the parameters chosen by newWorkLoop.
			w.commitNewWork(req.interrupt, req.noempty, req.timestamp)

		case ev := <-w.chainSideCh:
			// Side blocks are ignored entirely when the engine is Satoshi.
			if _, ok := w.engine.(*satoshi.Satoshi); ok {
				continue
			}
			// Short circuit for duplicate side blocks
			if _, exist := w.localUncles[ev.Block.Hash()]; exist {
				continue
			}
			if _, exist := w.remoteUncles[ev.Block.Hash()]; exist {
				continue
			}
			// Add side block to possible uncle block set depending on the author.
			if w.isLocalBlock != nil && w.isLocalBlock(ev.Block) {
				w.localUncles[ev.Block.Hash()] = ev.Block
			} else {
				w.remoteUncles[ev.Block.Hash()] = ev.Block
			}
			// If our mining block contains less than 2 uncle blocks,
			// add the new uncle block if valid and regenerate a mining block.
			if w.isRunning() && w.current != nil && w.current.uncles.Cardinality() < 2 {
				start := time.Now()
				if err := w.commitUncle(w.current, ev.Block.Header()); err == nil {
					// NOTE(review): uncles is passed to commit while still empty;
					// confirm that omitting the collected uncles is intended.
					var uncles []*types.Header
					w.commit(uncles, nil, false, start)
				}
			}

		case ev := <-w.txsCh:
			// Apply transactions to the pending state if we're not mining.
			//
			// Note all transactions received may not be continuous with transactions
			// already included in the current mining block. These transactions will
			// be automatically eliminated.
			if !w.isRunning() && w.current != nil {
				// If block is already full, abort
				if gp := w.current.gasPool; gp != nil && gp.Gas() < params.TxGas {
					continue
				}
				w.mu.RLock()
				coinbase := w.coinbase
				w.mu.RUnlock()

				// Group the incoming transactions by sender; the sender recovery
				// error is deliberately ignored here.
				txs := make(map[common.Address]types.Transactions)
				for _, tx := range ev.Txs {
					acc, _ := types.Sender(w.current.signer, tx)
					txs[acc] = append(txs[acc], tx)
				}
				txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs)
				tcount := w.current.tcount
				w.commitTransactions(txset, coinbase, nil)
				// Only update the snapshot if any new transactions were added
				// to the pending block
				if tcount != w.current.tcount {
					w.updateSnapshot()
				}
			} else {
				// Special case, if the consensus engine is 0 period clique(dev mode),
				// submit mining work here since all empty submission will be rejected
				// by clique. Of course the advance sealing(empty submission) is disabled.
				// The same applies to a Satoshi configuration with a period of 0.
				if (w.chainConfig.Clique != nil && w.chainConfig.Clique.Period == 0) ||
					(w.chainConfig.Satoshi != nil && w.chainConfig.Satoshi.Period == 0) {
					w.commitNewWork(nil, true, time.Now().Unix())
				}
			}
			// Count the arrivals so the recommit timer knows there is something new to include.
			atomic.AddInt32(&w.newTxs, int32(len(ev.Txs)))

		// System stopped
		case <-w.exitCh:
			return
		case <-w.txsSub.Err():
			return
		case <-w.chainHeadSub.Err():
			return
		case <-w.chainSideSub.Err():
			return
		}
	}
}
  482. // taskLoop is a standalone goroutine to fetch sealing task from the generator and
  483. // push them to consensus engine.
  484. func (w *worker) taskLoop() {
  485. var (
  486. stopCh chan struct{}
  487. prev common.Hash
  488. )
  489. // interrupt aborts the in-flight sealing task.
  490. interrupt := func() {
  491. if stopCh != nil {
  492. close(stopCh)
  493. stopCh = nil
  494. }
  495. }
  496. for {
  497. select {
  498. case task := <-w.taskCh:
  499. if w.newTaskHook != nil {
  500. w.newTaskHook(task)
  501. }
  502. // Reject duplicate sealing work due to resubmitting.
  503. sealHash := w.engine.SealHash(task.block.Header())
  504. if sealHash == prev {
  505. continue
  506. }
  507. // Interrupt previous sealing operation
  508. interrupt()
  509. stopCh, prev = make(chan struct{}), sealHash
  510. if w.skipSealHook != nil && w.skipSealHook(task) {
  511. continue
  512. }
  513. w.pendingMu.Lock()
  514. w.pendingTasks[sealHash] = task
  515. w.pendingMu.Unlock()
  516. if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil {
  517. log.Warn("Block sealing failed", "err", err)
  518. }
  519. case <-w.exitCh:
  520. interrupt()
  521. return
  522. }
  523. }
  524. }
  525. // resultLoop is a standalone goroutine to handle sealing result submitting
  526. // and flush relative data to the database.
  527. func (w *worker) resultLoop() {
  528. for {
  529. select {
  530. case block := <-w.resultCh:
  531. // Short circuit when receiving empty result.
  532. if block == nil {
  533. continue
  534. }
  535. // Short circuit when receiving duplicate result caused by resubmitting.
  536. if w.chain.HasBlock(block.Hash(), block.NumberU64()) {
  537. continue
  538. }
  539. var (
  540. sealhash = w.engine.SealHash(block.Header())
  541. hash = block.Hash()
  542. )
  543. w.pendingMu.RLock()
  544. task, exist := w.pendingTasks[sealhash]
  545. w.pendingMu.RUnlock()
  546. if !exist {
  547. log.Error("Block found but no relative pending task", "number", block.Number(), "sealhash", sealhash, "hash", hash)
  548. continue
  549. }
  550. // Different block could share same sealhash, deep copy here to prevent write-write conflict.
  551. var (
  552. receipts = make([]*types.Receipt, len(task.receipts))
  553. logs []*types.Log
  554. )
  555. for i, receipt := range task.receipts {
  556. // add block location fields
  557. receipt.BlockHash = hash
  558. receipt.BlockNumber = block.Number()
  559. receipt.TransactionIndex = uint(i)
  560. receipts[i] = new(types.Receipt)
  561. *receipts[i] = *receipt
  562. // Update the block hash in all logs since it is now available and not when the
  563. // receipt/log of individual transactions were created.
  564. for _, log := range receipt.Logs {
  565. log.BlockHash = hash
  566. }
  567. logs = append(logs, receipt.Logs...)
  568. }
  569. // Commit block and state to database.
  570. task.state.SetExpectedStateRoot(block.Root())
  571. _, err := w.chain.WriteBlockWithState(block, receipts, logs, task.state, true)
  572. if err != nil {
  573. log.Error("Failed writing block to chain", "err", err)
  574. continue
  575. }
  576. log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", hash,
  577. "elapsed", common.PrettyDuration(time.Since(task.createdAt)))
  578. // Broadcast the block and announce chain insertion event
  579. w.mux.Post(core.NewMinedBlockEvent{Block: block})
  580. // Insert the block into the set of pending ones to resultLoop for confirmations
  581. w.unconfirmed.Insert(block.NumberU64(), block.Hash())
  582. case <-w.exitCh:
  583. return
  584. }
  585. }
  586. }
  587. // makeCurrent creates a new environment for the current cycle.
  588. func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
  589. // Retrieve the parent state to execute on top and start a prefetcher for
  590. // the miner to speed block sealing up a bit
  591. state, err := w.chain.StateAt(parent.Root())
  592. if err != nil {
  593. return err
  594. }
  595. state.StartPrefetcher("miner")
  596. env := &environment{
  597. signer: types.MakeSigner(w.chainConfig, header.Number),
  598. state: state,
  599. ancestors: mapset.NewSet(),
  600. family: mapset.NewSet(),
  601. uncles: mapset.NewSet(),
  602. header: header,
  603. }
  604. // Keep track of transactions which return errors so they can be removed
  605. env.tcount = 0
  606. // Swap out the old work with the new one, terminating any leftover prefetcher
  607. // processes in the mean time and starting a new one.
  608. if w.current != nil && w.current.state != nil {
  609. w.current.state.StopPrefetcher()
  610. }
  611. w.current = env
  612. return nil
  613. }
  614. // commitUncle adds the given block to uncle block set, returns error if failed to add.
  615. func (w *worker) commitUncle(env *environment, uncle *types.Header) error {
  616. hash := uncle.Hash()
  617. if env.uncles.Contains(hash) {
  618. return errors.New("uncle not unique")
  619. }
  620. if env.header.ParentHash == uncle.ParentHash {
  621. return errors.New("uncle is sibling")
  622. }
  623. if !env.ancestors.Contains(uncle.ParentHash) {
  624. return errors.New("uncle's parent unknown")
  625. }
  626. if env.family.Contains(hash) {
  627. return errors.New("uncle already included")
  628. }
  629. env.uncles.Add(uncle.Hash())
  630. return nil
  631. }
  632. // updateSnapshot updates pending snapshot block and state.
  633. // Note this function assumes the current variable is thread safe.
  634. func (w *worker) updateSnapshot() {
  635. w.snapshotMu.Lock()
  636. defer w.snapshotMu.Unlock()
  637. var uncles []*types.Header
  638. w.current.uncles.Each(func(item interface{}) bool {
  639. hash, ok := item.(common.Hash)
  640. if !ok {
  641. return false
  642. }
  643. uncle, exist := w.localUncles[hash]
  644. if !exist {
  645. uncle, exist = w.remoteUncles[hash]
  646. }
  647. if !exist {
  648. return false
  649. }
  650. uncles = append(uncles, uncle.Header())
  651. return false
  652. })
  653. w.snapshotBlock = types.NewBlock(
  654. w.current.header,
  655. w.current.txs,
  656. uncles,
  657. w.current.receipts,
  658. trie.NewStackTrie(nil),
  659. )
  660. w.snapshotState = w.current.state.Copy()
  661. }
  662. func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Address, receiptProcessors ...core.ReceiptProcessor) ([]*types.Log, error) {
  663. snap := w.current.state.Snapshot()
  664. receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &w.current.header.GasUsed, *w.chain.GetVMConfig(), receiptProcessors...)
  665. if err != nil {
  666. w.current.state.RevertToSnapshot(snap)
  667. return nil, err
  668. }
  669. w.current.txs = append(w.current.txs, tx)
  670. w.current.receipts = append(w.current.receipts, receipt)
  671. return receipt.Logs, nil
  672. }
  673. func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coinbase common.Address, interrupt *int32) bool {
  674. // Short circuit if current is nil
  675. if w.current == nil {
  676. return true
  677. }
  678. if w.current.gasPool == nil {
  679. w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit)
  680. w.current.gasPool.SubGas(params.SystemTxsGas)
  681. }
  682. var coalescedLogs []*types.Log
  683. var stopTimer *time.Timer
  684. delay := w.engine.Delay(w.chain, w.current.header)
  685. if delay != nil {
  686. stopTimer = time.NewTimer(*delay - w.config.DelayLeftOver)
  687. log.Debug("Time left for mining work", "left", (*delay - w.config.DelayLeftOver).String(), "leftover", w.config.DelayLeftOver)
  688. defer stopTimer.Stop()
  689. }
  690. // initilise bloom processors
  691. processorCapacity := 100
  692. if txs.CurrentSize() < processorCapacity {
  693. processorCapacity = txs.CurrentSize()
  694. }
  695. bloomProcessors := core.NewAsyncReceiptBloomGenerator(processorCapacity)
  696. LOOP:
  697. for {
  698. // In the following three cases, we will interrupt the execution of the transaction.
  699. // (1) new head block event arrival, the interrupt signal is 1
  700. // (2) worker start or restart, the interrupt signal is 1
  701. // (3) worker recreate the mining block with any newly arrived transactions, the interrupt signal is 2.
  702. // For the first two cases, the semi-finished work will be discarded.
  703. // For the third case, the semi-finished work will be submitted to the consensus engine.
  704. if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
  705. // Notify resubmit loop to increase resubmitting interval due to too frequent commits.
  706. if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
  707. ratio := float64(w.current.header.GasLimit-w.current.gasPool.Gas()) / float64(w.current.header.GasLimit)
  708. if ratio < 0.1 {
  709. ratio = 0.1
  710. }
  711. w.resubmitAdjustCh <- &intervalAdjust{
  712. ratio: ratio,
  713. inc: true,
  714. }
  715. }
  716. return atomic.LoadInt32(interrupt) == commitInterruptNewHead
  717. }
  718. // If we don't have enough gas for any further transactions then we're done
  719. if w.current.gasPool.Gas() < params.TxGas {
  720. log.Trace("Not enough gas for further transactions", "have", w.current.gasPool, "want", params.TxGas)
  721. break
  722. }
  723. if stopTimer != nil {
  724. select {
  725. case <-stopTimer.C:
  726. log.Info("Not enough time for further transactions", "txs", len(w.current.txs))
  727. break LOOP
  728. default:
  729. }
  730. }
  731. // Retrieve the next transaction and abort if all done
  732. tx := txs.Peek()
  733. if tx == nil {
  734. break
  735. }
  736. // Error may be ignored here. The error has already been checked
  737. // during transaction acceptance is the transaction pool.
  738. //
  739. // We use the eip155 signer regardless of the current hf.
  740. //from, _ := types.Sender(w.current.signer, tx)
  741. // Check whether the tx is replay protected. If we're not in the EIP155 hf
  742. // phase, start ignoring the sender until we do.
  743. if tx.Protected() && !w.chainConfig.IsEIP155(w.current.header.Number) {
  744. //log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block)
  745. txs.Pop()
  746. continue
  747. }
  748. // Start executing the transaction
  749. w.current.state.Prepare(tx.Hash(), common.Hash{}, w.current.tcount)
  750. logs, err := w.commitTransaction(tx, coinbase, bloomProcessors)
  751. switch {
  752. case errors.Is(err, core.ErrGasLimitReached):
  753. // Pop the current out-of-gas transaction without shifting in the next from the account
  754. //log.Trace("Gas limit exceeded for current block", "sender", from)
  755. txs.Pop()
  756. case errors.Is(err, core.ErrNonceTooLow):
  757. // New head notification data race between the transaction pool and miner, shift
  758. //log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce())
  759. txs.Shift()
  760. case errors.Is(err, core.ErrNonceTooHigh):
  761. // Reorg notification data race between the transaction pool and miner, skip account =
  762. //log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce())
  763. txs.Pop()
  764. case errors.Is(err, nil):
  765. // Everything ok, collect the logs and shift in the next transaction from the same account
  766. coalescedLogs = append(coalescedLogs, logs...)
  767. w.current.tcount++
  768. txs.Shift()
  769. case errors.Is(err, core.ErrTxTypeNotSupported):
  770. // Pop the unsupported transaction without shifting in the next from the account
  771. //log.Trace("Skipping unsupported transaction type", "sender", from, "type", tx.Type())
  772. txs.Pop()
  773. default:
  774. // Strange error, discard the transaction and get the next in line (note, the
  775. // nonce-too-high clause will prevent us from executing in vain).
  776. //log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
  777. txs.Shift()
  778. }
  779. }
  780. bloomProcessors.Close()
  781. if !w.isRunning() && len(coalescedLogs) > 0 {
  782. // We don't push the pendingLogsEvent while we are mining. The reason is that
  783. // when we are mining, the worker will regenerate a mining block every 3 seconds.
  784. // In order to avoid pushing the repeated pendingLog, we disable the pending log pushing.
  785. // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined
  786. // logs by filling in the block hash when the block was mined by the local miner. This can
  787. // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed.
  788. cpy := make([]*types.Log, len(coalescedLogs))
  789. for i, l := range coalescedLogs {
  790. cpy[i] = new(types.Log)
  791. *cpy[i] = *l
  792. }
  793. w.pendingLogsFeed.Send(cpy)
  794. }
  795. // Notify resubmit loop to decrease resubmitting interval if current interval is larger
  796. // than the user-specified one.
  797. if interrupt != nil {
  798. w.resubmitAdjustCh <- &intervalAdjust{inc: false}
  799. }
  800. return false
  801. }
  802. // commitNewWork generates several new sealing tasks based on the parent block.
  803. func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) {
  804. w.mu.RLock()
  805. defer w.mu.RUnlock()
  806. tstart := time.Now()
  807. parent := w.chain.CurrentBlock()
  808. if parent.Time() >= uint64(timestamp) {
  809. timestamp = int64(parent.Time() + 1)
  810. }
  811. num := parent.Number()
  812. header := &types.Header{
  813. ParentHash: parent.Hash(),
  814. Number: num.Add(num, common.Big1),
  815. GasLimit: core.CalcGasLimit(parent, w.config.GasFloor, w.config.GasCeil),
  816. Extra: w.extra,
  817. Time: uint64(timestamp),
  818. }
  819. // Only set the coinbase if our consensus engine is running (avoid spurious block rewards)
  820. if w.isRunning() {
  821. if w.coinbase == (common.Address{}) {
  822. log.Error("Refusing to mine without etherbase")
  823. return
  824. }
  825. header.Coinbase = w.coinbase
  826. }
  827. if err := w.engine.Prepare(w.chain, header); err != nil {
  828. log.Error("Failed to prepare header for mining", "err", err)
  829. return
  830. }
  831. // If we are care about TheDAO hard-fork check whether to override the extra-data or not
  832. if daoBlock := w.chainConfig.DAOForkBlock; daoBlock != nil {
  833. // Check whether the block is among the fork extra-override range
  834. limit := new(big.Int).Add(daoBlock, params.DAOForkExtraRange)
  835. if header.Number.Cmp(daoBlock) >= 0 && header.Number.Cmp(limit) < 0 {
  836. // Depending whether we support or oppose the fork, override differently
  837. if w.chainConfig.DAOForkSupport {
  838. header.Extra = common.CopyBytes(params.DAOForkBlockExtra)
  839. } else if bytes.Equal(header.Extra, params.DAOForkBlockExtra) {
  840. header.Extra = []byte{} // If miner opposes, don't let it use the reserved extra-data
  841. }
  842. }
  843. }
  844. // Could potentially happen if starting to mine in an odd state.
  845. err := w.makeCurrent(parent, header)
  846. if err != nil {
  847. log.Error("Failed to create mining context", "err", err)
  848. return
  849. }
  850. // Create the current work task and check any fork transitions needed
  851. env := w.current
  852. if w.chainConfig.DAOForkSupport && w.chainConfig.DAOForkBlock != nil && w.chainConfig.DAOForkBlock.Cmp(header.Number) == 0 {
  853. misc.ApplyDAOHardFork(env.state)
  854. }
  855. systemcontracts.UpgradeBuildInSystemContract(w.chainConfig, header.Number, env.state)
  856. // Accumulate the uncles for the current block
  857. uncles := make([]*types.Header, 0)
  858. // Create an empty block based on temporary copied state for
  859. // sealing in advance without waiting block execution finished.
  860. if !noempty && atomic.LoadUint32(&w.noempty) == 0 {
  861. w.commit(uncles, nil, false, tstart)
  862. }
  863. err = w.engine.BeforePackTx(w.chain, env.header, env.state, &env.txs, uncles, &env.receipts)
  864. if err != nil {
  865. log.Error("Failed to pack system tx", "err", err)
  866. return
  867. }
  868. // Fill the block with all available pending transactions.
  869. pending, err := w.eth.TxPool().Pending()
  870. if err != nil {
  871. log.Error("Failed to fetch pending transactions", "err", err)
  872. }
  873. // Short circuit if there is no available pending transactions
  874. if len(pending) != 0 {
  875. start := time.Now()
  876. // Split the pending transactions into locals and remotes
  877. localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending
  878. for _, account := range w.eth.TxPool().Locals() {
  879. if txs := remoteTxs[account]; len(txs) > 0 {
  880. delete(remoteTxs, account)
  881. localTxs[account] = txs
  882. }
  883. }
  884. if len(localTxs) > 0 {
  885. txs := types.NewTransactionsByPriceAndNonce(w.current.signer, localTxs)
  886. if w.commitTransactions(txs, w.coinbase, interrupt) {
  887. return
  888. }
  889. }
  890. if len(remoteTxs) > 0 {
  891. txs := types.NewTransactionsByPriceAndNonce(w.current.signer, remoteTxs)
  892. if w.commitTransactions(txs, w.coinbase, interrupt) {
  893. return
  894. }
  895. }
  896. commitTxsTimer.UpdateSince(start)
  897. log.Info("Gas pool", "height", header.Number.String(), "pool", w.current.gasPool.String())
  898. }
  899. w.commit(uncles, w.fullTaskHook, false, tstart)
  900. }
  901. // commit runs any post-transaction state modifications, assembles the final block
  902. // and commits new work if consensus engine is running.
  903. func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error {
  904. s := w.current.state
  905. err := s.WaitPipeVerification()
  906. if err != nil {
  907. return err
  908. }
  909. block, receipts, err := w.engine.FinalizeAndAssemble(w.chain, types.CopyHeader(w.current.header), s, w.current.txs, uncles, w.current.receipts)
  910. if err != nil {
  911. return err
  912. }
  913. if w.isRunning() {
  914. if interval != nil {
  915. interval()
  916. }
  917. select {
  918. case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now()}:
  919. w.unconfirmed.Shift(block.NumberU64() - 1)
  920. log.Info("Commit new mining work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()),
  921. "uncles", len(uncles), "txs", w.current.tcount,
  922. "gas", block.GasUsed(),
  923. "elapsed", common.PrettyDuration(time.Since(start)))
  924. case <-w.exitCh:
  925. log.Info("Worker has exited")
  926. }
  927. }
  928. if update {
  929. w.updateSnapshot()
  930. }
  931. return nil
  932. }
  933. // postSideBlock fires a side chain event, only use it for testing.
  934. func (w *worker) postSideBlock(event core.ChainSideEvent) {
  935. select {
  936. case w.chainSideCh <- event:
  937. case <-w.exitCh:
  938. }
  939. }