worker.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. package miner
  2. import (
  3. "fmt"
  4. "math/big"
  5. "sort"
  6. "sync"
  7. "time"
  8. "github.com/ethereum/go-ethereum/common"
  9. "github.com/ethereum/go-ethereum/core"
  10. "github.com/ethereum/go-ethereum/core/types"
  11. "github.com/ethereum/go-ethereum/event"
  12. "github.com/ethereum/go-ethereum/logger"
  13. "github.com/ethereum/go-ethereum/pow"
  14. "github.com/ethereum/go-ethereum/state"
  15. "gopkg.in/fatih/set.v0"
  16. )
  17. var jsonlogger = logger.NewJsonLogger()
  18. type environment struct {
  19. totalUsedGas *big.Int
  20. state *state.StateDB
  21. coinbase *state.StateObject
  22. block *types.Block
  23. family *set.Set
  24. uncles *set.Set
  25. }
  26. func env(block *types.Block, eth core.Backend) *environment {
  27. state := state.New(block.Root(), eth.StateDb())
  28. env := &environment{
  29. totalUsedGas: new(big.Int),
  30. state: state,
  31. block: block,
  32. family: set.New(),
  33. uncles: set.New(),
  34. coinbase: state.GetOrNewStateObject(block.Coinbase()),
  35. }
  36. return env
  37. }
  38. type Work struct {
  39. Number uint64
  40. Nonce uint64
  41. MixDigest []byte
  42. SeedHash []byte
  43. }
  44. type Agent interface {
  45. Work() chan<- *types.Block
  46. SetWorkCh(chan<- Work)
  47. Stop()
  48. Start()
  49. GetHashRate() int64
  50. }
  51. type worker struct {
  52. mu sync.Mutex
  53. agents []Agent
  54. recv chan Work
  55. mux *event.TypeMux
  56. quit chan struct{}
  57. pow pow.PoW
  58. eth core.Backend
  59. chain *core.ChainManager
  60. proc *core.BlockProcessor
  61. coinbase common.Address
  62. current *environment
  63. uncleMu sync.Mutex
  64. possibleUncles map[common.Hash]*types.Block
  65. mining bool
  66. }
  67. func newWorker(coinbase common.Address, eth core.Backend) *worker {
  68. return &worker{
  69. eth: eth,
  70. mux: eth.EventMux(),
  71. recv: make(chan Work),
  72. chain: eth.ChainManager(),
  73. proc: eth.BlockProcessor(),
  74. possibleUncles: make(map[common.Hash]*types.Block),
  75. coinbase: coinbase,
  76. }
  77. }
  78. func (self *worker) start() {
  79. self.mining = true
  80. self.quit = make(chan struct{})
  81. // spin up agents
  82. for _, agent := range self.agents {
  83. agent.Start()
  84. }
  85. go self.update()
  86. go self.wait()
  87. }
  88. func (self *worker) stop() {
  89. self.mining = false
  90. close(self.quit)
  91. }
  92. func (self *worker) register(agent Agent) {
  93. self.agents = append(self.agents, agent)
  94. agent.SetWorkCh(self.recv)
  95. }
  96. func (self *worker) update() {
  97. events := self.mux.Subscribe(core.ChainHeadEvent{}, core.NewMinedBlockEvent{}, core.ChainSideEvent{})
  98. timer := time.NewTicker(2 * time.Second)
  99. out:
  100. for {
  101. select {
  102. case event := <-events.Chan():
  103. switch ev := event.(type) {
  104. case core.ChainHeadEvent:
  105. if self.current.block != ev.Block {
  106. self.commitNewWork()
  107. }
  108. case core.NewMinedBlockEvent:
  109. self.commitNewWork()
  110. case core.ChainSplitEvent:
  111. self.uncleMu.Lock()
  112. self.possibleUncles[ev.Block.Hash()] = ev.Block
  113. self.uncleMu.Unlock()
  114. }
  115. case <-self.quit:
  116. // stop all agents
  117. for _, agent := range self.agents {
  118. agent.Stop()
  119. }
  120. break out
  121. case <-timer.C:
  122. minerlogger.Infoln("Hash rate:", self.HashRate(), "Khash")
  123. }
  124. }
  125. events.Unsubscribe()
  126. }
  127. func (self *worker) addUncle(uncle *types.Block) {
  128. }
  129. func (self *worker) wait() {
  130. for {
  131. for work := range self.recv {
  132. // Someone Successfully Mined!
  133. block := self.current.block
  134. if block.Number().Uint64() == work.Number && block.Nonce() == 0 {
  135. self.current.block.SetNonce(work.Nonce)
  136. self.current.block.Header().MixDigest = common.BytesToHash(work.MixDigest)
  137. jsonlogger.LogJson(&logger.EthMinerNewBlock{
  138. BlockHash: block.Hash().Hex(),
  139. BlockNumber: block.Number(),
  140. ChainHeadHash: block.ParentHeaderHash.Hex(),
  141. BlockPrevHash: block.ParentHeaderHash.Hex(),
  142. })
  143. if err := self.chain.InsertChain(types.Blocks{self.current.block}); err == nil {
  144. self.mux.Post(core.NewMinedBlockEvent{self.current.block})
  145. } else {
  146. self.commitNewWork()
  147. }
  148. }
  149. break
  150. }
  151. }
  152. }
  153. func (self *worker) push() {
  154. if self.mining {
  155. self.current.block.Header().GasUsed = self.current.totalUsedGas
  156. self.current.block.SetRoot(self.current.state.Root())
  157. // push new work to agents
  158. for _, agent := range self.agents {
  159. agent.Work() <- self.current.block
  160. }
  161. }
  162. }
  163. func (self *worker) commitNewWork() {
  164. self.mu.Lock()
  165. defer self.mu.Unlock()
  166. block := self.chain.NewBlock(self.coinbase)
  167. self.current = env(block, self.eth)
  168. for _, ancestor := range self.chain.GetAncestors(block, 7) {
  169. self.current.family.Add(ancestor.Hash())
  170. }
  171. parent := self.chain.GetBlock(self.current.block.ParentHash())
  172. self.current.coinbase.SetGasPool(core.CalcGasLimit(parent, self.current.block))
  173. transactions := self.eth.TxPool().GetTransactions()
  174. sort.Sort(types.TxByNonce{transactions})
  175. minerlogger.Infof("committing new work with %d txs\n", len(transactions))
  176. // Keep track of transactions which return errors so they can be removed
  177. var remove types.Transactions
  178. gasLimit:
  179. for i, tx := range transactions {
  180. err := self.commitTransaction(tx)
  181. switch {
  182. case core.IsNonceErr(err):
  183. fallthrough
  184. case core.IsInvalidTxErr(err):
  185. // Remove invalid transactions
  186. from, _ := tx.From()
  187. self.chain.TxState().RemoveNonce(from, tx.Nonce())
  188. remove = append(remove, tx)
  189. minerlogger.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err)
  190. minerlogger.Infoln(tx)
  191. case state.IsGasLimitErr(err):
  192. minerlogger.Infof("Gas limit reached for block. %d TXs included in this block\n", i)
  193. // Break on gas limit
  194. break gasLimit
  195. }
  196. }
  197. self.eth.TxPool().RemoveSet(remove)
  198. ucount := 0
  199. for hash, uncle := range self.possibleUncles {
  200. if ucount == 2 {
  201. break
  202. }
  203. if err := self.commitUncle(uncle.Header()); err != nil {
  204. minerlogger.Infof("Bad uncle found and will be removed (%x)\n", hash[:4])
  205. minerlogger.Debugln(uncle)
  206. } else {
  207. minerlogger.Infof("Commiting %x as uncle\n", hash[:4])
  208. ucount++
  209. }
  210. }
  211. minerlogger.Infoln("Included %d uncle(s)")
  212. self.current.state.AddBalance(self.coinbase, core.BlockReward)
  213. self.current.state.Update(common.Big0)
  214. self.push()
  215. }
  216. var (
  217. inclusionReward = new(big.Int).Div(core.BlockReward, big.NewInt(32))
  218. _uncleReward = new(big.Int).Mul(core.BlockReward, big.NewInt(15))
  219. uncleReward = new(big.Int).Div(_uncleReward, big.NewInt(16))
  220. )
  221. func (self *worker) commitUncle(uncle *types.Header) error {
  222. if self.current.uncles.Has(uncle.Hash()) {
  223. // Error not unique
  224. return core.UncleError("Uncle not unique")
  225. }
  226. self.current.uncles.Add(uncle.Hash())
  227. if !self.current.family.Has(uncle.ParentHash) {
  228. return core.UncleError(fmt.Sprintf("Uncle's parent unknown (%x)", uncle.ParentHash[0:4]))
  229. }
  230. if self.current.family.Has(uncle.Hash()) {
  231. return core.UncleError(fmt.Sprintf("Uncle already in family (%x)", uncle.Hash()))
  232. }
  233. uncleAccount := self.current.state.GetAccount(uncle.Coinbase)
  234. uncleAccount.AddBalance(uncleReward)
  235. self.current.coinbase.AddBalance(uncleReward)
  236. return nil
  237. }
  238. func (self *worker) commitTransaction(tx *types.Transaction) error {
  239. snap := self.current.state.Copy()
  240. receipt, _, err := self.proc.ApplyTransaction(self.current.coinbase, self.current.state, self.current.block, tx, self.current.totalUsedGas, true)
  241. if err != nil && (core.IsNonceErr(err) || state.IsGasLimitErr(err) || core.IsInvalidTxErr(err)) {
  242. self.current.state.Set(snap)
  243. return err
  244. }
  245. self.current.block.AddTransaction(tx)
  246. self.current.block.AddReceipt(receipt)
  247. return nil
  248. }
  249. func (self *worker) HashRate() int64 {
  250. var tot int64
  251. for _, agent := range self.agents {
  252. tot += agent.GetHashRate()
  253. }
  254. return tot
  255. }