| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453 |
- package miner
- import (
- "fmt"
- "math/big"
- "sort"
- "sync"
- "sync/atomic"
- "github.com/ethereum/go-ethereum/accounts"
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/core"
- "github.com/ethereum/go-ethereum/core/state"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/event"
- "github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/logger/glog"
- "github.com/ethereum/go-ethereum/pow"
- "gopkg.in/fatih/set.v0"
- )
- var jsonlogger = logger.NewJsonLogger()
- type environment struct {
- totalUsedGas *big.Int
- state *state.StateDB
- coinbase *state.StateObject
- block *types.Block
- family *set.Set
- uncles *set.Set
- }
- func env(block *types.Block, eth core.Backend) *environment {
- state := state.New(block.Root(), eth.StateDb())
- env := &environment{
- totalUsedGas: new(big.Int),
- state: state,
- block: block,
- family: set.New(),
- uncles: set.New(),
- coinbase: state.GetOrNewStateObject(block.Coinbase()),
- }
- return env
- }
// Work is a plain value carrying a block number, a nonce and two byte slices.
//
// NOTE(review): Work is not referenced anywhere else in this file; the field
// meanings below are inferred from their names only — confirm against the
// agents that consume it.
type Work struct {
	// Number and Nonce are presumably the block number and proof-of-work nonce.
	Number, Nonce uint64
	// MixDigest and SeedHash are presumably the proof-of-work mix digest and
	// seed hash.
	MixDigest, SeedHash []byte
}
- type Agent interface {
- Work() chan<- *types.Block
- SetReturnCh(chan<- *types.Block)
- Stop()
- Start()
- GetHashRate() int64
- }
- type worker struct {
- mu sync.Mutex
- agents []Agent
- recv chan *types.Block
- mux *event.TypeMux
- quit chan struct{}
- pow pow.PoW
- eth core.Backend
- chain *core.ChainManager
- proc *core.BlockProcessor
- coinbase common.Address
- gasPrice *big.Int
- extra []byte
- currentMu sync.Mutex
- current *environment
- uncleMu sync.Mutex
- possibleUncles map[common.Hash]*types.Block
- txQueueMu sync.Mutex
- txQueue map[common.Hash]*types.Transaction
- // atomic status counters
- mining int32
- atWork int32
- }
- func newWorker(coinbase common.Address, eth core.Backend) *worker {
- worker := &worker{
- eth: eth,
- mux: eth.EventMux(),
- recv: make(chan *types.Block),
- gasPrice: new(big.Int),
- chain: eth.ChainManager(),
- proc: eth.BlockProcessor(),
- possibleUncles: make(map[common.Hash]*types.Block),
- coinbase: coinbase,
- txQueue: make(map[common.Hash]*types.Transaction),
- quit: make(chan struct{}),
- }
- go worker.update()
- go worker.wait()
- worker.commitNewWork()
- return worker
- }
- func (self *worker) pendingState() *state.StateDB {
- self.currentMu.Lock()
- defer self.currentMu.Unlock()
- return self.current.state
- }
- func (self *worker) pendingBlock() *types.Block {
- self.currentMu.Lock()
- defer self.currentMu.Unlock()
- return self.current.block
- }
- func (self *worker) start() {
- self.mu.Lock()
- defer self.mu.Unlock()
- // spin up agents
- for _, agent := range self.agents {
- agent.Start()
- }
- atomic.StoreInt32(&self.mining, 1)
- }
- func (self *worker) stop() {
- self.mu.Lock()
- defer self.mu.Unlock()
- if atomic.LoadInt32(&self.mining) == 1 {
- // stop all agents
- for _, agent := range self.agents {
- agent.Stop()
- }
- }
- atomic.StoreInt32(&self.mining, 0)
- atomic.StoreInt32(&self.atWork, 0)
- }
- func (self *worker) register(agent Agent) {
- self.mu.Lock()
- defer self.mu.Unlock()
- self.agents = append(self.agents, agent)
- agent.SetReturnCh(self.recv)
- }
- func (self *worker) update() {
- events := self.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{})
- out:
- for {
- select {
- case event := <-events.Chan():
- switch ev := event.(type) {
- case core.ChainHeadEvent:
- self.commitNewWork()
- case core.ChainSideEvent:
- self.uncleMu.Lock()
- self.possibleUncles[ev.Block.Hash()] = ev.Block
- self.uncleMu.Unlock()
- case core.TxPreEvent:
- if atomic.LoadInt32(&self.mining) == 0 {
- self.commitNewWork()
- }
- }
- case <-self.quit:
- break out
- }
- }
- events.Unsubscribe()
- }
- func (self *worker) wait() {
- for {
- for block := range self.recv {
- atomic.AddInt32(&self.atWork, -1)
- if block == nil {
- continue
- }
- if _, err := self.chain.InsertChain(types.Blocks{block}); err == nil {
- for _, uncle := range block.Uncles() {
- delete(self.possibleUncles, uncle.Hash())
- }
- self.mux.Post(core.NewMinedBlockEvent{block})
- glog.V(logger.Info).Infof("🔨 Mined block #%v", block.Number())
- jsonlogger.LogJson(&logger.EthMinerNewBlock{
- BlockHash: block.Hash().Hex(),
- BlockNumber: block.Number(),
- ChainHeadHash: block.ParentHeaderHash.Hex(),
- BlockPrevHash: block.ParentHeaderHash.Hex(),
- })
- } else {
- self.commitNewWork()
- }
- }
- }
- }
- func (self *worker) push() {
- if atomic.LoadInt32(&self.mining) == 1 {
- self.current.block.Header().GasUsed = self.current.totalUsedGas
- self.current.block.SetRoot(self.current.state.Root())
- // push new work to agents
- for _, agent := range self.agents {
- atomic.AddInt32(&self.atWork, 1)
- if agent.Work() != nil {
- agent.Work() <- self.current.block.Copy()
- } else {
- common.Report(fmt.Sprintf("%v %T\n", agent, agent))
- }
- }
- }
- }
- func (self *worker) makeCurrent() {
- block := self.chain.NewBlock(self.coinbase)
- if block.Time() == self.chain.CurrentBlock().Time() {
- block.Header().Time++
- }
- block.Header().Extra = self.extra
- self.current = env(block, self.eth)
- for _, ancestor := range self.chain.GetAncestors(block, 7) {
- self.current.family.Add(ancestor.Hash())
- }
- parent := self.chain.GetBlock(self.current.block.ParentHash())
- self.current.coinbase.SetGasPool(core.CalcGasLimit(parent))
- }
- func (w *worker) setGasPrice(p *big.Int) {
- w.mu.Lock()
- defer w.mu.Unlock()
- // calculate the minimal gas price the miner accepts when sorting out transactions.
- const pct = int64(90)
- w.gasPrice = gasprice(p, pct)
- w.mux.Post(core.GasPriceChanged{w.gasPrice})
- }
- func (self *worker) commitNewWork() {
- self.mu.Lock()
- defer self.mu.Unlock()
- self.uncleMu.Lock()
- defer self.uncleMu.Unlock()
- self.currentMu.Lock()
- defer self.currentMu.Unlock()
- self.makeCurrent()
- transactions := self.eth.TxPool().GetTransactions()
- sort.Sort(types.TxByNonce{transactions})
- accounts, _ := self.eth.AccountManager().Accounts()
- // Keep track of transactions which return errors so they can be removed
- var (
- remove = set.New()
- tcount = 0
- ignoredTransactors = set.New()
- lowGasTransactors = set.New()
- ownedAccounts = accountAddressesSet(accounts)
- lowGasTxs types.Transactions
- )
- for _, tx := range transactions {
- // We can skip err. It has already been validated in the tx pool
- from, _ := tx.From()
- // check if it falls within margin
- if tx.GasPrice().Cmp(self.gasPrice) < 0 {
- // ignore the transaction and transactor. We ignore the transactor
- // because nonce will fail after ignoring this transaction so there's
- // no point
- lowGasTransactors.Add(from)
- glog.V(logger.Info).Infof("transaction(%x) below gas price (tx=%v ask=%v). All sequential txs from this address(%x) will be ignored\n", tx.Hash().Bytes()[:4], common.CurrencyToString(tx.GasPrice()), common.CurrencyToString(self.gasPrice), from[:4])
- }
- // Continue with the next transaction if the transaction sender is included in
- // the low gas tx set. This will also remove the tx and all sequential transaction
- // from this transactor
- if lowGasTransactors.Has(from) {
- // add tx to the low gas set. This will be removed at the end of the run
- // owned accounts are ignored
- if !ownedAccounts.Has(from) {
- lowGasTxs = append(lowGasTxs, tx)
- }
- continue
- }
- // Move on to the next transaction when the transactor is in ignored transactions set
- // This may occur when a transaction hits the gas limit. When a gas limit is hit and
- // the transaction is processed (that could potentially be included in the block) it
- // will throw a nonce error because the previous transaction hasn't been processed.
- // Therefor we need to ignore any transaction after the ignored one.
- if ignoredTransactors.Has(from) {
- continue
- }
- self.current.state.StartRecord(tx.Hash(), common.Hash{}, 0)
- err := self.commitTransaction(tx)
- switch {
- case core.IsNonceErr(err) || core.IsInvalidTxErr(err):
- // Remove invalid transactions
- from, _ := tx.From()
- self.chain.TxState().RemoveNonce(from, tx.Nonce())
- remove.Add(tx.Hash())
- if glog.V(logger.Detail) {
- glog.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err)
- }
- case state.IsGasLimitErr(err):
- from, _ := tx.From()
- // ignore the transactor so no nonce errors will be thrown for this account
- // next time the worker is run, they'll be picked up again.
- ignoredTransactors.Add(from)
- glog.V(logger.Detail).Infof("Gas limit reached for (%x) in this block. Continue to try smaller txs\n", from[:4])
- default:
- tcount++
- }
- }
- self.eth.TxPool().RemoveTransactions(lowGasTxs)
- var (
- uncles []*types.Header
- badUncles []common.Hash
- )
- for hash, uncle := range self.possibleUncles {
- if len(uncles) == 2 {
- break
- }
- if err := self.commitUncle(uncle.Header()); err != nil {
- if glog.V(logger.Ridiculousness) {
- glog.V(logger.Detail).Infof("Bad uncle found and will be removed (%x)\n", hash[:4])
- glog.V(logger.Detail).Infoln(uncle)
- }
- badUncles = append(badUncles, hash)
- } else {
- glog.V(logger.Debug).Infof("commiting %x as uncle\n", hash[:4])
- uncles = append(uncles, uncle.Header())
- }
- }
- // We only care about logging if we're actually mining
- if atomic.LoadInt32(&self.mining) == 1 {
- glog.V(logger.Info).Infof("commit new work on block %v with %d txs & %d uncles\n", self.current.block.Number(), tcount, len(uncles))
- }
- for _, hash := range badUncles {
- delete(self.possibleUncles, hash)
- }
- self.current.block.SetUncles(uncles)
- core.AccumulateRewards(self.current.state, self.current.block)
- self.current.state.Update()
- self.push()
- }
- var (
- inclusionReward = new(big.Int).Div(core.BlockReward, big.NewInt(32))
- _uncleReward = new(big.Int).Mul(core.BlockReward, big.NewInt(15))
- uncleReward = new(big.Int).Div(_uncleReward, big.NewInt(16))
- )
- func (self *worker) commitUncle(uncle *types.Header) error {
- if self.current.uncles.Has(uncle.Hash()) {
- // Error not unique
- return core.UncleError("Uncle not unique")
- }
- self.current.uncles.Add(uncle.Hash())
- if !self.current.family.Has(uncle.ParentHash) {
- return core.UncleError(fmt.Sprintf("Uncle's parent unknown (%x)", uncle.ParentHash[0:4]))
- }
- if self.current.family.Has(uncle.Hash()) {
- return core.UncleError(fmt.Sprintf("Uncle already in family (%x)", uncle.Hash()))
- }
- return nil
- }
- func (self *worker) commitTransaction(tx *types.Transaction) error {
- snap := self.current.state.Copy()
- receipt, _, err := self.proc.ApplyTransaction(self.current.coinbase, self.current.state, self.current.block, tx, self.current.totalUsedGas, true)
- if err != nil && (core.IsNonceErr(err) || state.IsGasLimitErr(err) || core.IsInvalidTxErr(err)) {
- self.current.state.Set(snap)
- return err
- }
- self.current.block.AddTransaction(tx)
- self.current.block.AddReceipt(receipt)
- return nil
- }
- func (self *worker) HashRate() int64 {
- var tot int64
- for _, agent := range self.agents {
- tot += agent.GetHashRate()
- }
- return tot
- }
// gasprice returns pct percent of price, rounded down to a whole number.
//
// The multiplication is carried out before the division so that no precision
// is lost for prices that are not a multiple of 100 (dividing first would,
// for instance, turn every price below 100 into zero). price itself is left
// untouched; the result is a newly allocated value.
func gasprice(price *big.Int, pct int64) *big.Int {
	p := new(big.Int).Mul(price, big.NewInt(pct))
	return p.Div(p, big.NewInt(100))
}
- func accountAddressesSet(accounts []accounts.Account) *set.Set {
- accountSet := set.New()
- for _, account := range accounts {
- accountSet.Add(common.BytesToAddress(account.Address))
- }
- return accountSet
- }
|