瀏覽代碼

all: use (blocking) event package instead of ethreact

Felix Lange 11 年之前
父節點
當前提交
36cdab2068
共有 8 個文件被更改,包括 144 次插入155 次删除
  1. 3 4
      ethchain/dagger.go
  2. 10 0
      ethchain/events.go
  3. 20 35
      ethchain/state_manager.go
  4. 2 1
      ethchain/transaction_pool.go
  5. 27 47
      ethereum.go
  6. 70 67
      ethminer/miner.go
  7. 11 0
      events.go
  8. 1 1
      peer.go

+ 3 - 4
ethchain/dagger.go

@@ -8,7 +8,6 @@ import (
 
 	"github.com/ethereum/eth-go/ethcrypto"
 	"github.com/ethereum/eth-go/ethlog"
-	"github.com/ethereum/eth-go/ethreact"
 	"github.com/ethereum/eth-go/ethutil"
 	"github.com/obscuren/sha3"
 )
@@ -16,7 +15,7 @@ import (
 var powlogger = ethlog.NewLogger("POW")
 
 type PoW interface {
-	Search(block *Block, reactChan chan ethreact.Event) []byte
+	Search(block *Block, stop <-chan struct{}) []byte
 	Verify(hash []byte, diff *big.Int, nonce []byte) bool
 	GetHashrate() int64
 	Turbo(bool)
@@ -36,7 +35,7 @@ func (pow *EasyPow) Turbo(on bool) {
 	pow.turbo = on
 }
 
-func (pow *EasyPow) Search(block *Block, reactChan chan ethreact.Event) []byte {
+func (pow *EasyPow) Search(block *Block, stop <-chan struct{}) []byte {
 	r := rand.New(rand.NewSource(time.Now().UnixNano()))
 	hash := block.HashNoNonce()
 	diff := block.Difficulty
@@ -46,7 +45,7 @@ func (pow *EasyPow) Search(block *Block, reactChan chan ethreact.Event) []byte {
 
 	for {
 		select {
-		case <-reactChan:
+		case <-stop:
 			powlogger.Infoln("Breaking from mining")
 			return nil
 		default:

+ 10 - 0
ethchain/events.go

@@ -0,0 +1,10 @@
+package ethchain
+
+type TxEvent struct {
+	Type int // TxPre || TxPost
+	Tx   *Transaction
+}
+
+type NewBlockEvent struct {
+	Block *Block
+}

+ 20 - 35
ethchain/state_manager.go

@@ -11,11 +11,10 @@ import (
 
 	"github.com/ethereum/eth-go/ethcrypto"
 	"github.com/ethereum/eth-go/ethlog"
-	"github.com/ethereum/eth-go/ethreact"
 	"github.com/ethereum/eth-go/ethstate"
 	"github.com/ethereum/eth-go/ethutil"
 	"github.com/ethereum/eth-go/ethwire"
-	"github.com/ethereum/eth-go/eventer"
+	"github.com/ethereum/eth-go/event"
 )
 
 var statelogger = ethlog.NewLogger("STATE")
@@ -37,7 +36,6 @@ type EthManager interface {
 	BlockChain() *BlockChain
 	TxPool() *TxPool
 	Broadcast(msgType ethwire.MsgType, data []interface{})
-	Reactor() *ethreact.ReactorEngine
 	PeerCount() int
 	IsMining() bool
 	IsListening() bool
@@ -45,7 +43,7 @@ type EthManager interface {
 	KeyManager() *ethcrypto.KeyManager
 	ClientIdentity() ethwire.ClientIdentity
 	Db() ethutil.Database
-	Eventer() *eventer.EventMachine
+	EventMux() *event.TypeMux
 }
 
 type StateManager struct {
@@ -73,17 +71,15 @@ type StateManager struct {
 	// 'Process' & canonical validation.
 	lastAttemptedBlock *Block
 
-	// Quit chan
-	quit chan bool
+	events event.Subscription
 }
 
 func NewStateManager(ethereum EthManager) *StateManager {
 	sm := &StateManager{
-		mem:  make(map[string]*big.Int),
-		Pow:  &EasyPow{},
-		eth:  ethereum,
-		bc:   ethereum.BlockChain(),
-		quit: make(chan bool),
+		mem: make(map[string]*big.Int),
+		Pow: &EasyPow{},
+		eth: ethereum,
+		bc:  ethereum.BlockChain(),
 	}
 	sm.transState = ethereum.BlockChain().CurrentBlock.State().Copy()
 	sm.miningState = ethereum.BlockChain().CurrentBlock.State().Copy()
@@ -93,36 +89,25 @@ func NewStateManager(ethereum EthManager) *StateManager {
 
 func (self *StateManager) Start() {
 	statelogger.Debugln("Starting state manager")
-
+	self.events = self.eth.EventMux().Subscribe(Blocks(nil))
 	go self.updateThread()
 }
 
 func (self *StateManager) Stop() {
 	statelogger.Debugln("Stopping state manager")
-
-	close(self.quit)
+	self.events.Unsubscribe()
 }
 
 func (self *StateManager) updateThread() {
-	blockChan := self.eth.Eventer().Register("blocks")
-
-out:
-	for {
-		select {
-		case event := <-blockChan:
-			blocks := event.Data.(Blocks)
-			for _, block := range blocks {
-				err := self.Process(block, false)
-				if err != nil {
-					statelogger.Infoln(err)
-					statelogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4])
-					statelogger.Debugln(block)
-					break
-				}
+	for ev := range self.events.Chan() {
+		for _, block := range ev.(Blocks) {
+			err := self.Process(block, false)
+			if err != nil {
+				statelogger.Infoln(err)
+				statelogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4])
+				statelogger.Debugln(block)
+				break
 			}
-
-		case <-self.quit:
-			break out
 		}
 	}
 }
@@ -202,7 +187,7 @@ done:
 		}
 
 		// Notify all subscribers
-		self.eth.Reactor().Post("newTx:post", tx)
+		self.eth.EventMux().Post(TxEvent{TxPost, tx})
 
 		receipts = append(receipts, receipt)
 		handled = append(handled, tx)
@@ -293,7 +278,7 @@ func (sm *StateManager) Process(block *Block, dontReact bool) (err error) {
 
 		statelogger.Infof("Imported block #%d (%x...)\n", block.Number, block.Hash()[0:4])
 		if dontReact == false {
-			sm.eth.Reactor().Post("newBlock", block)
+			sm.eth.EventMux().Post(NewBlockEvent{block})
 
 			state.Manifest().Reset()
 		}
@@ -434,7 +419,7 @@ func (sm *StateManager) createBloomFilter(state *ethstate.State) *BloomFilter {
 		bloomf.Set(msg.From)
 	}
 
-	sm.eth.Reactor().Post("messages", state.Manifest().Messages)
+	sm.eth.EventMux().Post(state.Manifest().Messages)
 
 	return bloomf
 }

+ 2 - 1
ethchain/transaction_pool.go

@@ -24,6 +24,7 @@ type TxMsgTy byte
 const (
 	TxPre = iota
 	TxPost
+
 	minGasPrice = 1000000
 )
 
@@ -160,7 +161,7 @@ out:
 				txplogger.Debugf("(t) %x => %x (%v) %x\n", tx.Sender()[:4], tmp, tx.Value, tx.Hash())
 
 				// Notify the subscribers
-				pool.Ethereum.Reactor().Post("newTx:pre", tx)
+				pool.Ethereum.EventMux().Post(TxEvent{TxPre, tx})
 			}
 		case <-pool.quit:
 			break out

+ 27 - 47
ethereum.go

@@ -17,12 +17,11 @@ import (
 	"github.com/ethereum/eth-go/ethchain"
 	"github.com/ethereum/eth-go/ethcrypto"
 	"github.com/ethereum/eth-go/ethlog"
-	"github.com/ethereum/eth-go/ethreact"
 	"github.com/ethereum/eth-go/ethrpc"
 	"github.com/ethereum/eth-go/ethstate"
 	"github.com/ethereum/eth-go/ethutil"
 	"github.com/ethereum/eth-go/ethwire"
-	"github.com/ethereum/eth-go/eventer"
+	"github.com/ethereum/eth-go/event"
 )
 
 const (
@@ -60,7 +59,7 @@ type Ethereum struct {
 	// The block pool
 	blockPool *BlockPool
 	// Eventer
-	eventer *eventer.EventMachine
+	eventMux *event.TypeMux
 	// Peers
 	peers *list.List
 	// Nonce
@@ -85,8 +84,6 @@ type Ethereum struct {
 
 	listening bool
 
-	reactor *ethreact.ReactorEngine
-
 	RpcServer *ethrpc.JsonRpcServer
 
 	keyManager *ethcrypto.KeyManager
@@ -129,8 +126,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager
 		isUpToDate:     true,
 		filters:        make(map[int]*ethchain.Filter),
 	}
-	ethereum.reactor = ethreact.New()
-	ethereum.eventer = eventer.New()
+	ethereum.eventMux = event.NewTypeMux()
 
 	ethereum.blockPool = NewBlockPool(ethereum)
 	ethereum.txPool = ethchain.NewTxPool(ethereum)
@@ -143,10 +139,6 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager
 	return ethereum, nil
 }
 
-func (s *Ethereum) Reactor() *ethreact.ReactorEngine {
-	return s.reactor
-}
-
 func (s *Ethereum) KeyManager() *ethcrypto.KeyManager {
 	return s.keyManager
 }
@@ -169,8 +161,8 @@ func (s *Ethereum) TxPool() *ethchain.TxPool {
 func (s *Ethereum) BlockPool() *BlockPool {
 	return s.blockPool
 }
-func (s *Ethereum) Eventer() *eventer.EventMachine {
-	return s.eventer
+func (s *Ethereum) EventMux() *event.TypeMux {
+	return s.eventMux
 }
 func (self *Ethereum) Db() ethutil.Database {
 	return self.db
@@ -376,7 +368,7 @@ func (s *Ethereum) removePeerElement(e *list.Element) {
 
 	s.peers.Remove(e)
 
-	s.reactor.Post("peerList", s.peers)
+	s.eventMux.Post(PeerListEvent{s.peers})
 }
 
 func (s *Ethereum) RemovePeer(p *Peer) {
@@ -400,7 +392,6 @@ func (s *Ethereum) reapDeadPeerHandler() {
 
 // Start the ethereum
 func (s *Ethereum) Start(seed bool) {
-	s.reactor.Start()
 	s.blockPool.Start()
 	s.stateManager.Start()
 
@@ -524,8 +515,7 @@ func (s *Ethereum) Stop() {
 	}
 	s.txPool.Stop()
 	s.stateManager.Stop()
-	s.reactor.Flush()
-	s.reactor.Stop()
+	s.eventMux.Stop()
 	s.blockPool.Stop()
 
 	ethlogger.Infoln("Server stopped")
@@ -584,10 +574,10 @@ out:
 		select {
 		case <-upToDateTimer.C:
 			if self.IsUpToDate() && !self.isUpToDate {
-				self.reactor.Post("chainSync", false)
+				self.eventMux.Post(ChainSyncEvent{false})
 				self.isUpToDate = true
 			} else if !self.IsUpToDate() && self.isUpToDate {
-				self.reactor.Post("chainSync", true)
+				self.eventMux.Post(ChainSyncEvent{true})
 				self.isUpToDate = false
 			}
 		case <-self.quit:
@@ -623,40 +613,30 @@ func (self *Ethereum) GetFilter(id int) *ethchain.Filter {
 }
 
 func (self *Ethereum) filterLoop() {
-	blockChan := make(chan ethreact.Event, 5)
-	messageChan := make(chan ethreact.Event, 5)
 	// Subscribe to events
-	reactor := self.Reactor()
-	reactor.Subscribe("newBlock", blockChan)
-	reactor.Subscribe("messages", messageChan)
-out:
-	for {
-		select {
-		case <-self.quit:
-			break out
-		case block := <-blockChan:
-			if block, ok := block.Resource.(*ethchain.Block); ok {
-				self.filterMu.RLock()
-				for _, filter := range self.filters {
-					if filter.BlockCallback != nil {
-						filter.BlockCallback(block)
-					}
+	events := self.eventMux.Subscribe(ethchain.NewBlockEvent{}, ethstate.Messages(nil))
+	for event := range events.Chan() {
+		switch event := event.(type) {
+		case ethchain.NewBlockEvent:
+			self.filterMu.RLock()
+			for _, filter := range self.filters {
+				if filter.BlockCallback != nil {
+					filter.BlockCallback(event.Block)
 				}
-				self.filterMu.RUnlock()
 			}
-		case msg := <-messageChan:
-			if messages, ok := msg.Resource.(ethstate.Messages); ok {
-				self.filterMu.RLock()
-				for _, filter := range self.filters {
-					if filter.MessageCallback != nil {
-						msgs := filter.FilterMessages(messages)
-						if len(msgs) > 0 {
-							filter.MessageCallback(msgs)
-						}
+			self.filterMu.RUnlock()
+
+		case ethstate.Messages:
+			self.filterMu.RLock()
+			for _, filter := range self.filters {
+				if filter.MessageCallback != nil {
+					msgs := filter.FilterMessages(event)
+					if len(msgs) > 0 {
+						filter.MessageCallback(msgs)
 					}
 				}
-				self.filterMu.RUnlock()
 			}
+			self.filterMu.RUnlock()
 		}
 	}
 }

+ 70 - 67
ethminer/miner.go

@@ -6,27 +6,37 @@ import (
 
 	"github.com/ethereum/eth-go/ethchain"
 	"github.com/ethereum/eth-go/ethlog"
-	"github.com/ethereum/eth-go/ethreact"
 	"github.com/ethereum/eth-go/ethwire"
+	"github.com/ethereum/eth-go/event"
 )
 
 var logger = ethlog.NewLogger("MINER")
 
 type Miner struct {
-	pow         ethchain.PoW
-	ethereum    ethchain.EthManager
-	coinbase    []byte
-	reactChan   chan ethreact.Event
-	txs         ethchain.Transactions
-	uncles      []*ethchain.Block
-	block       *ethchain.Block
-	powChan     chan []byte
-	powQuitChan chan ethreact.Event
-	quitChan    chan chan error
+	pow      ethchain.PoW
+	ethereum ethchain.EthManager
+	coinbase []byte
+	txs      ethchain.Transactions
+	uncles   []*ethchain.Block
+	block    *ethchain.Block
+
+	events      event.Subscription
+	powQuitChan chan struct{}
+	powDone     chan struct{}
 
 	turbo bool
 }
 
+const (
+	Started = iota
+	Stopped
+)
+
+type Event struct {
+	Type  int // Started || Stopped
+	Miner *Miner
+}
+
 func (self *Miner) GetPow() ethchain.PoW {
 	return self.pow
 }
@@ -48,46 +58,42 @@ func (self *Miner) ToggleTurbo() {
 }
 
 func (miner *Miner) Start() {
-	miner.reactChan = make(chan ethreact.Event, 1)   // This is the channel that receives 'updates' when ever a new transaction or block comes in
-	miner.powChan = make(chan []byte, 1)             // This is the channel that receives valid sha hashes for a given block
-	miner.powQuitChan = make(chan ethreact.Event, 1) // This is the channel that can exit the miner thread
-	miner.quitChan = make(chan chan error, 1)
 
 	// Insert initial TXs in our little miner 'pool'
 	miner.txs = miner.ethereum.TxPool().Flush()
 	miner.block = miner.ethereum.BlockChain().NewBlock(miner.coinbase)
 
+	mux := miner.ethereum.EventMux()
+	miner.events = mux.Subscribe(ethchain.NewBlockEvent{}, ethchain.TxEvent{})
+
 	// Prepare inital block
 	//miner.ethereum.StateManager().Prepare(miner.block.State(), miner.block.State())
 	go miner.listener()
 
-	reactor := miner.ethereum.Reactor()
-	reactor.Subscribe("newBlock", miner.reactChan)
-	reactor.Subscribe("newTx:pre", miner.reactChan)
-
-	// We need the quit chan to be a Reactor event.
-	// The POW search method is actually blocking and if we don't
-	// listen to the reactor events inside of the pow itself
-	// The miner overseer will never get the reactor events themselves
-	// Only after the miner will find the sha
-	reactor.Subscribe("newBlock", miner.powQuitChan)
-	reactor.Subscribe("newTx:pre", miner.powQuitChan)
-
 	logger.Infoln("Started")
+	mux.Post(Event{Started, miner})
+}
 
-	reactor.Post("miner:start", miner)
+func (miner *Miner) Stop() {
+	logger.Infoln("Stopping...")
+	miner.events.Unsubscribe()
+	miner.ethereum.EventMux().Post(Event{Stopped, miner})
 }
 
 func (miner *Miner) listener() {
 	for {
+		miner.startMining()
+
 		select {
-		case status := <-miner.quitChan:
-			logger.Infoln("Stopped")
-			status <- nil
-			return
-		case chanMessage := <-miner.reactChan:
+		case event, isopen := <-miner.events.Chan():
+			miner.stopMining()
+			if !isopen {
+				return
+			}
 
-			if block, ok := chanMessage.Resource.(*ethchain.Block); ok {
+			switch event := event.(type) {
+			case ethchain.NewBlockEvent:
+				block := event.Block
 				//logger.Infoln("Got new block via Reactor")
 				if bytes.Compare(miner.ethereum.BlockChain().CurrentBlock.Hash(), block.Hash()) == 0 {
 					// TODO: Perhaps continue mining to get some uncle rewards
@@ -117,49 +123,44 @@ func (miner *Miner) listener() {
 						miner.uncles = append(miner.uncles, block)
 					}
 				}
-			}
 
-			if tx, ok := chanMessage.Resource.(*ethchain.Transaction); ok {
-				found := false
-				for _, ctx := range miner.txs {
-					if found = bytes.Compare(ctx.Hash(), tx.Hash()) == 0; found {
-						break
+			case ethchain.TxEvent:
+				if event.Type == ethchain.TxPre {
+					found := false
+					for _, ctx := range miner.txs {
+						if found = bytes.Compare(ctx.Hash(), event.Tx.Hash()) == 0; found {
+							break
+						}
+					}
+					if found == false {
+						// Undo all previous commits
+						miner.block.Undo()
+						// Apply new transactions
+						miner.txs = append(miner.txs, event.Tx)
 					}
-
-				}
-				if found == false {
-					// Undo all previous commits
-					miner.block.Undo()
-					// Apply new transactions
-					miner.txs = append(miner.txs, tx)
 				}
 			}
-		default:
-			miner.mineNewBlock()
+
+		case <-miner.powDone:
+			// next iteration will start mining again
 		}
 	}
 }
 
-func (miner *Miner) Stop() {
-	logger.Infoln("Stopping...")
-
-	miner.powQuitChan <- ethreact.Event{}
-
-	status := make(chan error)
-	miner.quitChan <- status
-	<-status
-
-	reactor := miner.ethereum.Reactor()
-	reactor.Unsubscribe("newBlock", miner.powQuitChan)
-	reactor.Unsubscribe("newTx:pre", miner.powQuitChan)
-	reactor.Unsubscribe("newBlock", miner.reactChan)
-	reactor.Unsubscribe("newTx:pre", miner.reactChan)
+func (miner *Miner) startMining() {
+	if miner.powDone == nil {
+		miner.powDone = make(chan struct{})
+	}
+	miner.powQuitChan = make(chan struct{})
+	go miner.mineNewBlock()
+}
 
-	reactor.Post("miner:stop", miner)
+func (miner *Miner) stopMining() {
+	close(miner.powQuitChan)
+	<-miner.powDone
 }
 
 func (self *Miner) mineNewBlock() {
-
 	stateManager := self.ethereum.StateManager()
 
 	self.block = self.ethereum.BlockChain().NewBlock(self.coinbase)
@@ -195,8 +196,9 @@ func (self *Miner) mineNewBlock() {
 	logger.Infof("Mining on block. Includes %v transactions", len(self.txs))
 
 	// Find a valid nonce
-	self.block.Nonce = self.pow.Search(self.block, self.powQuitChan)
-	if self.block.Nonce != nil {
+	nonce := self.pow.Search(self.block, self.powQuitChan)
+	if nonce != nil {
+		self.block.Nonce = nonce
 		err := self.ethereum.StateManager().Process(self.block, false)
 		if err != nil {
 			logger.Infoln(err)
@@ -208,4 +210,5 @@ func (self *Miner) mineNewBlock() {
 			self.txs = self.ethereum.TxPool().CurrentTransactions()
 		}
 	}
+	self.powDone <- struct{}{}
 }

+ 11 - 0
events.go

@@ -0,0 +1,11 @@
+package eth
+
+import "container/list"
+
+type PeerListEvent struct {
+	Peers *list.List
+}
+
+type ChainSyncEvent struct {
+	InSync bool
+}

+ 1 - 1
peer.go

@@ -802,7 +802,7 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) {
 	p.versionKnown = true
 
 	p.ethereum.PushPeer(p)
-	p.ethereum.reactor.Post("peerList", p.ethereum.Peers())
+	p.ethereum.eventMux.Post(PeerListEvent{p.ethereum.Peers()})
 
 	p.protocolCaps = caps
 	capsIt := caps.NewIterator()