worker.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385
  1. package miner
  2. import (
  3. "fmt"
  4. "math/big"
  5. "sort"
  6. "sync"
  7. "sync/atomic"
  8. "github.com/ethereum/go-ethereum/common"
  9. "github.com/ethereum/go-ethereum/core"
  10. "github.com/ethereum/go-ethereum/core/state"
  11. "github.com/ethereum/go-ethereum/core/types"
  12. "github.com/ethereum/go-ethereum/event"
  13. "github.com/ethereum/go-ethereum/logger"
  14. "github.com/ethereum/go-ethereum/logger/glog"
  15. "github.com/ethereum/go-ethereum/pow"
  16. "gopkg.in/fatih/set.v0"
  17. )
  18. var jsonlogger = logger.NewJsonLogger()
  19. type environment struct {
  20. totalUsedGas *big.Int
  21. state *state.StateDB
  22. coinbase *state.StateObject
  23. block *types.Block
  24. family *set.Set
  25. uncles *set.Set
  26. }
  27. func env(block *types.Block, eth core.Backend) *environment {
  28. state := state.New(block.Root(), eth.StateDb())
  29. env := &environment{
  30. totalUsedGas: new(big.Int),
  31. state: state,
  32. block: block,
  33. family: set.New(),
  34. uncles: set.New(),
  35. coinbase: state.GetOrNewStateObject(block.Coinbase()),
  36. }
  37. return env
  38. }
  39. type Work struct {
  40. Number uint64
  41. Nonce uint64
  42. MixDigest []byte
  43. SeedHash []byte
  44. }
  45. type Agent interface {
  46. Work() chan<- *types.Block
  47. SetReturnCh(chan<- *types.Block)
  48. Stop()
  49. Start()
  50. GetHashRate() int64
  51. }
  52. type worker struct {
  53. mu sync.Mutex
  54. agents []Agent
  55. recv chan *types.Block
  56. mux *event.TypeMux
  57. quit chan struct{}
  58. pow pow.PoW
  59. eth core.Backend
  60. chain *core.ChainManager
  61. proc *core.BlockProcessor
  62. coinbase common.Address
  63. extra []byte
  64. currentMu sync.Mutex
  65. current *environment
  66. uncleMu sync.Mutex
  67. possibleUncles map[common.Hash]*types.Block
  68. txQueueMu sync.Mutex
  69. txQueue map[common.Hash]*types.Transaction
  70. // atomic status counters
  71. mining int32
  72. atWork int32
  73. }
  74. func newWorker(coinbase common.Address, eth core.Backend) *worker {
  75. worker := &worker{
  76. eth: eth,
  77. mux: eth.EventMux(),
  78. recv: make(chan *types.Block),
  79. chain: eth.ChainManager(),
  80. proc: eth.BlockProcessor(),
  81. possibleUncles: make(map[common.Hash]*types.Block),
  82. coinbase: coinbase,
  83. txQueue: make(map[common.Hash]*types.Transaction),
  84. quit: make(chan struct{}),
  85. }
  86. go worker.update()
  87. go worker.wait()
  88. worker.commitNewWork()
  89. return worker
  90. }
  91. func (self *worker) pendingState() *state.StateDB {
  92. self.currentMu.Lock()
  93. defer self.currentMu.Unlock()
  94. return self.current.state
  95. }
  96. func (self *worker) pendingBlock() *types.Block {
  97. self.currentMu.Lock()
  98. defer self.currentMu.Unlock()
  99. return self.current.block
  100. }
  101. func (self *worker) start() {
  102. // spin up agents
  103. for _, agent := range self.agents {
  104. agent.Start()
  105. }
  106. atomic.StoreInt32(&self.mining, 1)
  107. }
  108. func (self *worker) stop() {
  109. if atomic.LoadInt32(&self.mining) == 1 {
  110. // stop all agents
  111. for _, agent := range self.agents {
  112. agent.Stop()
  113. }
  114. }
  115. atomic.StoreInt32(&self.mining, 0)
  116. atomic.StoreInt32(&self.atWork, 0)
  117. }
  118. func (self *worker) register(agent Agent) {
  119. self.agents = append(self.agents, agent)
  120. agent.SetReturnCh(self.recv)
  121. }
  122. func (self *worker) update() {
  123. events := self.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{})
  124. out:
  125. for {
  126. select {
  127. case event := <-events.Chan():
  128. switch ev := event.(type) {
  129. case core.ChainHeadEvent:
  130. self.commitNewWork()
  131. case core.ChainSideEvent:
  132. self.uncleMu.Lock()
  133. self.possibleUncles[ev.Block.Hash()] = ev.Block
  134. self.uncleMu.Unlock()
  135. case core.TxPreEvent:
  136. if atomic.LoadInt32(&self.mining) == 0 {
  137. self.commitNewWork()
  138. }
  139. }
  140. case <-self.quit:
  141. break out
  142. }
  143. }
  144. events.Unsubscribe()
  145. }
  146. func (self *worker) wait() {
  147. for {
  148. for block := range self.recv {
  149. atomic.AddInt32(&self.atWork, -1)
  150. if block == nil {
  151. continue
  152. }
  153. if err := self.chain.InsertChain(types.Blocks{block}); err == nil {
  154. for _, uncle := range block.Uncles() {
  155. delete(self.possibleUncles, uncle.Hash())
  156. }
  157. self.mux.Post(core.NewMinedBlockEvent{block})
  158. glog.V(logger.Info).Infof("🔨 Mined block #%v", block.Number())
  159. jsonlogger.LogJson(&logger.EthMinerNewBlock{
  160. BlockHash: block.Hash().Hex(),
  161. BlockNumber: block.Number(),
  162. ChainHeadHash: block.ParentHeaderHash.Hex(),
  163. BlockPrevHash: block.ParentHeaderHash.Hex(),
  164. })
  165. } else {
  166. self.commitNewWork()
  167. }
  168. }
  169. }
  170. }
  171. func (self *worker) push() {
  172. if atomic.LoadInt32(&self.mining) == 1 {
  173. self.current.block.Header().GasUsed = self.current.totalUsedGas
  174. self.current.block.SetRoot(self.current.state.Root())
  175. // push new work to agents
  176. for _, agent := range self.agents {
  177. atomic.AddInt32(&self.atWork, 1)
  178. if agent.Work() != nil {
  179. agent.Work() <- self.current.block.Copy()
  180. } else {
  181. common.Report(fmt.Sprintf("%v %T\n", agent, agent))
  182. }
  183. }
  184. }
  185. }
  186. func (self *worker) makeCurrent() {
  187. block := self.chain.NewBlock(self.coinbase)
  188. if block.Time() == self.chain.CurrentBlock().Time() {
  189. block.Header().Time++
  190. }
  191. block.Header().Extra = self.extra
  192. self.current = env(block, self.eth)
  193. for _, ancestor := range self.chain.GetAncestors(block, 7) {
  194. self.current.family.Add(ancestor.Hash())
  195. }
  196. parent := self.chain.GetBlock(self.current.block.ParentHash())
  197. self.current.coinbase.SetGasPool(core.CalcGasLimit(parent))
  198. }
  199. func (self *worker) commitNewWork() {
  200. self.mu.Lock()
  201. defer self.mu.Unlock()
  202. self.uncleMu.Lock()
  203. defer self.uncleMu.Unlock()
  204. self.currentMu.Lock()
  205. defer self.currentMu.Unlock()
  206. self.makeCurrent()
  207. transactions := self.eth.TxPool().GetTransactions()
  208. sort.Sort(types.TxByNonce{transactions})
  209. // Keep track of transactions which return errors so they can be removed
  210. var (
  211. remove = set.New()
  212. tcount = 0
  213. ignoredTransactors = set.New()
  214. )
  215. for _, tx := range transactions {
  216. // We can skip err. It has already been validated in the tx pool
  217. from, _ := tx.From()
  218. // Move on to the next transaction when the transactor is in ignored transactions set
  219. // This may occur when a transaction hits the gas limit. When a gas limit is hit and
  220. // the transaction is processed (that could potentially be included in the block) it
  221. // will throw a nonce error because the previous transaction hasn't been processed.
  222. // Therefor we need to ignore any transaction after the ignored one.
  223. if ignoredTransactors.Has(from) {
  224. continue
  225. }
  226. self.current.state.StartRecord(tx.Hash(), common.Hash{}, 0)
  227. err := self.commitTransaction(tx)
  228. switch {
  229. case core.IsNonceErr(err) || core.IsInvalidTxErr(err):
  230. // Remove invalid transactions
  231. from, _ := tx.From()
  232. self.chain.TxState().RemoveNonce(from, tx.Nonce())
  233. remove.Add(tx.Hash())
  234. if glog.V(logger.Detail) {
  235. glog.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err)
  236. }
  237. case state.IsGasLimitErr(err):
  238. from, _ := tx.From()
  239. // ignore the transactor so no nonce errors will be thrown for this account
  240. // next time the worker is run, they'll be picked up again.
  241. ignoredTransactors.Add(from)
  242. glog.V(logger.Detail).Infof("Gas limit reached for (%x) in this block. Continue to try smaller txs\n", from[:4])
  243. default:
  244. tcount++
  245. }
  246. }
  247. var (
  248. uncles []*types.Header
  249. badUncles []common.Hash
  250. )
  251. for hash, uncle := range self.possibleUncles {
  252. if len(uncles) == 2 {
  253. break
  254. }
  255. if err := self.commitUncle(uncle.Header()); err != nil {
  256. if glog.V(logger.Ridiculousness) {
  257. glog.V(logger.Detail).Infof("Bad uncle found and will be removed (%x)\n", hash[:4])
  258. glog.V(logger.Detail).Infoln(uncle)
  259. }
  260. badUncles = append(badUncles, hash)
  261. } else {
  262. glog.V(logger.Debug).Infof("commiting %x as uncle\n", hash[:4])
  263. uncles = append(uncles, uncle.Header())
  264. }
  265. }
  266. // We only care about logging if we're actually mining
  267. if atomic.LoadInt32(&self.mining) == 1 {
  268. glog.V(logger.Info).Infof("commit new work on block %v with %d txs & %d uncles\n", self.current.block.Number(), tcount, len(uncles))
  269. }
  270. for _, hash := range badUncles {
  271. delete(self.possibleUncles, hash)
  272. }
  273. self.current.block.SetUncles(uncles)
  274. core.AccumulateRewards(self.current.state, self.current.block)
  275. self.current.state.Update()
  276. self.push()
  277. }
  278. var (
  279. inclusionReward = new(big.Int).Div(core.BlockReward, big.NewInt(32))
  280. _uncleReward = new(big.Int).Mul(core.BlockReward, big.NewInt(15))
  281. uncleReward = new(big.Int).Div(_uncleReward, big.NewInt(16))
  282. )
  283. func (self *worker) commitUncle(uncle *types.Header) error {
  284. if self.current.uncles.Has(uncle.Hash()) {
  285. // Error not unique
  286. return core.UncleError("Uncle not unique")
  287. }
  288. self.current.uncles.Add(uncle.Hash())
  289. if !self.current.family.Has(uncle.ParentHash) {
  290. return core.UncleError(fmt.Sprintf("Uncle's parent unknown (%x)", uncle.ParentHash[0:4]))
  291. }
  292. if self.current.family.Has(uncle.Hash()) {
  293. return core.UncleError(fmt.Sprintf("Uncle already in family (%x)", uncle.Hash()))
  294. }
  295. return nil
  296. }
  297. func (self *worker) commitTransaction(tx *types.Transaction) error {
  298. snap := self.current.state.Copy()
  299. receipt, _, err := self.proc.ApplyTransaction(self.current.coinbase, self.current.state, self.current.block, tx, self.current.totalUsedGas, true)
  300. if err != nil && (core.IsNonceErr(err) || state.IsGasLimitErr(err) || core.IsInvalidTxErr(err)) {
  301. self.current.state.Set(snap)
  302. return err
  303. }
  304. self.current.block.AddTransaction(tx)
  305. self.current.block.AddReceipt(receipt)
  306. return nil
  307. }
  308. func (self *worker) HashRate() int64 {
  309. var tot int64
  310. for _, agent := range self.agents {
  311. tot += agent.GetHashRate()
  312. }
  313. return tot
  314. }