소스 검색

Working on new (blocking) event machine.

The new event machine will be used for loose coupling and handle the
communications between the services:

1) Block pool finds blocks which "links" with our current canonical
chain
2) Posts the blocks on to the event machine
3) State manager receives blocks & processes them
4) Broadcasts new post block event
obscuren 11 년 전
부모
커밋
ab6ede51d7
9개의 변경된 파일322개의 추가작업 그리고 180개의 파일을 삭제
  1. 24 14
      block_pool.go
  2. 0 144
      ethchain/block_chain_test.go
  3. 88 0
      ethchain/helper_test.go
  4. 51 20
      ethchain/state_manager.go
  5. 10 1
      ethereum.go
  6. 79 0
      eventer/eventer.go
  7. 66 0
      eventer/eventer_test.go
  8. 3 1
      peer.go
  9. 1 0
      types/ethereum.go

+ 24 - 14
block_pool.go

@@ -1,6 +1,7 @@
 package eth
 
 import (
+	"bytes"
 	"container/list"
 	"math"
 	"math/big"
@@ -236,22 +237,31 @@ out:
 		case <-self.quit:
 			break out
 		case <-procTimer.C:
-			// XXX We can optimize this lifting this on to a new goroutine.
-			// We'd need to make sure that the pools are properly protected by a mutex
-			// XXX This should moved in The Great Refactor(TM)
-			amount := self.ProcessCanonical(func(block *ethchain.Block) {
-				err := self.eth.StateManager().Process(block, false)
-				if err != nil {
-					poollogger.Infoln(err)
-					poollogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4])
-					poollogger.Debugln(block)
+			blocks := self.Blocks()
+			ethchain.BlockBy(ethchain.Number).Sort(blocks)
+
+			if len(blocks) > 0 {
+				if self.eth.BlockChain().HasBlock(blocks[0].PrevHash) {
+					for i, block := range blocks[1:] {
+						// NOTE: The Ith element in this loop refers to the previous block in
+						// outer "blocks"
+						if bytes.Compare(block.PrevHash, blocks[i].Hash()) != 0 {
+							blocks = blocks[:i]
+
+							break
+						}
+					}
+				} else {
+					blocks = nil
 				}
-			})
+			}
+
+			// Handle in batches of 4k
+			max := int(math.Min(4000, float64(len(blocks))))
+			for _, block := range blocks[:max] {
+				self.eth.Eventer().Post("block", block)
 
-			// Do not propagate to the network on catchups
-			if amount == 1 {
-				block := self.eth.BlockChain().CurrentBlock
-				self.eth.Broadcast(ethwire.MsgBlockTy, []interface{}{block.Value().Val})
+				self.Remove(block.Hash())
 			}
 		}
 	}

+ 0 - 144
ethchain/block_chain_test.go

@@ -1,145 +1 @@
 package ethchain
