Browse Source

Merge pull request #908 from obscuren/develop

core, eth, miner: improved tx removal & fatal error on db sync err
Jeffrey Wilcke 10 năm trước cách đây
mục cha
commit
5176fbc6fa
9 tập tin đã thay đổi với 135 bổ sung97 xóa
  1. 0 6
      common/size.go
  2. 0 14
      common/size_test.go
  3. 5 1
      core/events.go
  4. 2 0
      core/manager.go
  5. 1 1
      core/transaction_pool.go
  6. 5 3
      eth/backend.go
  7. 4 0
      miner/miner.go
  8. 116 72
      miner/worker.go
  9. 2 0
      rpc/jeth.go

+ 0 - 6
common/size.go

@@ -44,12 +44,6 @@ func CurrencyToString(num *big.Int) string {
 	)
 
 	switch {
-	case num.Cmp(Douglas) >= 0:
-		fin = new(big.Int).Div(num, Douglas)
-		denom = "Douglas"
-	case num.Cmp(Einstein) >= 0:
-		fin = new(big.Int).Div(num, Einstein)
-		denom = "Einstein"
 	case num.Cmp(Ether) >= 0:
 		fin = new(big.Int).Div(num, Ether)
 		denom = "Ether"

+ 0 - 14
common/size_test.go

@@ -25,8 +25,6 @@ func (s *SizeSuite) TestStorageSizeString(c *checker.C) {
 }
 
 func (s *CommonSuite) TestCommon(c *checker.C) {
-	douglas := CurrencyToString(BigPow(10, 43))
-	einstein := CurrencyToString(BigPow(10, 22))
 	ether := CurrencyToString(BigPow(10, 19))
 	finney := CurrencyToString(BigPow(10, 16))
 	szabo := CurrencyToString(BigPow(10, 13))
@@ -35,8 +33,6 @@ func (s *CommonSuite) TestCommon(c *checker.C) {
 	ada := CurrencyToString(BigPow(10, 4))
 	wei := CurrencyToString(big.NewInt(10))
 
-	c.Assert(douglas, checker.Equals, "10 Douglas")
-	c.Assert(einstein, checker.Equals, "10 Einstein")
 	c.Assert(ether, checker.Equals, "10 Ether")
 	c.Assert(finney, checker.Equals, "10 Finney")
 	c.Assert(szabo, checker.Equals, "10 Szabo")
@@ -45,13 +41,3 @@ func (s *CommonSuite) TestCommon(c *checker.C) {
 	c.Assert(ada, checker.Equals, "10 Ada")
 	c.Assert(wei, checker.Equals, "10 Wei")
 }
-
-func (s *CommonSuite) TestLarge(c *checker.C) {
-	douglaslarge := CurrencyToString(BigPow(100000000, 43))
-	adalarge := CurrencyToString(BigPow(100000000, 4))
-	weilarge := CurrencyToString(big.NewInt(100000000))
-
-	c.Assert(douglaslarge, checker.Equals, "10000E298 Douglas")
-	c.Assert(adalarge, checker.Equals, "10000E7 Einstein")
-	c.Assert(weilarge, checker.Equals, "100 Babbage")
-}

+ 5 - 1
core/events.go

@@ -1,8 +1,10 @@
 package core
 
 import (
-	"github.com/ethereum/go-ethereum/core/types"
+	"math/big"
+
 	"github.com/ethereum/go-ethereum/core/state"
+	"github.com/ethereum/go-ethereum/core/types"
 )
 
 // TxPreEvent is posted when a transaction enters the transaction pool.
@@ -44,6 +46,8 @@ type ChainUncleEvent struct {
 
 type ChainHeadEvent struct{ Block *types.Block }
 
+type GasPriceChanged struct{ Price *big.Int }
+
 // Mining operation events
 type StartMining struct{}
 type TopMining struct{}

+ 2 - 0
core/manager.go

@@ -1,12 +1,14 @@
 package core
 
 import (
+	"github.com/ethereum/go-ethereum/accounts"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/p2p"
 )
 
 type Backend interface {
+	AccountManager() *accounts.Manager
 	BlockProcessor() *BlockProcessor
 	ChainManager() *ChainManager
 	TxPool() *TxPool

+ 1 - 1
core/transaction_pool.go

@@ -21,7 +21,7 @@ var (
 	ErrInvalidSender      = errors.New("Invalid sender")
 	ErrNonce              = errors.New("Nonce too low")
 	ErrBalance            = errors.New("Insufficient balance")
-	ErrNonExistentAccount = errors.New("Account does not exist")
+	ErrNonExistentAccount = errors.New("Account does not exist or account balance too low")
 	ErrInsufficientFunds  = errors.New("Insufficient funds for gas * price + value")
 	ErrIntrinsicGas       = errors.New("Intrinsic gas too low")
 	ErrGasLimit           = errors.New("Exceeds block gas limit")

+ 5 - 3
eth/backend.go

@@ -451,6 +451,8 @@ func (s *Ethereum) Start() error {
 	return nil
 }
 
+// sync databases every minute. If flushing fails we exit immediatly. The system
+// may not continue under any circumstances.
 func (s *Ethereum) syncDatabases() {
 	ticker := time.NewTicker(1 * time.Minute)
 done:
@@ -459,13 +461,13 @@ done:
 		case <-ticker.C:
 			// don't change the order of database flushes
 			if err := s.extraDb.Flush(); err != nil {
-				glog.V(logger.Error).Infof("error: flush extraDb: %v\n", err)
+				glog.Fatalf("fatal error: flush extraDb: %v\n", err)
 			}
 			if err := s.stateDb.Flush(); err != nil {
-				glog.V(logger.Error).Infof("error: flush stateDb: %v\n", err)
+				glog.Fatalf("fatal error: flush stateDb: %v\n", err)
 			}
 			if err := s.blockDb.Flush(); err != nil {
-				glog.V(logger.Error).Infof("error: flush blockDb: %v\n", err)
+				glog.Fatalf("fatal error: flush blockDb: %v\n", err)
 			}
 		case <-s.shutdownChan:
 			break done

+ 4 - 0
miner/miner.go

@@ -7,6 +7,8 @@ import (
 	"github.com/ethereum/go-ethereum/core"
 	"github.com/ethereum/go-ethereum/core/state"
 	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/logger"
+	"github.com/ethereum/go-ethereum/logger/glog"
 	"github.com/ethereum/go-ethereum/pow"
 )
 
@@ -47,6 +49,8 @@ func (m *Miner) SetGasPrice(price *big.Int) {
 }
 
 func (self *Miner) Start(coinbase common.Address) {
+	glog.V(logger.Info).Infoln("Starting mining operation")
+
 	self.mining = true
 	self.worker.coinbase = coinbase
 	self.worker.start()

+ 116 - 72
miner/worker.go

@@ -7,6 +7,7 @@ import (
 	"sync"
 	"sync/atomic"
 
+	"github.com/ethereum/go-ethereum/accounts"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core"
 	"github.com/ethereum/go-ethereum/core/state"
@@ -21,12 +22,18 @@ import (
 var jsonlogger = logger.NewJsonLogger()
 
 type environment struct {
-	totalUsedGas *big.Int
-	state        *state.StateDB
-	coinbase     *state.StateObject
-	block        *types.Block
-	family       *set.Set
-	uncles       *set.Set
+	totalUsedGas       *big.Int
+	state              *state.StateDB
+	coinbase           *state.StateObject
+	block              *types.Block
+	family             *set.Set
+	uncles             *set.Set
+	remove             *set.Set
+	tcount             int
+	ignoredTransactors *set.Set
+	lowGasTransactors  *set.Set
+	ownedAccounts      *set.Set
+	lowGasTxs          types.Transactions
 }
 
 func env(block *types.Block, eth core.Backend) *environment {
@@ -128,12 +135,13 @@ func (self *worker) start() {
 	self.mu.Lock()
 	defer self.mu.Unlock()
 
+	atomic.StoreInt32(&self.mining, 1)
+
 	// spin up agents
 	for _, agent := range self.agents {
 		agent.Start()
 	}
 
-	atomic.StoreInt32(&self.mining, 1)
 }
 
 func (self *worker) stop() {
@@ -174,8 +182,11 @@ out:
 				self.possibleUncles[ev.Block.Hash()] = ev.Block
 				self.uncleMu.Unlock()
 			case core.TxPreEvent:
+				// Apply transaction to the pending state if we're not mining
 				if atomic.LoadInt32(&self.mining) == 0 {
-					self.commitNewWork()
+					self.mu.Lock()
+					self.commitTransactions(types.Transactions{ev.Tx})
+					self.mu.Unlock()
 				}
 			}
 		case <-self.quit:
@@ -241,19 +252,33 @@ func (self *worker) makeCurrent() {
 	}
 	block.Header().Extra = self.extra
 
-	self.current = env(block, self.eth)
+	current := env(block, self.eth)
 	for _, ancestor := range self.chain.GetAncestors(block, 7) {
-		self.current.family.Add(ancestor.Hash())
+		current.family.Add(ancestor.Hash())
 	}
+	accounts, _ := self.eth.AccountManager().Accounts()
+	// Keep track of transactions which return errors so they can be removed
+	current.remove = set.New()
+	current.tcount = 0
+	current.ignoredTransactors = set.New()
+	current.lowGasTransactors = set.New()
+	current.ownedAccounts = accountAddressesSet(accounts)
 
-	parent := self.chain.GetBlock(self.current.block.ParentHash())
-	self.current.coinbase.SetGasPool(core.CalcGasLimit(parent))
+	parent := self.chain.GetBlock(current.block.ParentHash())
+	current.coinbase.SetGasPool(core.CalcGasLimit(parent))
+
+	self.current = current
 }
 
 func (w *worker) setGasPrice(p *big.Int) {
 	w.mu.Lock()
 	defer w.mu.Unlock()
-	w.gasPrice = p
+
+	// calculate the minimal gas price the miner accepts when sorting out transactions.
+	const pct = int64(90)
+	w.gasPrice = gasprice(p, pct)
+
+	w.mux.Post(core.GasPriceChanged{w.gasPrice})
 }
 
 func (self *worker) commitNewWork() {
@@ -265,68 +290,14 @@ func (self *worker) commitNewWork() {
 	defer self.currentMu.Unlock()
 
 	self.makeCurrent()
+	current := self.current
 
 	transactions := self.eth.TxPool().GetTransactions()
 	sort.Sort(types.TxByNonce{transactions})
 
-	// Keep track of transactions which return errors so they can be removed
-	var (
-		remove             = set.New()
-		tcount             = 0
-		ignoredTransactors = set.New()
-	)
-
-	const pct = int64(90)
-	// calculate the minimal gas price the miner accepts when sorting out transactions.
-	minprice := gasprice(self.gasPrice, pct)
-	for _, tx := range transactions {
-		// We can skip err. It has already been validated in the tx pool
-		from, _ := tx.From()
-
-		// check if it falls within margin
-		if tx.GasPrice().Cmp(minprice) < 0 {
-			// ignore the transaction and transactor. We ignore the transactor
-			// because nonce will fail after ignoring this transaction so there's
-			// no point
-			ignoredTransactors.Add(from)
-			glog.V(logger.Info).Infof("transaction(%x) below gas price (<%d%% ask price). All sequential txs from this address(%x) will fail\n", tx.Hash().Bytes()[:4], pct, from[:4])
-			continue
-		}
-
-		// Move on to the next transaction when the transactor is in ignored transactions set
-		// This may occur when a transaction hits the gas limit. When a gas limit is hit and
-		// the transaction is processed (that could potentially be included in the block) it
-		// will throw a nonce error because the previous transaction hasn't been processed.
-		// Therefor we need to ignore any transaction after the ignored one.
-		if ignoredTransactors.Has(from) {
-			continue
-		}
-
-		self.current.state.StartRecord(tx.Hash(), common.Hash{}, 0)
-
-		err := self.commitTransaction(tx)
-		switch {
-		case core.IsNonceErr(err) || core.IsInvalidTxErr(err):
-			// Remove invalid transactions
-			from, _ := tx.From()
-
-			self.chain.TxState().RemoveNonce(from, tx.Nonce())
-			remove.Add(tx.Hash())
-
-			if glog.V(logger.Detail) {
-				glog.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err)
-			}
-		case state.IsGasLimitErr(err):
-			from, _ := tx.From()
-			// ignore the transactor so no nonce errors will be thrown for this account
-			// next time the worker is run, they'll be picked up again.
-			ignoredTransactors.Add(from)
-
-			glog.V(logger.Detail).Infof("Gas limit reached for (%x) in this block. Continue to try smaller txs\n", from[:4])
-		default:
-			tcount++
-		}
-	}
+	// commit transactions for this run
+	self.commitTransactions(transactions)
+	self.eth.TxPool().RemoveTransactions(current.lowGasTxs)
 
 	var (
 		uncles    []*types.Header
@@ -352,7 +323,7 @@ func (self *worker) commitNewWork() {
 
 	// We only care about logging if we're actually mining
 	if atomic.LoadInt32(&self.mining) == 1 {
-		glog.V(logger.Info).Infof("commit new work on block %v with %d txs & %d uncles\n", self.current.block.Number(), tcount, len(uncles))
+		glog.V(logger.Info).Infof("commit new work on block %v with %d txs & %d uncles\n", current.block.Number(), current.tcount, len(uncles))
 	}
 
 	for _, hash := range badUncles {
@@ -392,6 +363,71 @@ func (self *worker) commitUncle(uncle *types.Header) error {
 	return nil
 }
 
+func (self *worker) commitTransactions(transactions types.Transactions) {
+	current := self.current
+
+	for _, tx := range transactions {
+		// We can skip err. It has already been validated in the tx pool
+		from, _ := tx.From()
+
+		// check if it falls within margin
+		if tx.GasPrice().Cmp(self.gasPrice) < 0 {
+			// ignore the transaction and transactor. We ignore the transactor
+			// because nonce will fail after ignoring this transaction so there's
+			// no point
+			current.lowGasTransactors.Add(from)
+
+			glog.V(logger.Info).Infof("transaction(%x) below gas price (tx=%v ask=%v). All sequential txs from this address(%x) will be ignored\n", tx.Hash().Bytes()[:4], common.CurrencyToString(tx.GasPrice()), common.CurrencyToString(self.gasPrice), from[:4])
+		}
+
+		// Continue with the next transaction if the transaction sender is included in
+		// the low gas tx set. This will also remove the tx and all sequential transaction
+		// from this transactor
+		if current.lowGasTransactors.Has(from) {
+			// add tx to the low gas set. This will be removed at the end of the run
+			// owned accounts are ignored
+			if !current.ownedAccounts.Has(from) {
+				current.lowGasTxs = append(current.lowGasTxs, tx)
+			}
+			continue
+		}
+
+		// Move on to the next transaction when the transactor is in ignored transactions set
+		// This may occur when a transaction hits the gas limit. When a gas limit is hit and
+		// the transaction is processed (that could potentially be included in the block) it
+		// will throw a nonce error because the previous transaction hasn't been processed.
+		// Therefor we need to ignore any transaction after the ignored one.
+		if current.ignoredTransactors.Has(from) {
+			continue
+		}
+
+		self.current.state.StartRecord(tx.Hash(), common.Hash{}, 0)
+
+		err := self.commitTransaction(tx)
+		switch {
+		case core.IsNonceErr(err) || core.IsInvalidTxErr(err):
+			// Remove invalid transactions
+			from, _ := tx.From()
+
+			self.chain.TxState().RemoveNonce(from, tx.Nonce())
+			current.remove.Add(tx.Hash())
+
+			if glog.V(logger.Detail) {
+				glog.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err)
+			}
+		case state.IsGasLimitErr(err):
+			from, _ := tx.From()
+			// ignore the transactor so no nonce errors will be thrown for this account
+			// next time the worker is run, they'll be picked up again.
+			current.ignoredTransactors.Add(from)
+
+			glog.V(logger.Detail).Infof("Gas limit reached for (%x) in this block. Continue to try smaller txs\n", from[:4])
+		default:
+			current.tcount++
+		}
+	}
+}
+
 func (self *worker) commitTransaction(tx *types.Transaction) error {
 	snap := self.current.state.Copy()
 	receipt, _, err := self.proc.ApplyTransaction(self.current.coinbase, self.current.state, self.current.block, tx, self.current.totalUsedGas, true)
@@ -423,3 +459,11 @@ func gasprice(price *big.Int, pct int64) *big.Int {
 	p.Mul(p, big.NewInt(pct))
 	return p
 }
+
+func accountAddressesSet(accounts []accounts.Account) *set.Set {
+	accountSet := set.New()
+	for _, account := range accounts {
+		accountSet.Add(common.BytesToAddress(account.Address))
+	}
+	return accountSet
+}

+ 2 - 0
rpc/jeth.go

@@ -2,6 +2,7 @@ package rpc
 
 import (
 	"encoding/json"
+	"fmt"
 
 	"github.com/ethereum/go-ethereum/jsre"
 	"github.com/robertkrimen/otto"
@@ -52,6 +53,7 @@ func (self *Jeth) Send(call otto.FunctionCall) (response otto.Value) {
 		var respif interface{}
 		err = self.ethApi.GetRequestReply(&req, &respif)
 		if err != nil {
+			fmt.Println("Error response:", err)
 			return self.err(call, -32603, err.Error(), req.Id)
 		}
 		call.Otto.Set("ret_jsonrpc", jsonrpcver)