Forráskód Böngészése

core: compute less transaction hashes in TxPool

Felix Lange 10 éve
szülő
commit
08befff8f1
2 módosított fájl, 82 hozzáadás és 82 törlés
  1. 72 72
      core/transaction_pool.go
  2. 10 10
      core/transaction_pool_test.go

+ 72 - 72
core/transaction_pool.go

@@ -61,7 +61,7 @@ type TxPool struct {
 	txs           map[common.Hash]*types.Transaction
 	invalidHashes *set.Set
 
-	queue map[common.Address]types.Transactions
+	queue map[common.Address]map[common.Hash]*types.Transaction
 
 	subscribers []chan TxMsg
 
@@ -71,7 +71,7 @@ type TxPool struct {
 func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
 	txPool := &TxPool{
 		txs:           make(map[common.Hash]*types.Transaction),
-		queue:         make(map[common.Address]types.Transactions),
+		queue:         make(map[common.Address]map[common.Hash]*types.Transaction),
 		queueChan:     make(chan *types.Transaction, txPoolQueueSize),
 		quit:          make(chan bool),
 		eventMux:      eventMux,
@@ -157,22 +157,20 @@ func (self *TxPool) add(tx *types.Transaction) error {
 	if err != nil {
 		return err
 	}
-
-	self.queueTx(tx)
-
-	var toname string
-	if to := tx.To(); to != nil {
-		toname = common.Bytes2Hex(to[:4])
-	} else {
-		toname = "[NEW_CONTRACT]"
-	}
-	// we can ignore the error here because From is
-	// verified in ValidateTransaction.
-	f, _ := tx.From()
-	from := common.Bytes2Hex(f[:4])
+	self.queueTx(hash, tx)
 
 	if glog.V(logger.Debug) {
-		glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash())
+		var toname string
+		if to := tx.To(); to != nil {
+			toname = common.Bytes2Hex(to[:4])
+		} else {
+			toname = "[NEW_CONTRACT]"
+		}
+		// we can ignore the error here because From is
+		// verified in ValidateTransaction.
+		f, _ := tx.From()
+		from := common.Bytes2Hex(f[:4])
+		glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash)
 	}
 
 	return nil
@@ -211,16 +209,12 @@ func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
 	if tx, ok := tp.txs[hash]; ok {
 		return tx
 	}
-
 	// check queue
 	for _, txs := range tp.queue {
-		for _, tx := range txs {
-			if tx.Hash() == hash {
-				return tx
-			}
+		if tx, ok := txs[hash]; ok {
+			return tx
 		}
 	}
-
 	return nil
 }
 
@@ -234,26 +228,26 @@ func (self *TxPool) GetTransactions() (txs types.Transactions) {
 		txs[i] = tx
 		i++
 	}
-
-	return
+	return txs
 }
 
 func (self *TxPool) GetQueuedTransactions() types.Transactions {
 	self.mu.RLock()
 	defer self.mu.RUnlock()
 
-	var txs types.Transactions
-	for _, ts := range self.queue {
-		txs = append(txs, ts...)
+	var ret types.Transactions
+	for _, txs := range self.queue {
+		for _, tx := range txs {
+			ret = append(ret, tx)
+		}
 	}
-
-	return txs
+	sort.Sort(types.TxByNonce{ret})
+	return ret
 }
 
 func (self *TxPool) RemoveTransactions(txs types.Transactions) {
 	self.mu.Lock()
 	defer self.mu.Unlock()
-
 	for _, tx := range txs {
 		self.removeTx(tx.Hash())
 	}
@@ -270,14 +264,17 @@ func (pool *TxPool) Stop() {
 	glog.V(logger.Info).Infoln("TX Pool stopped")
 }
 
-func (self *TxPool) queueTx(tx *types.Transaction) {
+func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
 	from, _ := tx.From() // already validated
-	self.queue[from] = append(self.queue[from], tx)
+	if self.queue[from] == nil {
+		self.queue[from] = make(map[common.Hash]*types.Transaction)
+	}
+	self.queue[from][hash] = tx
 }
 
-func (pool *TxPool) addTx(tx *types.Transaction) {
-	if _, ok := pool.txs[tx.Hash()]; !ok {
-		pool.txs[tx.Hash()] = tx
+func (pool *TxPool) addTx(hash common.Hash, tx *types.Transaction) {
+	if _, ok := pool.txs[hash]; !ok {
+		pool.txs[hash] = tx
 		// Notify the subscribers. This event is posted in a goroutine
 		// because it's possible that somewhere during the post "Remove transaction"
 		// gets called which will then wait for the global tx pool lock and deadlock.
@@ -291,36 +288,33 @@ func (pool *TxPool) checkQueue() {
 	defer pool.mu.Unlock()
 
 	statedb := pool.currentState()
+	var addq txQueue
 	for address, txs := range pool.queue {
-		sort.Sort(types.TxByNonce{txs})
-
-		var (
-			nonce = statedb.GetNonce(address)
-			start int
-		)
-		// Clean up the transactions first and determine the start of the nonces
-		for _, tx := range txs {
-			if tx.Nonce() >= nonce {
-				break
+		curnonce := statedb.GetNonce(address)
+		addq := addq[:0]
+		for hash, tx := range txs {
+			if tx.AccountNonce < curnonce {
+				// Drop queued transactions whose nonce is lower than
+				// the account nonce because they have been processed.
+				delete(txs, hash)
+			} else {
+				// Collect the remaining transactions for the next pass.
+				addq = append(addq, txQueueEntry{hash, tx})
 			}
-			start++
 		}
-		pool.queue[address] = txs[start:]
-
-		// expected nonce
-		enonce := nonce
-		for _, tx := range pool.queue[address] {
-			// If the expected nonce does not match up with the next one
-			// (i.e. a nonce gap), we stop the loop
-			if enonce != tx.Nonce() {
+		// Find the next consecutive nonce range starting at the
+		// current account nonce.
+		sort.Sort(addq)
+		for _, e := range addq {
+			if e.AccountNonce != curnonce {
 				break
 			}
-			enonce++
-
-			pool.addTx(tx)
+			curnonce++
+			delete(txs, e.hash)
+			pool.addTx(e.hash, e.Transaction)
 		}
-		// delete the entire queue entry if it's empty. There's no need to keep it
-		if len(pool.queue[address]) == 0 {
+		// Delete the entire queue entry if it became empty.
+		if len(txs) == 0 {
 			delete(pool.queue, address)
 		}
 	}
@@ -329,20 +323,16 @@ func (pool *TxPool) checkQueue() {
 func (pool *TxPool) removeTx(hash common.Hash) {
 	// delete from pending pool
 	delete(pool.txs, hash)
-
 	// delete from queue
-out:
 	for address, txs := range pool.queue {
-		for i, tx := range txs {
-			if tx.Hash() == hash {
-				if len(txs) == 1 {
-					// if only one tx, remove entire address entry
-					delete(pool.queue, address)
-				} else {
-					pool.queue[address][len(txs)-1], pool.queue[address] = nil, append(txs[:i], txs[i+1:]...)
-				}
-				break out
+		if _, ok := txs[hash]; ok {
+			if len(txs) == 1 {
+				// if only one tx, remove entire address entry.
+				delete(pool.queue, address)
+			} else {
+				delete(txs, hash)
 			}
+			break
 		}
 	}
 }
@@ -356,8 +346,18 @@ func (pool *TxPool) validatePool() {
 			if glog.V(logger.Info) {
 				glog.Infof("removed tx (%x) from pool: %v\n", hash[:4], err)
 			}
-
-			pool.removeTx(hash)
+			delete(pool.txs, hash)
 		}
 	}
 }
+
+type txQueue []txQueueEntry
+
+type txQueueEntry struct {
+	hash common.Hash
+	*types.Transaction
+}
+
+func (q txQueue) Len() int           { return len(q) }
+func (q txQueue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
+func (q txQueue) Less(i, j int) bool { return q[i].AccountNonce < q[j].AccountNonce }

+ 10 - 10
core/transaction_pool_test.go

@@ -68,7 +68,7 @@ func TestTransactionQueue(t *testing.T) {
 	tx.SignECDSA(key)
 	from, _ := tx.From()
 	pool.currentState().AddBalance(from, big.NewInt(1))
-	pool.queueTx(tx)
+	pool.queueTx(tx.Hash(), tx)
 
 	pool.checkQueue()
 	if len(pool.txs) != 1 {
@@ -80,7 +80,7 @@ func TestTransactionQueue(t *testing.T) {
 	from, _ = tx.From()
 	pool.currentState().SetNonce(from, 10)
 	tx.SetNonce(1)
-	pool.queueTx(tx)
+	pool.queueTx(tx.Hash(), tx)
 	pool.checkQueue()
 	if _, ok := pool.txs[tx.Hash()]; ok {
 		t.Error("expected transaction to be in tx pool")
@@ -97,18 +97,18 @@ func TestTransactionQueue(t *testing.T) {
 	tx1.SignECDSA(key)
 	tx2.SignECDSA(key)
 	tx3.SignECDSA(key)
-	pool.queueTx(tx1)
-	pool.queueTx(tx2)
-	pool.queueTx(tx3)
+	pool.queueTx(tx1.Hash(), tx1)
+	pool.queueTx(tx2.Hash(), tx2)
+	pool.queueTx(tx3.Hash(), tx3)
 	from, _ = tx1.From()
+
 	pool.checkQueue()
 
 	if len(pool.txs) != 1 {
 		t.Error("expected tx pool to be 1 =")
 	}
-
-	if len(pool.queue[from]) != 3 {
-		t.Error("expected transaction queue to be empty. is", len(pool.queue[from]))
+	if len(pool.queue[from]) != 2 {
+		t.Error("expected len(queue) == 2, got", len(pool.queue[from]))
 	}
 }
 
@@ -118,8 +118,8 @@ func TestRemoveTx(t *testing.T) {
 	tx.SignECDSA(key)
 	from, _ := tx.From()
 	pool.currentState().AddBalance(from, big.NewInt(1))
-	pool.queueTx(tx)
-	pool.addTx(tx)
+	pool.queueTx(tx.Hash(), tx)
+	pool.addTx(tx.Hash(), tx)
 	if len(pool.queue) != 1 {
 		t.Error("expected queue to be 1, got", len(pool.queue))
 	}