-
-import (
-	"container/list"
-	"fmt"
-	"testing"
-
-	"github.com/ethereum/eth-go/ethcrypto"
-	"github.com/ethereum/eth-go/ethdb"
-	"github.com/ethereum/eth-go/ethreact"
-	"github.com/ethereum/eth-go/ethutil"
-	"github.com/ethereum/eth-go/ethwire"
-)
-
-// Implement our EthTest Manager
-type TestManager struct {
-	stateManager *StateManager
-	reactor      *ethreact.ReactorEngine
-
-	txPool     *TxPool
-	blockChain *BlockChain
-	Blocks     []*Block
-}
-
-func (s *TestManager) IsListening() bool {
-	return false
-}
-
-func (s *TestManager) IsMining() bool {
-	return false
-}
-
-func (s *TestManager) PeerCount() int {
-	return 0
-}
-
-func (s *TestManager) Peers() *list.List {
-	return list.New()
-}
-
-func (s *TestManager) BlockChain() *BlockChain {
-	return s.blockChain
-}
-
-func (tm *TestManager) TxPool() *TxPool {
-	return tm.txPool
-}
-
-func (tm *TestManager) StateManager() *StateManager {
-	return tm.stateManager
-}
-
-func (tm *TestManager) Reactor() *ethreact.ReactorEngine {
-	return tm.reactor
-}
-func (tm *TestManager) Broadcast(msgType ethwire.MsgType, data []interface{}) {
-	fmt.Println("Broadcast not implemented")
-}
-
-func (tm *TestManager) ClientIdentity() ethwire.ClientIdentity {
-	return nil
-}
-func (tm *TestManager) KeyManager() *ethcrypto.KeyManager {
-	return nil
-}
-
-func (tm *TestManager) Db() ethutil.Database { return nil }
-
-func NewTestManager() *TestManager {
-	ethutil.ReadConfig(".ethtest", "/tmp/ethtest", "ETH")
-
-	db, err := ethdb.NewMemDatabase()
-	if err != nil {
-		fmt.Println("Could not create mem-db, failing")
-		return nil
-	}
-	ethutil.Config.Db = db
-
-	testManager := &TestManager{}
-	testManager.reactor = ethreact.New()
-
-	testManager.txPool = NewTxPool(testManager)
-	testManager.blockChain = NewBlockChain(testManager)
-	testManager.stateManager = NewStateManager(testManager)
-
-	// Start the tx pool
-	testManager.txPool.Start()
-
-	return testManager
-}
-
-func (tm *TestManager) AddFakeBlock(blk []byte) error {
-	block := NewBlockFromBytes(blk)
-	tm.Blocks = append(tm.Blocks, block)
-	err := tm.StateManager().Process(block, false)
-	return err
-}
-
-func (tm *TestManager) CreateChain1() error {
-	err := tm.AddFakeBlock([]byte{248, 246, 248, 242, 160, 58, 253, 98, 206, 198, 181, 152, 223, 201, 116, 197, 154, 111, 104, 54, 113, 249, 184, 246, 15, 226, 142, 187, 47, 138, 60, 201, 66, 226, 237, 29, 7, 160, 29, 204, 77, 232, 222, 199, 93, 122, 171, 133, 181, 103, 182, 204, 212, 26, 211, 18, 69, 27, 148, 138, 116, 19, 240, 161, 66, 253, 64, 212, 147, 71, 184, 65, 4, 103, 109, 19, 120, 219, 91, 248, 48, 204, 17, 28, 7, 146, 72, 203, 15, 207, 251, 31, 216, 138, 26, 59, 34, 238, 40, 114, 233, 1, 13, 207, 90, 71, 136, 124, 86, 196, 127, 10, 176, 193, 154, 165, 76, 155, 154, 59, 45, 34, 96, 183, 212, 99, 41, 27, 40, 119, 171, 231, 160, 114, 56, 218, 173, 160, 80, 218, 177, 253, 147, 35, 101, 59, 37, 87, 97, 193, 119, 21, 132, 111, 93, 53, 152, 203, 38, 134, 25, 104, 138, 236, 92, 27, 176, 89, 229, 176, 160, 29, 204, 77, 232, 222, 199, 93, 122, 171, 133, 181, 103, 182, 204, 212, 26, 211, 18, 69, 27, 148, 138, 116, 19, 240, 161, 66, 253, 64, 212, 147, 71, 131, 63, 240, 0, 132, 83, 48, 32, 251, 128, 160, 4, 10, 11, 225, 132, 86, 146, 227, 229, 137, 164, 245, 16, 139, 219, 12, 251, 178, 154, 168, 210, 18, 84, 40, 250, 41, 124, 92, 169, 242, 246, 180, 192, 192})
-	err = tm.AddFakeBlock([]byte{248, 246, 248, 242, 160, 222, 229, 152, 228, 200, 163, 244, 144, 120, 18, 203, 253, 195, 185, 105, 131, 163, 226, 116, 40, 140, 68, 249, 198, 221, 152, 121, 0, 124, 11, 180, 125, 160, 29, 204, 77, 232, 222, 199, 93, 122, 171, 133, 181, 103, 182, 204, 212, 26, 211, 18, 69, 27, 148, 138, 116, 19, 240, 161, 66, 253, 64, 212, 147, 71, 184, 65, 4, 103, 109, 19, 120, 219, 91, 248, 48, 204, 17, 28, 7, 146, 72, 203, 15, 207, 251, 31, 216, 138, 26, 59, 34, 238, 40, 114, 233, 1, 13, 207, 90, 71, 136, 124, 86, 196, 127, 10, 176, 193, 154, 165, 76, 155, 154, 59, 45, 34, 96, 183, 212, 99, 41, 27, 40, 119, 171, 231, 160, 114, 56, 218, 173, 160, 80, 218, 177, 253, 147, 35, 101, 59, 37, 87, 97, 193, 119, 21, 132, 111, 93, 53, 152, 203, 38, 134, 25, 104, 138, 236, 92, 27, 176, 89, 229, 176, 160, 29, 204, 77, 232, 222, 199, 93, 122, 171, 133, 181, 103, 182, 204, 212, 26, 211, 18, 69, 27, 148, 138, 116, 19, 240, 161, 66, 253, 64, 212, 147, 71, 131, 63, 224, 4, 132, 83, 48, 36, 250, 128, 160, 79, 58, 51, 246, 238, 249, 210, 253, 136, 83, 71, 134, 49, 114, 190, 189, 242, 78, 100, 238, 101, 84, 204, 176, 198, 25, 139, 151, 60, 84, 51, 126, 192, 192})
-	err = tm.AddFakeBlock([]byte{248, 246, 248, 242, 160, 68, 52, 33, 210, 160, 189, 217, 255, 78, 37, 196, 217, 94, 247, 166, 169, 224, 199, 102, 110, 85, 213, 45, 13, 173, 106, 4, 103, 151, 195, 38, 86, 160, 29, 204, 77, 232, 222, 199, 93, 122, 171, 133, 181, 103, 182, 204, 212, 26, 211, 18, 69, 27, 148, 138, 116, 19, 240, 161, 66, 253, 64, 212, 147, 71, 184, 65, 4, 103, 109, 19, 120, 219, 91, 248, 48, 204, 17, 28, 7, 146, 72, 203, 15, 207, 251, 31, 216, 138, 26, 59, 34, 238, 40, 114, 233, 1, 13, 207, 90, 71, 136, 124, 86, 196, 127, 10, 176, 193, 154, 165, 76, 155, 154, 59, 45, 34, 96, 183, 212, 99, 41, 27, 40, 119, 171, 231, 160, 114, 56, 218, 173, 160, 80, 218, 177, 253, 147, 35, 101, 59, 37, 87, 97, 193, 119, 21, 132, 111, 93, 53, 152, 203, 38, 134, 25, 104, 138, 236, 92, 27, 176, 89, 229, 176, 160, 29, 204, 77, 232, 222, 199, 93, 122, 171, 133, 181, 103, 182, 204, 212, 26, 211, 18, 69, 27, 148, 138, 116, 19, 240, 161, 66, 253, 64, 212, 147, 71, 131, 63, 208, 12, 132, 83, 48, 38, 206, 128, 160, 65, 147, 32, 128, 177, 198, 131, 57, 57, 68, 135, 65, 198, 178, 138, 43, 25, 135, 92, 174, 208, 119, 103, 225, 26, 207, 243, 31, 225, 29, 173, 119, 192, 192})
-	return err
-}
-func (tm *TestManager) CreateChain2() error {
-	err := tm.AddFakeBlock([]byte{248, 246, 248, 242, 160, 58, 253, 98, 206, 198, 181, 152, 223, 201, 116, 197, 154, 111, 104, 54, 113, 249, 184, 246, 15, 226, 142, 187, 47, 138, 60, 201, 66, 226, 237, 29, 7, 160, 29, 204, 77, 232, 222, 199, 93, 122, 171, 133, 181, 103, 182, 204, 212, 26, 211, 18, 69, 27, 148, 138, 116, 19, 240, 161, 66, 253, 64, 212, 147, 71, 184, 65, 4, 72, 201, 77, 81, 160, 103, 70, 18, 102, 204, 82, 192, 86, 157, 40, 30, 117, 218, 224, 202, 1, 36, 249, 88, 82, 210, 19, 156, 112, 31, 13, 117, 227, 0, 125, 221, 190, 165, 16, 193, 163, 161, 175, 33, 37, 184, 235, 62, 201, 93, 102, 185, 143, 54, 146, 114, 30, 253, 178, 245, 87, 38, 191, 214, 160, 80, 218, 177, 253, 147, 35, 101, 59, 37, 87, 97, 193, 119, 21, 132, 111, 93, 53, 152, 203, 38, 134, 25, 104, 138, 236, 92, 27, 176, 89, 229, 176, 160, 29, 204, 77, 232, 222, 199, 93, 122, 171, 133, 181, 103, 182, 204, 212, 26, 211, 18, 69, 27, 148, 138, 116, 19, 240, 161, 66, 253, 64, 212, 147, 71, 131, 63, 240, 0, 132, 83, 48, 40, 35, 128, 160, 162, 214, 119, 207, 212, 186, 64, 47, 14, 186, 98, 118, 203, 79, 172, 205, 33, 206, 225, 177, 225, 194, 98, 188, 63, 219, 13, 151, 47, 32, 204, 27, 192, 192})
-	err = tm.AddFakeBlock([]byte{248, 246, 248, 242, 160, 0, 210, 76, 6, 13, 18, 219, 190, 18, 250, 23, 178, 198, 117, 254, 85, 14, 74, 104, 116, 56, 144, 116, 172, 14, 3, 236, 99, 248, 228, 142, 91, 160, 29, 204, 77, 232, 222, 199, 93, 122, 171, 133, 181, 103, 182, 204, 212, 26, 211, 18, 69, 27, 148, 138, 116, 19, 240, 161, 66, 253, 64, 212, 147, 71, 184, 65, 4, 72, 201, 77, 81, 160, 103, 70, 18, 102, 204, 82, 192, 86, 157, 40, 30, 117, 218, 224, 202, 1, 36, 249, 88, 82, 210, 19, 156, 112, 31, 13, 117, 227, 0, 125, 221, 190, 165, 16, 193, 163, 161, 175, 33, 37, 184, 235, 62, 201, 93, 102, 185, 143, 54, 146, 114, 30, 253, 178, 245, 87, 38, 191, 214, 160, 80, 218, 177, 253, 147, 35, 101, 59, 37, 87, 97, 193, 119, 21, 132, 111, 93, 53, 152, 203, 38, 134, 25, 104, 138, 236, 92, 27, 176, 89, 229, 176, 160, 29, 204, 77, 232, 222, 199, 93, 122, 171, 133, 181, 103, 182, 204, 212, 26, 211, 18, 69, 27, 148, 138, 116, 19, 240, 161, 66, 253, 64, 212, 147, 71, 131, 63, 255, 252, 132, 83, 48, 40, 74, 128, 160, 185, 20, 138, 2, 210, 15, 71, 144, 89, 167, 94, 155, 148, 118, 170, 157, 122, 70, 70, 114, 50, 221, 231, 8, 132, 167, 115, 239, 44, 245, 41, 226, 192, 192})
-	return err
-}
-
-func TestNegativeBlockChainReorg(t *testing.T) {
-	// We are resetting the database between creation so we need to cache our information
-	testManager2 := NewTestManager()
-	testManager2.CreateChain2()
-	tm2Blocks := testManager2.Blocks
-
-	testManager := NewTestManager()
-	testManager.CreateChain1()
-	oldState := testManager.BlockChain().CurrentBlock.State()
-
-	if testManager.BlockChain().FindCanonicalChain(tm2Blocks, testManager.BlockChain().GenesisBlock().Hash()) != true {
-		t.Error("I expected TestManager to have the longest chain, but it was TestManager2 instead.")
-	}
-	if testManager.BlockChain().CurrentBlock.State() != oldState {
-		t.Error("I expected the top state to be the same as it was as before the reorg")
-	}
-
-}
-
-func TestPositiveBlockChainReorg(t *testing.T) {
-	testManager := NewTestManager()
-	testManager.CreateChain1()
-	tm1Blocks := testManager.Blocks
-
-	testManager2 := NewTestManager()
-	testManager2.CreateChain2()
-	oldState := testManager2.BlockChain().CurrentBlock.State()
-
-	if testManager2.BlockChain().FindCanonicalChain(tm1Blocks, testManager.BlockChain().GenesisBlock().Hash()) == true {
-		t.Error("I expected TestManager to have the longest chain, but it was TestManager2 instead.")
-	}
-	if testManager2.BlockChain().CurrentBlock.State() == oldState {
-		t.Error("I expected the top state to have been modified but it was not")
-	}
-}

