// worker.go
  1. package miner
  2. import (
  3. "fmt"
  4. "math/big"
  5. "sort"
  6. "sync"
  7. "github.com/ethereum/go-ethereum/core"
  8. "github.com/ethereum/go-ethereum/core/types"
  9. "github.com/ethereum/go-ethereum/ethutil"
  10. "github.com/ethereum/go-ethereum/event"
  11. "github.com/ethereum/go-ethereum/pow"
  12. "github.com/ethereum/go-ethereum/state"
  13. "gopkg.in/fatih/set.v0"
  14. )
  15. type environment struct {
  16. totalUsedGas *big.Int
  17. state *state.StateDB
  18. coinbase *state.StateObject
  19. block *types.Block
  20. ancestors *set.Set
  21. uncles *set.Set
  22. }
  23. func env(block *types.Block, eth core.Backend) *environment {
  24. state := state.New(block.Root(), eth.Db())
  25. env := &environment{
  26. totalUsedGas: new(big.Int),
  27. state: state,
  28. block: block,
  29. ancestors: set.New(),
  30. uncles: set.New(),
  31. coinbase: state.GetOrNewStateObject(block.Coinbase()),
  32. }
  33. for _, ancestor := range eth.ChainManager().GetAncestors(block, 7) {
  34. env.ancestors.Add(string(ancestor.Hash()))
  35. }
  36. return env
  37. }
  38. type Work struct {
  39. Number uint64
  40. Nonce []byte
  41. MixDigest []byte
  42. SeedHash []byte
  43. }
  44. type Agent interface {
  45. Work() chan<- *types.Block
  46. SetWorkCh(chan<- Work)
  47. Stop()
  48. Start()
  49. Pow() pow.PoW
  50. }
  51. type worker struct {
  52. mu sync.Mutex
  53. agents []Agent
  54. recv chan Work
  55. mux *event.TypeMux
  56. quit chan struct{}
  57. pow pow.PoW
  58. eth core.Backend
  59. chain *core.ChainManager
  60. proc *core.BlockProcessor
  61. coinbase []byte
  62. current *environment
  63. mining bool
  64. }
  65. func newWorker(coinbase []byte, eth core.Backend) *worker {
  66. return &worker{
  67. eth: eth,
  68. mux: eth.EventMux(),
  69. recv: make(chan Work),
  70. chain: eth.ChainManager(),
  71. proc: eth.BlockProcessor(),
  72. coinbase: coinbase,
  73. }
  74. }
  75. func (self *worker) start() {
  76. self.mining = true
  77. self.quit = make(chan struct{})
  78. // spin up agents
  79. for _, agent := range self.agents {
  80. agent.Start()
  81. }
  82. go self.update()
  83. go self.wait()
  84. }
  85. func (self *worker) stop() {
  86. self.mining = false
  87. close(self.quit)
  88. }
  89. func (self *worker) register(agent Agent) {
  90. self.agents = append(self.agents, agent)
  91. agent.SetWorkCh(self.recv)
  92. }
  93. func (self *worker) update() {
  94. events := self.mux.Subscribe(core.ChainEvent{}, core.NewMinedBlockEvent{})
  95. out:
  96. for {
  97. select {
  98. case event := <-events.Chan():
  99. switch ev := event.(type) {
  100. case core.ChainEvent:
  101. if self.current.block != ev.Block {
  102. self.commitNewWork()
  103. }
  104. case core.NewMinedBlockEvent:
  105. self.commitNewWork()
  106. }
  107. case <-self.quit:
  108. // stop all agents
  109. for _, agent := range self.agents {
  110. agent.Stop()
  111. }
  112. break out
  113. }
  114. }
  115. events.Unsubscribe()
  116. }
  117. func (self *worker) wait() {
  118. for {
  119. for work := range self.recv {
  120. // Someone Successfully Mined!
  121. block := self.current.block
  122. if block.Number().Uint64() == work.Number && block.Nonce() == nil {
  123. self.current.block.Header().Nonce = work.Nonce
  124. self.current.block.Header().MixDigest = work.MixDigest
  125. self.current.block.Header().SeedHash = work.SeedHash
  126. if err := self.chain.InsertChain(types.Blocks{self.current.block}); err == nil {
  127. self.mux.Post(core.NewMinedBlockEvent{self.current.block})
  128. } else {
  129. self.commitNewWork()
  130. }
  131. }
  132. break
  133. }
  134. }
  135. }
  136. func (self *worker) push() {
  137. if self.mining {
  138. self.current.block.Header().GasUsed = self.current.totalUsedGas
  139. self.current.block.SetRoot(self.current.state.Root())
  140. // push new work to agents
  141. for _, agent := range self.agents {
  142. agent.Work() <- self.current.block
  143. }
  144. }
  145. }
  146. func (self *worker) commitNewWork() {
  147. self.mu.Lock()
  148. defer self.mu.Unlock()
  149. self.current = env(self.chain.NewBlock(self.coinbase), self.eth)
  150. parent := self.chain.GetBlock(self.current.block.ParentHash())
  151. self.current.coinbase.SetGasPool(core.CalcGasLimit(parent, self.current.block))
  152. transactions := self.eth.TxPool().GetTransactions()
  153. sort.Sort(types.TxByNonce{transactions})
  154. minerlogger.Infof("committing new work with %d txs\n", len(transactions))
  155. // Keep track of transactions which return errors so they can be removed
  156. var remove types.Transactions
  157. gasLimit:
  158. for _, tx := range transactions {
  159. err := self.commitTransaction(tx)
  160. switch {
  161. case core.IsNonceErr(err):
  162. // Remove invalid transactions
  163. remove = append(remove, tx)
  164. case state.IsGasLimitErr(err):
  165. // Break on gas limit
  166. break gasLimit
  167. }
  168. if err != nil {
  169. minerlogger.Infoln(err)
  170. }
  171. }
  172. self.eth.TxPool().RemoveSet(remove)
  173. self.current.coinbase.AddBalance(core.BlockReward)
  174. self.current.state.Update(ethutil.Big0)
  175. self.push()
  176. }
  177. var (
  178. inclusionReward = new(big.Int).Div(core.BlockReward, big.NewInt(32))
  179. _uncleReward = new(big.Int).Mul(core.BlockReward, big.NewInt(15))
  180. uncleReward = new(big.Int).Div(_uncleReward, big.NewInt(16))
  181. )
  182. func (self *worker) commitUncle(uncle *types.Header) error {
  183. if self.current.uncles.Has(string(uncle.Hash())) {
  184. // Error not unique
  185. return core.UncleError("Uncle not unique")
  186. }
  187. self.current.uncles.Add(string(uncle.Hash()))
  188. if !self.current.ancestors.Has(string(uncle.ParentHash)) {
  189. return core.UncleError(fmt.Sprintf("Uncle's parent unknown (%x)", uncle.ParentHash[0:4]))
  190. }
  191. if !self.pow.Verify(types.NewBlockWithHeader(uncle)) {
  192. return core.ValidationError("Uncle's nonce is invalid (= %v)", ethutil.Bytes2Hex(uncle.Nonce))
  193. }
  194. uncleAccount := self.current.state.GetAccount(uncle.Coinbase)
  195. uncleAccount.AddBalance(uncleReward)
  196. self.current.coinbase.AddBalance(uncleReward)
  197. return nil
  198. }
  199. func (self *worker) commitTransaction(tx *types.Transaction) error {
  200. //fmt.Printf("proc %x %v\n", tx.Hash()[:3], tx.Nonce())
  201. receipt, _, err := self.proc.ApplyTransaction(self.current.coinbase, self.current.state, self.current.block, tx, self.current.totalUsedGas, true)
  202. if err != nil && (core.IsNonceErr(err) || state.IsGasLimitErr(err)) {
  203. return err
  204. }
  205. self.current.block.AddTransaction(tx)
  206. self.current.block.AddReceipt(receipt)
  207. return nil
  208. }