瀏覽代碼

[R4R]reannounce local pending transactions (#570)

* reannouce local pending transactions

* add tests for tx_pool reannouce local pending transactions

* add tests for handler reannounce local pending transactions
KeefeL 4 年之前
父節點
當前提交
90fd01423a
共有 11 個文件被更改,包括 259 次插入15 次删除
  1. 1 0
      cmd/geth/main.go
  2. 1 0
      cmd/geth/usage.go
  3. 8 0
      cmd/utils/flags.go
  4. 3 0
      core/events.go
  5. 59 13
      core/tx_pool.go
  6. 41 0
      core/tx_pool_test.go
  7. 5 0
      core/types/transaction.go
  8. 51 0
      eth/handler.go
  9. 53 0
      eth/handler_eth_test.go
  10. 21 2
      eth/handler_test.go
  11. 16 0
      eth/peerset.go

+ 1 - 0
cmd/geth/main.go

@@ -90,6 +90,7 @@ var (
 		utils.TxPoolAccountQueueFlag,
 		utils.TxPoolGlobalQueueFlag,
 		utils.TxPoolLifetimeFlag,
+		utils.TxPoolReannounceTimeFlag,
 		utils.SyncModeFlag,
 		utils.ExitWhenSyncedFlag,
 		utils.GCModeFlag,

+ 1 - 0
cmd/geth/usage.go

@@ -108,6 +108,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
 			utils.TxPoolAccountQueueFlag,
 			utils.TxPoolGlobalQueueFlag,
 			utils.TxPoolLifetimeFlag,
+			utils.TxPoolReannounceTimeFlag,
 		},
 	},
 	{

+ 8 - 0
cmd/utils/flags.go

@@ -398,6 +398,11 @@ var (
 		Usage: "Maximum amount of time non-executable transaction are queued",
 		Value: ethconfig.Defaults.TxPool.Lifetime,
 	}
+	TxPoolReannounceTimeFlag = cli.DurationFlag{
+		Name:  "txpool.reannouncetime",
+		Usage: "Duration for announcing local pending transactions again (default = 10 years, minimum = 1 minute)",
+		Value: ethconfig.Defaults.TxPool.ReannounceTime,
+	}
 	// Performance tuning settings
 	CacheFlag = cli.IntFlag{
 		Name:  "cache",
@@ -1410,6 +1415,9 @@ func setTxPool(ctx *cli.Context, cfg *core.TxPoolConfig) {
 	if ctx.GlobalIsSet(TxPoolLifetimeFlag.Name) {
 		cfg.Lifetime = ctx.GlobalDuration(TxPoolLifetimeFlag.Name)
 	}
+	if ctx.GlobalIsSet(TxPoolReannounceTimeFlag.Name) {
+		cfg.ReannounceTime = ctx.GlobalDuration(TxPoolReannounceTimeFlag.Name)
+	}
 }
 
 func setEthash(ctx *cli.Context, cfg *ethconfig.Config) {

+ 3 - 0
core/events.go

@@ -24,6 +24,9 @@ import (
 // NewTxsEvent is posted when a batch of transactions enter the transaction pool.
 type NewTxsEvent struct{ Txs []*types.Transaction }
 
+// ReannoTxsEvent is posted when a batch of local pending transactions exceed a specified duration.
+type ReannoTxsEvent struct{ Txs []*types.Transaction }
+
 // NewMinedBlockEvent is posted when a block has been imported.
 type NewMinedBlockEvent struct{ Block *types.Block }
 

+ 59 - 13
core/tx_pool.go

@@ -49,6 +49,9 @@ const (
 	// more expensive to propagate; larger transactions also take more resources
 	// to validate whether they fit into the pool or not.
 	txMaxSize = 4 * txSlotSize // 128KB
+
+	// txReannoMaxNum is the maximum number of transactions a reannounce action can include.
+	txReannoMaxNum = 1024
 )
 
 var (
@@ -88,6 +91,7 @@ var (
 var (
 	evictionInterval    = time.Minute     // Time interval to check for evictable transactions
 	statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats
+	reannounceInterval  = time.Minute     // Time interval to check for reannounce transactions
 )
 
 var (
@@ -152,7 +156,8 @@ type TxPoolConfig struct {
 	AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
 	GlobalQueue  uint64 // Maximum number of non-executable transaction slots for all accounts
 
-	Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
+	Lifetime       time.Duration // Maximum amount of time non-executable transaction are queued
+	ReannounceTime time.Duration // Duration for announcing local pending transactions again
 }
 
 // DefaultTxPoolConfig contains the default configurations for the transaction
@@ -169,7 +174,8 @@ var DefaultTxPoolConfig = TxPoolConfig{
 	AccountQueue: 64,
 	GlobalQueue:  1024,
 
-	Lifetime: 3 * time.Hour,
+	Lifetime:       3 * time.Hour,
+	ReannounceTime: 10 * 365 * 24 * time.Hour,
 }
 
 // sanitize checks the provided user configurations and changes anything that's
@@ -208,6 +214,10 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
 		log.Warn("Sanitizing invalid txpool lifetime", "provided", conf.Lifetime, "updated", DefaultTxPoolConfig.Lifetime)
 		conf.Lifetime = DefaultTxPoolConfig.Lifetime
 	}
+	if conf.ReannounceTime < time.Minute {
+		log.Warn("Sanitizing invalid txpool reannounce time", "provided", conf.ReannounceTime, "updated", time.Minute)
+		conf.ReannounceTime = time.Minute
+	}
 	return conf
 }
 
@@ -219,14 +229,15 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
 // current state) and future transactions. Transactions move between those
 // two states over time as they are received and processed.
 type TxPool struct {
-	config      TxPoolConfig
-	chainconfig *params.ChainConfig
-	chain       blockChain
-	gasPrice    *big.Int
-	txFeed      event.Feed
-	scope       event.SubscriptionScope
-	signer      types.Signer
-	mu          sync.RWMutex
+	config       TxPoolConfig
+	chainconfig  *params.ChainConfig
+	chain        blockChain
+	gasPrice     *big.Int
+	txFeed       event.Feed
+	reannoTxFeed event.Feed // Event feed for announcing transactions again
+	scope        event.SubscriptionScope
+	signer       types.Signer
+	mu           sync.RWMutex
 
 	istanbul bool // Fork indicator whether we are in the istanbul stage.
 	eip2718  bool // Fork indicator whether we are using EIP-2718 type transactions.
@@ -323,14 +334,16 @@ func (pool *TxPool) loop() {
 	var (
 		prevPending, prevQueued, prevStales int
 		// Start the stats reporting and transaction eviction tickers
-		report  = time.NewTicker(statsReportInterval)
-		evict   = time.NewTicker(evictionInterval)
-		journal = time.NewTicker(pool.config.Rejournal)
+		report     = time.NewTicker(statsReportInterval)
+		evict      = time.NewTicker(evictionInterval)
+		reannounce = time.NewTicker(reannounceInterval)
+		journal    = time.NewTicker(pool.config.Rejournal)
 		// Track the previous head headers for transaction reorgs
 		head = pool.chain.CurrentBlock()
 	)
 	defer report.Stop()
 	defer evict.Stop()
+	defer reannounce.Stop()
 	defer journal.Stop()
 
 	for {
@@ -378,6 +391,33 @@ func (pool *TxPool) loop() {
 			}
 			pool.mu.Unlock()
 
+		case <-reannounce.C:
+			pool.mu.RLock()
+			reannoTxs := func() []*types.Transaction {
+				txs := make([]*types.Transaction, 0)
+				for addr, list := range pool.pending {
+					if !pool.locals.contains(addr) {
+						continue
+					}
+
+					for _, tx := range list.Flatten() {
+						// Default ReannounceTime is 10 years, won't announce by default.
+						if time.Since(tx.Time()) < pool.config.ReannounceTime {
+							break
+						}
+						txs = append(txs, tx)
+						if len(txs) >= txReannoMaxNum {
+							return txs
+						}
+					}
+				}
+				return txs
+			}()
+			pool.mu.RUnlock()
+			if len(reannoTxs) > 0 {
+				pool.reannoTxFeed.Send(ReannoTxsEvent{reannoTxs})
+			}
+
 		// Handle local transaction journal rotation
 		case <-journal.C:
 			if pool.journal != nil {
@@ -412,6 +452,12 @@ func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscripti
 	return pool.scope.Track(pool.txFeed.Subscribe(ch))
 }
 
+// SubscribeReannoTxsEvent registers a subscription of ReannoTxsEvent and
+// starts sending event to the given channel.
+func (pool *TxPool) SubscribeReannoTxsEvent(ch chan<- ReannoTxsEvent) event.Subscription {
+	return pool.scope.Track(pool.reannoTxFeed.Subscribe(ch))
+}
+
 // GasPrice returns the current gas price enforced by the transaction pool.
 func (pool *TxPool) GasPrice() *big.Int {
 	pool.mu.RLock()

+ 41 - 0
core/tx_pool_test.go

@@ -1933,6 +1933,47 @@ func TestTransactionSlotCount(t *testing.T) {
 	}
 }
 
+// Tests the local pending transaction announced again correctly.
+func TestTransactionPendingReannouce(t *testing.T) {
+	t.Parallel()
+
+	// Create the pool to test the limit enforcement with
+	statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
+	blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
+
+	config := testTxPoolConfig
+	// This ReannounceTime will be modified to time.Minute when creating tx_pool.
+	config.ReannounceTime = time.Second
+	reannounceInterval = time.Second
+
+	pool := NewTxPool(config, params.TestChainConfig, blockchain)
+	// Modify ReannounceTime to trigger quicker.
+	pool.config.ReannounceTime = time.Second
+	defer pool.Stop()
+
+	key, _ := crypto.GenerateKey()
+	account := crypto.PubkeyToAddress(key.PublicKey)
+	pool.currentState.AddBalance(account, big.NewInt(1000000))
+
+	events := make(chan ReannoTxsEvent, testTxPoolConfig.AccountQueue)
+	sub := pool.reannoTxFeed.Subscribe(events)
+	defer sub.Unsubscribe()
+
+	// Generate a batch of transactions and add to tx_pool locally.
+	txs := make([]*types.Transaction, 0, testTxPoolConfig.AccountQueue)
+	for i := uint64(0); i < testTxPoolConfig.AccountQueue; i++ {
+		txs = append(txs, transaction(i, 100000, key))
+	}
+	pool.AddLocals(txs)
+
+	select {
+	case ev := <-events:
+		t.Logf("received reannouce event, txs length: %d", len(ev.Txs))
+	case <-time.After(5 * time.Second):
+		t.Errorf("reannouce event not fired")
+	}
+}
+
 // Benchmarks the speed of validating the contents of the pending queue of the
 // transaction pool.
 func BenchmarkPendingDemotion100(b *testing.B)   { benchmarkPendingDemotion(b, 100) }

+ 5 - 0
core/types/transaction.go

@@ -82,6 +82,11 @@ type TxData interface {
 	setSignatureValues(chainID, v, r, s *big.Int)
 }
 
+// Time returns transaction's time
+func (tx *Transaction) Time() time.Time {
+	return tx.time
+}
+
 // EncodeRLP implements rlp.Encoder
 func (tx *Transaction) EncodeRLP(w io.Writer) error {
 	if tx.Type() == LegacyTxType {

+ 51 - 0
eth/handler.go

@@ -73,6 +73,10 @@ type txPool interface {
 	// SubscribeNewTxsEvent should return an event subscription of
 	// NewTxsEvent and send events to the given channel.
 	SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
+
+	// SubscribeReannoTxsEvent should return an event subscription of
+	// ReannoTxsEvent and send events to the given channel.
+	SubscribeReannoTxsEvent(chan<- core.ReannoTxsEvent) event.Subscription
 }
 
 // handlerConfig is the collection of initialization parameters to create a full
@@ -120,6 +124,8 @@ type handler struct {
 	eventMux      *event.TypeMux
 	txsCh         chan core.NewTxsEvent
 	txsSub        event.Subscription
+	reannoTxsCh   chan core.ReannoTxsEvent
+	reannoTxsSub  event.Subscription
 	minedBlockSub *event.TypeMuxSubscription
 
 	whitelist map[uint64]common.Hash
@@ -432,6 +438,12 @@ func (h *handler) Start(maxPeers int) {
 	h.txsSub = h.txpool.SubscribeNewTxsEvent(h.txsCh)
 	go h.txBroadcastLoop()
 
+	// announce local pending transactions again
+	h.wg.Add(1)
+	h.reannoTxsCh = make(chan core.ReannoTxsEvent, txChanSize)
+	h.reannoTxsSub = h.txpool.SubscribeReannoTxsEvent(h.reannoTxsCh)
+	go h.txReannounceLoop()
+
 	// broadcast mined blocks
 	h.wg.Add(1)
 	h.minedBlockSub = h.eventMux.Subscribe(core.NewMinedBlockEvent{})
@@ -445,6 +457,7 @@ func (h *handler) Start(maxPeers int) {
 
 func (h *handler) Stop() {
 	h.txsSub.Unsubscribe()        // quits txBroadcastLoop
+	h.reannoTxsSub.Unsubscribe()  // quits txReannounceLoop
 	h.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
 
 	// Quit chainSync and txsync64.
@@ -549,6 +562,31 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
 		"tx packs", directPeers, "broadcast txs", directCount)
 }
 
+// ReannounceTransactions will announce a batch of local pending transactions
+// to a square root of all peers.
+func (h *handler) ReannounceTransactions(txs types.Transactions) {
+	var (
+		annoCount int                                // Count of announcements made
+		annos     = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce
+	)
+
+	// Announce transactions hash to a batch of peers
+	peersCount := uint(math.Sqrt(float64(h.peers.len())))
+	peers := h.peers.headPeers(peersCount)
+	for _, tx := range txs {
+		for _, peer := range peers {
+			annos[peer] = append(annos[peer], tx.Hash())
+		}
+	}
+
+	for peer, hashes := range annos {
+		annoCount += len(hashes)
+		peer.AsyncSendPooledTransactionHashes(hashes)
+	}
+	log.Debug("Transaction reannounce", "txs", len(txs),
+		"announce packs", peersCount, "announced hashes", annoCount)
+}
+
 // minedBroadcastLoop sends mined blocks to connected peers.
 func (h *handler) minedBroadcastLoop() {
 	defer h.wg.Done()
@@ -573,3 +611,16 @@ func (h *handler) txBroadcastLoop() {
 		}
 	}
 }
+
+// txReannounceLoop announces local pending transactions to connected peers again.
+func (h *handler) txReannounceLoop() {
+	defer h.wg.Done()
+	for {
+		select {
+		case event := <-h.reannoTxsCh:
+			h.ReannounceTransactions(event.Txs)
+		case <-h.reannoTxsSub.Err():
+			return
+		}
+	}
+}

+ 53 - 0
eth/handler_eth_test.go

@@ -450,6 +450,59 @@ func testTransactionPropagation(t *testing.T, protocol uint) {
 	}
 }
 
+// Tests that local pending transactions get propagated to peers.
+func TestTransactionPendingReannounce(t *testing.T) {
+	t.Parallel()
+
+	// Create a source handler to announce transactions from and a sink handler
+	// to receive them.
+	source := newTestHandler()
+	defer source.close()
+
+	sink := newTestHandler()
+	defer sink.close()
+	sink.handler.acceptTxs = 1 // mark synced to accept transactions
+
+	sourcePipe, sinkPipe := p2p.MsgPipe()
+	defer sourcePipe.Close()
+	defer sinkPipe.Close()
+
+	sourcePeer := eth.NewPeer(eth.ETH65, p2p.NewPeer(enode.ID{0}, "", nil), sourcePipe, source.txpool)
+	sinkPeer := eth.NewPeer(eth.ETH65, p2p.NewPeer(enode.ID{0}, "", nil), sinkPipe, sink.txpool)
+	defer sourcePeer.Close()
+	defer sinkPeer.Close()
+
+	go source.handler.runEthPeer(sourcePeer, func(peer *eth.Peer) error {
+		return eth.Handle((*ethHandler)(source.handler), peer)
+	})
+	go sink.handler.runEthPeer(sinkPeer, func(peer *eth.Peer) error {
+		return eth.Handle((*ethHandler)(sink.handler), peer)
+	})
+
+	// Subscribe transaction pools
+	txCh := make(chan core.NewTxsEvent, 1024)
+	sub := sink.txpool.SubscribeNewTxsEvent(txCh)
+	defer sub.Unsubscribe()
+
+	txs := make([]*types.Transaction, 64)
+	for nonce := range txs {
+		tx := types.NewTransaction(uint64(nonce), common.Address{}, big.NewInt(0), 100000, big.NewInt(0), nil)
+		tx, _ = types.SignTx(tx, types.HomesteadSigner{}, testKey)
+
+		txs[nonce] = tx
+	}
+	source.txpool.ReannouceTransactions(txs)
+
+	for arrived := 0; arrived < len(txs); {
+		select {
+		case event := <-txCh:
+			arrived += len(event.Txs)
+		case <-time.NewTimer(time.Second).C:
+			t.Errorf("sink: transaction propagation timed out: have %d, want %d", arrived, len(txs))
+		}
+	}
+}
+
 // Tests that post eth protocol handshake, clients perform a mutual checkpoint
 // challenge to validate each other's chains. Hash mismatches, or missing ones
 // during a fast sync should lead to the peer getting dropped.

+ 21 - 2
eth/handler_test.go

@@ -48,8 +48,9 @@ var (
 type testTxPool struct {
 	pool map[common.Hash]*types.Transaction // Hash map of collected transactions
 
-	txFeed event.Feed   // Notification feed to allow waiting for inclusion
-	lock   sync.RWMutex // Protects the transaction pool
+	txFeed       event.Feed   // Notification feed to allow waiting for inclusion
+	reannoTxFeed event.Feed   // Notification feed to trigger reannouce
+	lock         sync.RWMutex // Protects the transaction pool
 }
 
 // newTestTxPool creates a mock transaction pool.
@@ -90,6 +91,18 @@ func (p *testTxPool) AddRemotes(txs []*types.Transaction) []error {
 	return make([]error, len(txs))
 }
 
+// ReannouceTransactions announce the transactions to some peers.
+func (p *testTxPool) ReannouceTransactions(txs []*types.Transaction) []error {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	for _, tx := range txs {
+		p.pool[tx.Hash()] = tx
+	}
+	p.reannoTxFeed.Send(core.ReannoTxsEvent{Txs: txs})
+	return make([]error, len(txs))
+}
+
 // Pending returns all the transactions known to the pool
 func (p *testTxPool) Pending() (map[common.Address]types.Transactions, error) {
 	p.lock.RLock()
@@ -112,6 +125,12 @@ func (p *testTxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subs
 	return p.txFeed.Subscribe(ch)
 }
 
+// SubscribeReannoTxsEvent should return an event subscription of ReannoTxsEvent and
+// send events to the given channel.
+func (p *testTxPool) SubscribeReannoTxsEvent(ch chan<- core.ReannoTxsEvent) event.Subscription {
+	return p.reannoTxFeed.Subscribe(ch)
+}
+
 // testHandler is a live implementation of the Ethereum protocol handler, just
 // preinitialized with some sane testing defaults and the transaction pool mocked
 // out.

+ 16 - 0
eth/peerset.go

@@ -266,6 +266,22 @@ func (ps *peerSet) peer(id string) *ethPeer {
 	return ps.peers[id]
 }
 
+// headPeers retrieves a specified number list of peers.
+func (ps *peerSet) headPeers(num uint) []*ethPeer {
+	ps.lock.RLock()
+	defer ps.lock.RUnlock()
+
+	if num > uint(len(ps.peers)) {
+		num = uint(len(ps.peers))
+	}
+
+	list := make([]*ethPeer, 0, num)
+	for _, p := range ps.peers {
+		list = append(list, p)
+	}
+	return list
+}
+
 // peersWithoutBlock retrieves a list of peers that do not have a given block in
 // their set of known hashes so it might be propagated to them.
 func (ps *peerSet) peersWithoutBlock(hash common.Hash) []*ethPeer {