+ 88 - 0
ethchain/helper_test.go

@@ -0,0 +1,88 @@
+package ethchain
+
+import (
+	"container/list"
+	"fmt"
+
+	"github.com/ethereum/eth-go/ethcrypto"
+	"github.com/ethereum/eth-go/ethdb"
+	"github.com/ethereum/eth-go/ethreact"
+	"github.com/ethereum/eth-go/ethutil"
+	"github.com/ethereum/eth-go/ethwire"
+)
+
+// Implement our EthTest Manager
+type TestManager struct {
+	stateManager *StateManager
+	reactor      *ethreact.ReactorEngine
+
+	txPool     *TxPool
+	blockChain *BlockChain
+	Blocks     []*Block
+}
+
+func (s *TestManager) IsListening() bool {
+	return false
+}
+
+func (s *TestManager) IsMining() bool {
+	return false
+}
+
+func (s *TestManager) PeerCount() int {
+	return 0
+}
+
+func (s *TestManager) Peers() *list.List {
+	return list.New()
+}
+
+func (s *TestManager) BlockChain() *BlockChain {
+	return s.blockChain
+}
+
+func (tm *TestManager) TxPool() *TxPool {
+	return tm.txPool
+}
+
+func (tm *TestManager) StateManager() *StateManager {
+	return tm.stateManager
+}
+
+func (tm *TestManager) Reactor() *ethreact.ReactorEngine {
+	return tm.reactor
+}
+func (tm *TestManager) Broadcast(msgType ethwire.MsgType, data []interface{}) {
+	fmt.Println("Broadcast not implemented")
+}
+
+func (tm *TestManager) ClientIdentity() ethwire.ClientIdentity {
+	return nil
+}
+func (tm *TestManager) KeyManager() *ethcrypto.KeyManager {
+	return nil
+}
+
+func (tm *TestManager) Db() ethutil.Database { return nil }
+func NewTestManager() *TestManager {
+	ethutil.ReadConfig(".ethtest", "/tmp/ethtest", "ETH")
+
+	db, err := ethdb.NewMemDatabase()
+	if err != nil {
+		fmt.Println("Could not create mem-db, failing")
+		return nil
+	}
+	ethutil.Config.Db = db
+
+	testManager := &TestManager{}
+	testManager.reactor = ethreact.New()
+
+	testManager.txPool = NewTxPool(testManager)
+	testManager.blockChain = NewBlockChain(testManager)
+	testManager.stateManager = NewStateManager(testManager)
+
+	// Start the tx pool
+	testManager.txPool.Start()
+
+	return testManager
+}

