浏览代码

Merge pull request #1389 from obscuren/txpool-issues

core, miner: miner header validation, transaction & receipt writing
Jeffrey Wilcke 10 年之前
父节点
当前提交
1cbab291a9

+ 1 - 1
cmd/utils/flags.go

@@ -412,7 +412,7 @@ func MakeChain(ctx *cli.Context) (chain *core.ChainManager, blockDB, stateDB, ex
 	eventMux := new(event.TypeMux)
 	pow := ethash.New()
 	genesis := core.GenesisBlock(uint64(ctx.GlobalInt(GenesisNonceFlag.Name)), blockDB)
-	chain, err = core.NewChainManager(genesis, blockDB, stateDB, pow, eventMux)
+	chain, err = core.NewChainManager(genesis, blockDB, stateDB, extraDB, pow, eventMux)
 	if err != nil {
 		Fatalf("Could not start chainmanager: %v", err)
 	}

+ 1 - 1
core/bench_test.go

@@ -152,7 +152,7 @@ func benchInsertChain(b *testing.B, disk bool, gen func(int, *BlockGen)) {
 	// Time the insertion of the new chain.
 	// State and blocks are stored in the same DB.
 	evmux := new(event.TypeMux)
-	chainman, _ := NewChainManager(genesis, db, db, FakePow{}, evmux)
+	chainman, _ := NewChainManager(genesis, db, db, db, FakePow{}, evmux)
 	chainman.SetProcessor(NewBlockProcessor(db, db, FakePow{}, chainman, evmux))
 	defer chainman.Stop()
 	b.ReportAllocs()

+ 8 - 56
core/block_processor.go

@@ -151,7 +151,7 @@ func (sm *BlockProcessor) RetryProcess(block *types.Block) (logs state.Logs, err
 	errch := make(chan bool)
 	go func() { errch <- sm.Pow.Verify(block) }()
 
-	logs, err = sm.processWithParent(block, parent)
+	logs, _, err = sm.processWithParent(block, parent)
 	if !<-errch {
 		return nil, ValidationError("Block's nonce is invalid (= %x)", block.Nonce)
 	}
@@ -162,23 +162,23 @@ func (sm *BlockProcessor) RetryProcess(block *types.Block) (logs state.Logs, err
 // Process block will attempt to process the given block's transactions and applies them
 // on top of the block's parent state (given it exists) and will return wether it was
 // successful or not.
-func (sm *BlockProcessor) Process(block *types.Block) (logs state.Logs, err error) {
+func (sm *BlockProcessor) Process(block *types.Block) (logs state.Logs, receipts types.Receipts, err error) {
 	// Processing a blocks may never happen simultaneously
 	sm.mutex.Lock()
 	defer sm.mutex.Unlock()
 
 	if sm.bc.HasBlock(block.Hash()) {
-		return nil, &KnownBlockError{block.Number(), block.Hash()}
+		return nil, nil, &KnownBlockError{block.Number(), block.Hash()}
 	}
 
 	if !sm.bc.HasBlock(block.ParentHash()) {
-		return nil, ParentError(block.ParentHash())
+		return nil, nil, ParentError(block.ParentHash())
 	}
 	parent := sm.bc.GetBlock(block.ParentHash())
 	return sm.processWithParent(block, parent)
 }
 
-func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs state.Logs, err error) {
+func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs state.Logs, receipts types.Receipts, err error) {
 	// Create a new state based on the parent's root (e.g., create copy)
 	state := state.New(parent.Root(), sm.db)
 	header := block.Header()
@@ -192,10 +192,10 @@ func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs st
 
 	// There can be at most two uncles
 	if len(uncles) > 2 {
-		return nil, ValidationError("Block can only contain maximum 2 uncles (contained %v)", len(uncles))
+		return nil, nil, ValidationError("Block can only contain maximum 2 uncles (contained %v)", len(uncles))
 	}
 
-	receipts, err := sm.TransitionState(state, parent, block, false)
+	receipts, err = sm.TransitionState(state, parent, block, false)
 	if err != nil {
 		return
 	}
@@ -248,15 +248,7 @@ func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs st
 	// Sync the current block's state to the database
 	state.Sync()
 
-	// This puts transactions in a extra db for rpc
-	for i, tx := range block.Transactions() {
-		putTx(sm.extraDb, tx, block, uint64(i))
-	}
-
-	// store the receipts
-	putReceipts(sm.extraDb, block.Hash(), receipts)
-
-	return state.Logs(), nil
+	return state.Logs(), receipts, nil
 }
 
 var (
@@ -411,43 +403,3 @@ func getBlockReceipts(db common.Database, bhash common.Hash) (receipts types.Rec
 	}
 	return
 }
-
-func putTx(db common.Database, tx *types.Transaction, block *types.Block, i uint64) {
-	rlpEnc, err := rlp.EncodeToBytes(tx)
-	if err != nil {
-		glog.V(logger.Debug).Infoln("Failed encoding tx", err)
-		return
-	}
-	db.Put(tx.Hash().Bytes(), rlpEnc)
-
-	var txExtra struct {
-		BlockHash  common.Hash
-		BlockIndex uint64
-		Index      uint64
-	}
-	txExtra.BlockHash = block.Hash()
-	txExtra.BlockIndex = block.NumberU64()
-	txExtra.Index = i
-	rlpMeta, err := rlp.EncodeToBytes(txExtra)
-	if err != nil {
-		glog.V(logger.Debug).Infoln("Failed encoding tx meta data", err)
-		return
-	}
-	db.Put(append(tx.Hash().Bytes(), 0x0001), rlpMeta)
-}
-
-func putReceipts(db common.Database, hash common.Hash, receipts types.Receipts) error {
-	storageReceipts := make([]*types.ReceiptForStorage, len(receipts))
-	for i, receipt := range receipts {
-		storageReceipts[i] = (*types.ReceiptForStorage)(receipt)
-	}
-
-	bytes, err := rlp.EncodeToBytes(storageReceipts)
-	if err != nil {
-		return err
-	}
-
-	db.Put(append(receiptsPre, hash[:]...), bytes)
-
-	return nil
-}

+ 2 - 2
core/block_processor_test.go

@@ -18,7 +18,7 @@ func proc() (*BlockProcessor, *ChainManager) {
 	var mux event.TypeMux
 
 	genesis := GenesisBlock(0, db)
-	chainMan, err := NewChainManager(genesis, db, db, thePow(), &mux)
+	chainMan, err := NewChainManager(genesis, db, db, db, thePow(), &mux)
 	if err != nil {
 		fmt.Println(err)
 	}
@@ -64,7 +64,7 @@ func TestPutReceipt(t *testing.T) {
 		Index:     0,
 	}})
 
-	putReceipts(db, hash, types.Receipts{receipt})
+	PutReceipts(db, hash, types.Receipts{receipt})
 	receipts, err := getBlockReceipts(db, hash)
 	if err != nil {
 		t.Error("got err:", err)

+ 1 - 1
core/chain_makers.go

@@ -167,7 +167,7 @@ func makeHeader(parent *types.Block, state *state.StateDB) *types.Header {
 // InsertChain on the result of makeChain.
 func newCanonical(n int, db common.Database) (*BlockProcessor, error) {
 	evmux := &event.TypeMux{}
-	chainman, _ := NewChainManager(GenesisBlock(0, db), db, db, FakePow{}, evmux)
+	chainman, _ := NewChainManager(GenesisBlock(0, db), db, db, db, FakePow{}, evmux)
 	bman := NewBlockProcessor(db, db, FakePow{}, chainman, evmux)
 	bman.bc.SetProcessor(bman)
 	parent := bman.bc.CurrentBlock()

+ 1 - 1
core/chain_makers_test.go

@@ -58,7 +58,7 @@ func ExampleGenerateChain() {
 
 	// Import the chain. This runs all block validation rules.
 	evmux := &event.TypeMux{}
-	chainman, _ := NewChainManager(genesis, db, db, FakePow{}, evmux)
+	chainman, _ := NewChainManager(genesis, db, db, db, FakePow{}, evmux)
 	chainman.SetProcessor(NewBlockProcessor(db, db, FakePow{}, chainman, evmux))
 	if i, err := chainman.InsertChain(chain); err != nil {
 		fmt.Printf("insert error (block %d): %v\n", i, err)

+ 20 - 13
core/chain_manager.go

@@ -42,6 +42,7 @@ type ChainManager struct {
 	//eth          EthManager
 	blockDb      common.Database
 	stateDb      common.Database
+	extraDb      common.Database
 	processor    types.BlockProcessor
 	eventMux     *event.TypeMux
 	genesisBlock *types.Block
@@ -70,11 +71,12 @@ type ChainManager struct {
 	pow pow.PoW
 }
 
-func NewChainManager(genesis *types.Block, blockDb, stateDb common.Database, pow pow.PoW, mux *event.TypeMux) (*ChainManager, error) {
+func NewChainManager(genesis *types.Block, blockDb, stateDb, extraDb common.Database, pow pow.PoW, mux *event.TypeMux) (*ChainManager, error) {
 	cache, _ := lru.New(blockCacheLimit)
 	bc := &ChainManager{
 		blockDb:      blockDb,
 		stateDb:      stateDb,
+		extraDb:      extraDb,
 		genesisBlock: GenesisBlock(42, stateDb),
 		eventMux:     mux,
 		quit:         make(chan struct{}),
@@ -477,10 +479,10 @@ func (self *ChainManager) procFutureBlocks() {
 type writeStatus byte
 
 const (
-	nonStatTy writeStatus = iota
-	canonStatTy
-	splitStatTy
-	sideStatTy
+	NonStatTy writeStatus = iota
+	CanonStatTy
+	SplitStatTy
+	SideStatTy
 )
 
 // WriteBlock writes the block to the chain (or pending queue)
@@ -497,10 +499,10 @@ func (self *ChainManager) WriteBlock(block *types.Block, queued bool) (status wr
 			// during split we merge two different chains and create the new canonical chain
 			err := self.merge(cblock, block)
 			if err != nil {
-				return nonStatTy, err
+				return NonStatTy, err
 			}
 
-			status = splitStatTy
+			status = SplitStatTy
 		}
 
 		self.mu.Lock()
@@ -511,9 +513,9 @@ func (self *ChainManager) WriteBlock(block *types.Block, queued bool) (status wr
 		self.setTransState(state.New(block.Root(), self.stateDb))
 		self.txState.SetState(state.New(block.Root(), self.stateDb))
 
-		status = canonStatTy
+		status = CanonStatTy
 	} else {
-		status = sideStatTy
+		status = SideStatTy
 	}
 
 	self.write(block)
@@ -581,7 +583,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
 
 		// Call in to the block processor and check for errors. It's likely that if one block fails
 		// all others will fail too (unless a known block is returned).
-		logs, err := self.processor.Process(block)
+		logs, receipts, err := self.processor.Process(block)
 		if err != nil {
 			if IsKnownBlockErr(err) {
 				stats.ignored++
@@ -620,19 +622,24 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
 			return i, err
 		}
 		switch status {
-		case canonStatTy:
+		case CanonStatTy:
 			if glog.V(logger.Debug) {
 				glog.Infof("[%v] inserted block #%d (%d TXs %d UNCs) (%x...). Took %v\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
 			}
 			queue[i] = ChainEvent{block, block.Hash(), logs}
 			queueEvent.canonicalCount++
-		case sideStatTy:
+
+			// This puts transactions in a extra db for rpc
+			PutTransactions(self.extraDb, block, block.Transactions())
+			// store the receipts
+			PutReceipts(self.extraDb, block.Hash(), receipts)
+		case SideStatTy:
 			if glog.V(logger.Detail) {
 				glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
 			}
 			queue[i] = ChainSideEvent{block, logs}
 			queueEvent.sideCount++
-		case splitStatTy:
+		case SplitStatTy:
 			queue[i] = ChainSplitEvent{block, logs}
 			queueEvent.splitCount++
 		}

+ 6 - 6
core/chain_manager_test.go

@@ -33,7 +33,7 @@ func thePow() pow.PoW {
 func theChainManager(db common.Database, t *testing.T) *ChainManager {
 	var eventMux event.TypeMux
 	genesis := GenesisBlock(0, db)
-	chainMan, err := NewChainManager(genesis, db, db, thePow(), &eventMux)
+	chainMan, err := NewChainManager(genesis, db, db, db, thePow(), &eventMux)
 	if err != nil {
 		t.Error("failed creating chainmanager:", err)
 		t.FailNow()
@@ -96,7 +96,7 @@ func printChain(bc *ChainManager) {
 func testChain(chainB types.Blocks, bman *BlockProcessor) (*big.Int, error) {
 	td := new(big.Int)
 	for _, block := range chainB {
-		_, err := bman.bc.processor.Process(block)
+		_, _, err := bman.bc.processor.Process(block)
 		if err != nil {
 			if IsKnownBlockErr(err) {
 				continue
@@ -367,7 +367,7 @@ func TestGetBlocksFromHash(t *testing.T) {
 
 type bproc struct{}
 
-func (bproc) Process(*types.Block) (state.Logs, error) { return nil, nil }
+func (bproc) Process(*types.Block) (state.Logs, types.Receipts, error) { return nil, nil, nil }
 
 func makeChainWithDiff(genesis *types.Block, d []int, seed byte) []*types.Block {
 	var chain []*types.Block
@@ -390,7 +390,7 @@ func makeChainWithDiff(genesis *types.Block, d []int, seed byte) []*types.Block
 
 func chm(genesis *types.Block, db common.Database) *ChainManager {
 	var eventMux event.TypeMux
-	bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: genesis, eventMux: &eventMux, pow: FakePow{}}
+	bc := &ChainManager{extraDb: db, blockDb: db, stateDb: db, genesisBlock: genesis, eventMux: &eventMux, pow: FakePow{}}
 	bc.cache, _ = lru.New(100)
 	bc.futureBlocks, _ = lru.New(100)
 	bc.processor = bproc{}
@@ -479,12 +479,12 @@ func TestGenesisMismatch(t *testing.T) {
 	db, _ := ethdb.NewMemDatabase()
 	var mux event.TypeMux
 	genesis := GenesisBlock(0, db)
-	_, err := NewChainManager(genesis, db, db, thePow(), &mux)
+	_, err := NewChainManager(genesis, db, db, db, thePow(), &mux)
 	if err != nil {
 		t.Error(err)
 	}
 	genesis = GenesisBlock(1, db)
-	_, err = NewChainManager(genesis, db, db, thePow(), &mux)
+	_, err = NewChainManager(genesis, db, db, db, thePow(), &mux)
 	if err == nil {
 		t.Error("expected genesis mismatch error")
 	}

+ 1 - 0
core/manager.go

@@ -14,5 +14,6 @@ type Backend interface {
 	TxPool() *TxPool
 	BlockDb() common.Database
 	StateDb() common.Database
+	ExtraDb() common.Database
 	EventMux() *event.TypeMux
 }

+ 51 - 0
core/transaction_util.go

@@ -0,0 +1,51 @@
+package core
+
+import (
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/logger"
+	"github.com/ethereum/go-ethereum/logger/glog"
+	"github.com/ethereum/go-ethereum/rlp"
+)
+
+func PutTransactions(db common.Database, block *types.Block, txs types.Transactions) {
+	for i, tx := range block.Transactions() {
+		rlpEnc, err := rlp.EncodeToBytes(tx)
+		if err != nil {
+			glog.V(logger.Debug).Infoln("Failed encoding tx", err)
+			return
+		}
+		db.Put(tx.Hash().Bytes(), rlpEnc)
+
+		var txExtra struct {
+			BlockHash  common.Hash
+			BlockIndex uint64
+			Index      uint64
+		}
+		txExtra.BlockHash = block.Hash()
+		txExtra.BlockIndex = block.NumberU64()
+		txExtra.Index = uint64(i)
+		rlpMeta, err := rlp.EncodeToBytes(txExtra)
+		if err != nil {
+			glog.V(logger.Debug).Infoln("Failed encoding tx meta data", err)
+			return
+		}
+		db.Put(append(tx.Hash().Bytes(), 0x0001), rlpMeta)
+	}
+}
+
+func PutReceipts(db common.Database, hash common.Hash, receipts types.Receipts) error {
+	storageReceipts := make([]*types.ReceiptForStorage, len(receipts))
+	for i, receipt := range receipts {
+		storageReceipts[i] = (*types.ReceiptForStorage)(receipt)
+	}
+
+	bytes, err := rlp.EncodeToBytes(storageReceipts)
+	if err != nil {
+		return err
+	}
+
+	db.Put(append(receiptsPre, hash[:]...), bytes)
+
+	return nil
+}

+ 1 - 1
core/types/common.go

@@ -10,7 +10,7 @@ import (
 )
 
 type BlockProcessor interface {
-	Process(*Block) (state.Logs, error)
+	Process(*Block) (state.Logs, Receipts, error)
 }
 
 const bloomLength = 256

+ 1 - 1
eth/backend.go

@@ -339,7 +339,7 @@ func New(config *Config) (*Ethereum, error) {
 
 	eth.pow = ethash.New()
 	genesis := core.GenesisBlock(uint64(config.GenesisNonce), stateDb)
-	eth.chainManager, err = core.NewChainManager(genesis, blockDb, stateDb, eth.pow, eth.EventMux())
+	eth.chainManager, err = core.NewChainManager(genesis, blockDb, stateDb, extraDb, eth.pow, eth.EventMux())
 	if err != nil {
 		return nil, err
 	}

+ 1 - 1
eth/protocol_test.go

@@ -165,7 +165,7 @@ func newProtocolManagerForTesting(txAdded chan<- []*types.Transaction) *Protocol
 	var (
 		em       = new(event.TypeMux)
 		db, _    = ethdb.NewMemDatabase()
-		chain, _ = core.NewChainManager(core.GenesisBlock(0, db), db, db, core.FakePow{}, em)
+		chain, _ = core.NewChainManager(core.GenesisBlock(0, db), db, db, db, core.FakePow{}, em)
 		txpool   = &fakeTxPool{added: txAdded}
 		pm       = NewProtocolManager(0, em, txpool, core.FakePow{}, chain)
 	)

+ 30 - 5
miner/worker.go

@@ -79,9 +79,10 @@ type worker struct {
 	quit   chan struct{}
 	pow    pow.PoW
 
-	eth   core.Backend
-	chain *core.ChainManager
-	proc  *core.BlockProcessor
+	eth     core.Backend
+	chain   *core.ChainManager
+	proc    *core.BlockProcessor
+	extraDb common.Database
 
 	coinbase common.Address
 	gasPrice *big.Int
@@ -105,6 +106,7 @@ func newWorker(coinbase common.Address, eth core.Backend) *worker {
 	worker := &worker{
 		eth:            eth,
 		mux:            eth.EventMux(),
+		extraDb:        eth.ExtraDb(),
 		recv:           make(chan *types.Block),
 		gasPrice:       new(big.Int),
 		chain:          eth.ChainManager(),
@@ -233,11 +235,28 @@ func (self *worker) wait() {
 				continue
 			}
 
-			_, err := self.chain.WriteBlock(block, false)
+			parent := self.chain.GetBlock(block.ParentHash())
+			if parent == nil {
+				glog.V(logger.Error).Infoln("Invalid block found during mining")
+				continue
+			}
+			if err := core.ValidateHeader(self.eth.BlockProcessor().Pow, block.Header(), parent, true); err != nil {
+				glog.V(logger.Error).Infoln("Invalid header on mined block:", err)
+				continue
+			}
+
+			stat, err := self.chain.WriteBlock(block, false)
 			if err != nil {
 				glog.V(logger.Error).Infoln("error writing block to chain", err)
 				continue
 			}
+			// check if canon block and write transactions
+			if stat == core.CanonStatTy {
+				// This puts transactions in a extra db for rpc
+				core.PutTransactions(self.extraDb, block, block.Transactions())
+				// store the receipts
+				core.PutReceipts(self.extraDb, block.Hash(), self.current.receipts)
+			}
 
 			// check staleness and display confirmation
 			var stale, confirm string
@@ -252,7 +271,13 @@ func (self *worker) wait() {
 			glog.V(logger.Info).Infof("🔨  Mined %sblock (#%v / %x). %s", stale, block.Number(), block.Hash().Bytes()[:4], confirm)
 
 			// broadcast before waiting for validation
-			go self.mux.Post(core.NewMinedBlockEvent{block})
+			go func(block *types.Block, logs state.Logs) {
+				self.mux.Post(core.NewMinedBlockEvent{block})
+				self.mux.Post(core.ChainEvent{block, block.Hash(), logs})
+				if stat == core.CanonStatTy {
+					self.mux.Post(core.ChainHeadEvent{block})
+				}
+			}(block, self.current.state.Logs())
 
 			self.commitNewWork()
 		}

+ 1 - 0
xeth/xeth.go

@@ -980,6 +980,7 @@ func (self *XEth) Transact(fromStr, toStr, nonceStr, valueStr, gasStr, gasPriceS
 	} else {
 		glog.V(logger.Info).Infof("Tx(%x) to: %x\n", tx.Hash(), tx.To())
 	}
+
 	return tx.Hash().Hex(), nil
 }