+ 51 - 20
ethchain/state_manager.go

@@ -15,14 +15,11 @@ import (
 	"github.com/ethereum/eth-go/ethstate"
 	"github.com/ethereum/eth-go/ethutil"
 	"github.com/ethereum/eth-go/ethwire"
+	"github.com/ethereum/eth-go/eventer"
 )
 
 var statelogger = ethlog.NewLogger("STATE")
 
-type BlockProcessor interface {
-	ProcessBlock(block *Block)
-}
-
 type Peer interface {
 	Inbound() bool
 	LastSend() time.Time
@@ -48,6 +45,7 @@ type EthManager interface {
 	KeyManager() *ethcrypto.KeyManager
 	ClientIdentity() ethwire.ClientIdentity
 	Db() ethutil.Database
+	Eventer() *eventer.EventMachine
 }
 
 type StateManager struct {
@@ -60,7 +58,7 @@ type StateManager struct {
 	// Proof of work used for validating
 	Pow PoW
 	// The ethereum manager interface
-	Ethereum EthManager
+	eth EthManager
 	// The managed states
 	// Transiently state. The trans state isn't ever saved, validated and
 	// it could be used for setting account nonces without effecting
@@ -74,14 +72,18 @@ type StateManager struct {
 	// This does not have to be a valid block and will be set during
 	// 'Process' & canonical validation.
 	lastAttemptedBlock *Block
+
+	// Quit chan
+	quit chan bool
 }
 
 func NewStateManager(ethereum EthManager) *StateManager {
 	sm := &StateManager{
-		mem:      make(map[string]*big.Int),
-		Pow:      &EasyPow{},
-		Ethereum: ethereum,
-		bc:       ethereum.BlockChain(),
+		mem:  make(map[string]*big.Int),
+		Pow:  &EasyPow{},
+		eth:  ethereum,
+		bc:   ethereum.BlockChain(),
+		quit: make(chan bool),
 	}
 	sm.transState = ethereum.BlockChain().CurrentBlock.State().Copy()
 	sm.miningState = ethereum.BlockChain().CurrentBlock.State().Copy()
@@ -89,8 +91,41 @@ func NewStateManager(ethereum EthManager) *StateManager {
 	return sm
 }
 
+func (self *StateManager) Start() {
+	statelogger.Debugln("Starting state manager")
+
+	go self.updateThread()
+}
+
+func (self *StateManager) Stop() {
+	statelogger.Debugln("Stopping state manager")
+
+	close(self.quit)
+}
+
+func (self *StateManager) updateThread() {
+	blockChan := self.eth.Eventer().Register("block")
+
+out:
+	for {
+		select {
+		case event := <-blockChan:
+			block := event.Data.(*Block)
+			err := self.Process(block, false)
+			if err != nil {
+				statelogger.Infoln(err)
+				statelogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4])
+				statelogger.Debugln(block)
+			}
+
+		case <-self.quit:
+			break out
+		}
+	}
+}
+
 func (sm *StateManager) CurrentState() *ethstate.State {
-	return sm.Ethereum.BlockChain().CurrentBlock.State()
+	return sm.eth.BlockChain().CurrentBlock.State()
 }
 
 func (sm *StateManager) TransState() *ethstate.State {
@@ -102,7 +137,7 @@ func (sm *StateManager) MiningState() *ethstate.State {
 }
 
 func (sm *StateManager) NewMiningState() *ethstate.State {
-	sm.miningState = sm.Ethereum.BlockChain().CurrentBlock.State().Copy()
+	sm.miningState = sm.eth.BlockChain().CurrentBlock.State().Copy()
 
 	return sm.miningState
 }
@@ -164,7 +199,7 @@ done:
 		}
 
 		// Notify all subscribers
-		self.Ethereum.Reactor().Post("newTx:post", tx)
+		self.eth.Reactor().Post("newTx:post", tx)
 
 		receipts = append(receipts, receipt)
 		handled = append(handled, tx)
@@ -251,16 +286,16 @@ func (sm *StateManager) Process(block *Block, dontReact bool) (err error) {
 		filter := sm.createBloomFilter(state)
 		// Persist the data
 		fk := append([]byte("bloom"), block.Hash()...)
-		sm.Ethereum.Db().Put(fk, filter.Bin())
+		sm.eth.Db().Put(fk, filter.Bin())
 
 		statelogger.Infof("Imported block #%d (%x...)\n", block.Number, block.Hash()[0:4])
 		if dontReact == false {
-			sm.Ethereum.Reactor().Post("newBlock", block)
+			sm.eth.Reactor().Post("newBlock", block)
 
 			state.Manifest().Reset()
 		}
 
-		sm.Ethereum.TxPool().RemoveInvalid(state)
+		sm.eth.TxPool().RemoveInvalid(state)
 	} else {
 		statelogger.Errorln("total diff failed")
 	}
@@ -385,10 +420,6 @@ func (sm *StateManager) AccumelateRewards(state *ethstate.State, block, parent *
 	return nil
 }
 
-func (sm *StateManager) Stop() {
-	sm.bc.Stop()
-}
-
 // Manifest will handle both creating notifications and generating bloom bin data
 func (sm *StateManager) createBloomFilter(state *ethstate.State) *BloomFilter {
 	bloomf := NewBloomFilter(nil)
@@ -398,7 +429,7 @@ func (sm *StateManager) createBloomFilter(state *ethstate.State) *BloomFilter {
 		bloomf.Set(msg.From)
 	}
 
-	sm.Ethereum.Reactor().Post("messages", state.Manifest().Messages)
+	sm.eth.Reactor().Post("messages", state.Manifest().Messages)
 
 	return bloomf
 }

+ 10 - 1
ethereum.go

@@ -22,6 +22,7 @@ import (
 	"github.com/ethereum/eth-go/ethstate"
 	"github.com/ethereum/eth-go/ethutil"
 	"github.com/ethereum/eth-go/ethwire"
+	"github.com/ethereum/eth-go/eventer"
 )
 
 const (
@@ -58,7 +59,9 @@ type Ethereum struct {
 	blockChain *ethchain.BlockChain
 	// The block pool
 	blockPool *BlockPool
-	// Peers (NYI)
+	// Eventer
+	eventer *eventer.EventMachine
+	// Peers
 	peers *list.List
 	// Nonce
 	Nonce uint64
@@ -123,6 +126,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager
 		filters:        make(map[int]*ethchain.Filter),
 	}
 	ethereum.reactor = ethreact.New()
+	ethereum.eventer = eventer.New()
 
 	ethereum.blockPool = NewBlockPool(ethereum)
 	ethereum.txPool = ethchain.NewTxPool(ethereum)
@@ -161,6 +165,9 @@ func (s *Ethereum) TxPool() *ethchain.TxPool {
 func (s *Ethereum) BlockPool() *BlockPool {
 	return s.blockPool
 }
+func (s *Ethereum) Eventer() *eventer.EventMachine {
+	return s.eventer
+}
 func (self *Ethereum) Db() ethutil.Database {
 	return self.db
 }
@@ -387,6 +394,8 @@ func (s *Ethereum) ReapDeadPeerHandler() {
 func (s *Ethereum) Start(seed bool) {
 	s.reactor.Start()
 	s.blockPool.Start()
+	s.stateManager.Start()
+
 	// Bind to addr and port
 	ln, err := net.Listen("tcp", ":"+s.Port)
 	if err != nil {

+ 79 - 0
eventer/eventer.go

@@ -0,0 +1,79 @@
+package eventer
+
+// Basic receiver interface.
+type Receiver interface {
+	Send(Event)
+}
+
+// Receiver as channel
+type Channel chan Event
+
+func (self Channel) Send(ev Event) {
+	self <- ev
+}
+
+// Receiver as function
+type Function func(ev Event)
+
+func (self Function) Send(ev Event) {
+	self(ev)
+}
+
+type Event struct {
+	Type string
+	Data interface{}
+}
+
+type Channels map[string][]Receiver
+
+type EventMachine struct {
+	channels Channels
+}
+
+func New() *EventMachine {
+	return &EventMachine{
+		channels: make(Channels),
+	}
+}
+
+func (self *EventMachine) add(typ string, r Receiver) {
+	self.channels[typ] = append(self.channels[typ], r)
+}
+
+// Generalised methods for the known receiver types
+// * Channel
+// * Function
+func (self *EventMachine) On(typ string, r interface{}) {
+	if eventFunc, ok := r.(func(Event)); ok {
+		self.RegisterFunc(typ, eventFunc)
+	} else if eventChan, ok := r.(Channel); ok {
+		self.RegisterChannel(typ, eventChan)
+	} else {
+		panic("Invalid type for EventMachine::On")
+	}
+}
+
+func (self *EventMachine) RegisterChannel(typ string, c Channel) {
+	self.add(typ, c)
+}
+
+func (self *EventMachine) RegisterFunc(typ string, f Function) {
+	self.add(typ, f)
+}
+
+func (self *EventMachine) Register(typ string) Channel {
+	c := make(Channel, 1)
+	self.add(typ, c)
+
+	return c
+}
+
+func (self *EventMachine) Post(typ string, data interface{}) {
+	if self.channels[typ] != nil {
+		ev := Event{typ, data}
+		for _, receiver := range self.channels[typ] {
+			// Blocking is OK. These are internals and need to be handled
+			receiver.Send(ev)
+		}
+	}
+}

+ 66 - 0
eventer/eventer_test.go

@@ -0,0 +1,66 @@
+package eventer
+
+import "testing"
+
+func TestChannel(t *testing.T) {
+	eventer := New(nil)
+
+	c := make(Channel, 1)
+	eventer.RegisterChannel("test", c)
+	eventer.Post("test", "hello world")
+
+	res := <-c
+
+	if res.Data.(string) != "hello world" {
+		t.Error("Expected event with data 'hello world'. Got", res.Data)
+	}
+}
+
+func TestFunction(t *testing.T) {
+	eventer := New(nil)
+
+	var data string
+	eventer.RegisterFunc("test", func(ev Event) {
+		data = ev.Data.(string)
+	})
+	eventer.Post("test", "hello world")
+
+	if data != "hello world" {
+		t.Error("Expected event with data 'hello world'. Got", data)
+	}
+}
+
+func TestRegister(t *testing.T) {
+	eventer := New(nil)
+
+	c := eventer.Register("test")
+	eventer.Post("test", "hello world")
+
+	res := <-c
+
+	if res.Data.(string) != "hello world" {
+		t.Error("Expected event with data 'hello world'. Got", res.Data)
+	}
+}
+
+func TestOn(t *testing.T) {
+	eventer := New(nil)
+
+	c := make(Channel, 1)
+	eventer.On("test", c)
+
+	var data string
+	eventer.On("test", func(ev Event) {
+		data = ev.Data.(string)
+	})
+	eventer.Post("test", "hello world")
+
+	res := <-c
+	if res.Data.(string) != "hello world" {
+		t.Error("Expected channel event with data 'hello world'. Got", res.Data)
+	}
+
+	if data != "hello world" {
+		t.Error("Expected function event with data 'hello world'. Got", data)
+	}
+}

+ 3 - 1
peer.go

@@ -554,7 +554,9 @@ func (self *Peer) FetchHashes() {
 		blockPool.td = self.td
 
 		if !blockPool.HasLatestHash() {
-			self.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{self.lastReceivedHash, uint32(256)}))
+			const amount = 256
+			peerlogger.Debugf("Fetching hashes (%d)\n", amount)
+			self.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{self.lastReceivedHash, uint32(amount)}))
 		}
 	}
 }

+ 1 - 0
types/ethereum.go

@@ -0,0 +1 @@
+package types