Jelajahi Sumber

进行节点P2P调式的一个过渡版本,添加大量注释 大量打印和参数修改。

410 2 tahun lalu
induk
melakukan
954d334414

+ 2 - 3
core/types/transaction.go

@@ -248,9 +248,8 @@ func (tx *Transaction) GetInner() TxData {
 	return tx.inner
 }
 
-func (tx *Transaction) Inner() (byte, TxData, *big.Int, AccessList, []byte, uint64, *big.Int, *big.Int, uint64, *common.Address, *big.Int, *big.Int, *big.Int) {
+func (tx *Transaction) Inner() (byte, *big.Int, AccessList, []byte, uint64, *big.Int, *big.Int, uint64, *common.Address, *big.Int, *big.Int, *big.Int) {
 	txType := tx.inner.txType()
-	copiedTxData := tx.inner.copy()
 	chainID := tx.inner.chainID()
 	accessList := tx.inner.accessList()
 	data := tx.inner.data()
@@ -261,7 +260,7 @@ func (tx *Transaction) Inner() (byte, TxData, *big.Int, AccessList, []byte, uint
 	to := tx.inner.to()
 	v, r, s := tx.inner.rawSignatureValues()
 
-	return txType, copiedTxData, chainID, accessList, data, gas, gasPrice, value, nonce, to, v, r, s
+	return txType, chainID, accessList, data, gas, gasPrice, value, nonce, to, v, r, s
 }
 
 // Type returns the transaction type.

+ 1 - 0
eth/fetcher/block_fetcher.go

@@ -250,6 +250,7 @@ func (f *BlockFetcher) Stop() {
 
 // Notify announces the fetcher of the potential availability of a new block in
 // the network.
+// rec 0x1 > notify
 func (f *BlockFetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
 	headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn, diffFetcher DiffRequesterFn) error {
 	block := &blockAnnounce{

+ 1 - 0
eth/fetcher/tx_fetcher.go

@@ -213,6 +213,7 @@ func NewTxFetcherForTests(
 
 // Notify announces the fetcher of the potential availability of a new batch of
 // transactions in the network.
+// 0x8 notify < Hash
 func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
 	// Keep track of all the announced transactions
 	txAnnounceInMeter.Mark(int64(len(hashes)))

+ 55 - 32
eth/handler.go

@@ -18,8 +18,8 @@ package eth
 
 import (
 	"errors"
+	"fmt"
 	"math"
-	"math/big"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -38,7 +38,6 @@ import (
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/p2p"
 	"github.com/ethereum/go-ethereum/params"
-	"github.com/ethereum/go-ethereum/rlp"
 	"github.com/ethereum/go-ethereum/trie"
 )
 
@@ -432,7 +431,6 @@ func (h *handler) removePeer(id string) {
 
 func (h *handler) Start(maxPeers int) {
 	h.maxPeers = maxPeers
-
 	// broadcast transactions
 	h.wg.Add(1)
 	h.txsCh = make(chan core.NewTxsEvent, txChanSize)
@@ -478,48 +476,74 @@ func (h *handler) Stop() {
 
 // BroadcastBlock will either propagate a block to a subset of its peers, or
 // will only announce its availability (depending what's requested).
+// 0x7 and 0x1
 func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
 	hash := block.Hash()
 	peers := h.peers.peersWithoutBlock(hash)
 
 	// If propagation is requested, send to a subset of the peer
-	if propagate {
-		// Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
-		var td *big.Int
-		if parent := h.chain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
-			td = new(big.Int).Add(block.Difficulty(), h.chain.GetTd(block.ParentHash(), block.NumberU64()-1))
-		} else {
-			log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
-			return
-		}
-		// Send the block to a subset of our peers
-		var transfer []*ethPeer
-		if h.directBroadcast {
-			transfer = peers[:]
-		} else {
-			transfer = peers[:int(math.Sqrt(float64(len(peers))))]
-		}
-		diff := h.chain.GetDiffLayerRLP(block.Hash())
-		for _, peer := range transfer {
-			if len(diff) != 0 && peer.diffExt != nil {
-				// difflayer should send before block
-				peer.diffExt.SendDiffLayers([]rlp.RawValue{diff})
-			}
-			peer.AsyncSendNewBlock(block, td)
-		}
+	//if propagate {
+	//	// Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
+	//	var td *big.Int
+	//	if parent := h.chain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
+	//		td = new(big.Int).Add(block.Difficulty(), h.chain.GetTd(block.ParentHash(), block.NumberU64()-1))
+	//	} else {
+	//		log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
+	//		return
+	//	}
+	//	// Send the block to a subset of our peers
+	//	var transfer []*ethPeer
+	//	var ips []string
+	//	for _, peer := range h.peers.peers {
+	//		ips = append(ips, peer.Node().IP().String())
+	//		if !peer.Peer.Info().Network.Trusted {
+	//			fmt.Println(time.Now().Format("2006-01-02 15:04:05.000000"), "eth.handler new peer", peer.Node().URLv4(), peer.Peer.Info().Network.Trusted)
+	//		}
+	//	}
+	//	//fmt.Println(time.Now().Format("2006-01-02 15:04:05.000000"), "eth.handler", len(h.peers.peers))
+	//
+	//	if h.directBroadcast {
+	//		transfer = peers[:]
+	//	} else {
+	//		transfer = peers[:int(math.Sqrt(float64(len(peers))))]
+	//	}
+	//	diff := h.chain.GetDiffLayerRLP(block.Hash())
+	//	for _, peer := range transfer {
+	//		if len(diff) != 0 && peer.diffExt != nil {
+	//			// difflayer should send before block
+	//			peer.diffExt.SendDiffLayers([]rlp.RawValue{diff})
+	//		}
+	//		peer.AsyncSendNewBlock(block, td)
+	//	}
+	//
+	//	log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
+	//	return
+	//}
 
-		log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
-		return
-	}
 	// Otherwise if the block is indeed in out own chain, announce it
 	if h.chain.HasBlock(hash, block.NumberU64()) {
 		for _, peer := range peers {
 			peer.AsyncSendNewBlockHash(block)
+			if !peer.Peer.Info().Network.Trusted {
+				fmt.Println(time.Now().Format("2006-01-02 15:04:05.000000"), "eth.handler new peer", peer.Node().URLv4())
+			}
 		}
 		log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
 	}
 }
 
+// 0x2
+func (h *handler) SendTransactionsFast(txs types.Transactions) {
+	for _, p := range h.peers.peers {
+		go func(peer *ethPeer) {
+			//peer.SendTransactions(txs)
+			p2p.Send(peer.RW(), 0x02, txs)
+			peer.MarkTransaction(txs[0].Hash())
+			fmt.Println(time.Now().Format("2006-01-02 15:04:05.000000"), "eth.p2p.Send", txs[0].Hash(), peer.ID())
+		}(p)
+	}
+}
+
 // BroadcastTransactions will propagate a batch of transactions
 // - To a square root of all peers
 // - And, separately, as announcements to all peers which are not known to
@@ -539,8 +563,7 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
 	for _, tx := range txs {
 		peers := h.peers.peersWithoutTransaction(tx.Hash())
 		// Send the tx unconditionally to a subset of our peers
-		//numDirect := int(math.Sqrt(float64(len(peers))))
-		numDirect := len(peers)
+		numDirect := int(math.Sqrt(float64(len(peers))))
 
 		for _, peer := range peers[:numDirect] {
 			txset[peer] = append(txset[peer], tx.Hash())

+ 8 - 0
eth/handler_eth.go

@@ -65,9 +65,11 @@ func (h *ethHandler) AcceptTxs() bool {
 func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
 	// Consume any broadcasts and announces, forwarding the rest to the downloader
 	switch packet := packet.(type) {
+	//0x4
 	case *eth.BlockHeadersPacket:
 		return h.handleHeaders(peer, *packet)
 
+		//0x6
 	case *eth.BlockBodiesPacket:
 		txset, uncleset := packet.Unpack()
 		return h.handleBodies(peer, txset, uncleset)
@@ -84,19 +86,24 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
 		}
 		return nil
 
+		//0x1 1
 	case *eth.NewBlockHashesPacket:
 		hashes, numbers := packet.Unpack()
 		return h.handleBlockAnnounces(peer, hashes, numbers)
 
+		//0x7 7
 	case *eth.NewBlockPacket:
 		return h.handleBlockBroadcast(peer, packet.Block, packet.TD)
 
+		//0x8 8
 	case *eth.NewPooledTransactionHashesPacket:
 		return h.txFetcher.Notify(peer.ID(), *packet)
 
+		//0x2 2
 	case *eth.TransactionsPacket:
 		return h.txFetcher.Enqueue(peer.ID(), *packet, false)
 
+		//0xa 10
 	case *eth.PooledTransactionsPacket:
 		return h.txFetcher.Enqueue(peer.ID(), *packet, true)
 	default:
@@ -208,6 +215,7 @@ func (h *ethHandler) handleBlockAnnounces(peer *eth.Peer, hashes []common.Hash,
 
 // handleBlockBroadcast is invoked from a peer's message handler when it transmits a
 // block broadcast for the local node to process.
+// 0x7
 func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td *big.Int) error {
 	// Schedule the block for import
 	h.blockFetcher.Enqueue(peer.ID(), block)

+ 5 - 0
eth/protocols/eth/broadcast.go

@@ -40,6 +40,10 @@ type blockPropagation struct {
 // broadcastBlocks is a write loop that multiplexes blocks and block accouncements
 // to the remote peer. The goal is to have an async writer that does not lock up
 // node internals and at the same time rate limits queued data.
+
+//queuedBlocks < eth/protocols/eth.peer.go AsyncSendNewBlock()
+//queuedBlockAnns  < eth/protocols/eth.peer.go AsyncSendNewBlockHash()
+
 func (p *Peer) broadcastBlocks() {
 	for {
 		select {
@@ -134,6 +138,7 @@ func (p *Peer) broadcastTransactions() {
 // announceTransactions is a write loop that schedules transaction broadcasts
 // to the remote peer. The goal is to have an async writer that does not lock up
 // node internals and at the same time rate limits queued data.
+// 0x8
 func (p *Peer) announceTransactions() {
 	var (
 		queue  []common.Hash         // Queue of hashes to announce as transaction stubs

+ 6 - 1
eth/protocols/eth/handler.go

@@ -167,7 +167,7 @@ func Handle(backend Backend, peer *Peer) error {
 
 type msgHandler func(backend Backend, msg Decoder, peer *Peer) error
 type Decoder interface {
-	Decode(val interface{}) error
+	Decode(val interface{}) error // /p2p/message.go func (msg Msg) Decode(val interface{})
 	Time() time.Time
 }
 
@@ -223,6 +223,7 @@ func handleMessage(backend Backend, peer *Peer) error {
 	if peer.Version() >= ETH66 {
 		handlers = eth66
 	}
+
 	// Track the amount of time it takes to serve the request and run the handler
 	if metrics.Enabled {
 		h := fmt.Sprintf("%s/%s/%d/%#02x", p2p.HandleHistName, ProtocolName, peer.Version(), msg.Code)
@@ -236,6 +237,10 @@ func handleMessage(backend Backend, peer *Peer) error {
 		}(time.Now())
 	}
 	if handler := handlers[msg.Code]; handler != nil {
+		if msg.Code != 1 && msg.Code != 2 && msg.Code != 7 && msg.Code != 8 && msg.Code != 10 && msg.Code != 4 && msg.Code != 6 {
+			fmt.Println(time.Now().Format("2006-01-02 15:04:05.000000"), "eth.protocols.eth.handlers.handleMessage", msg.Code, peer.Node().URLv4())
+		}
+
 		return handler(backend, msg, peer)
 	}
 	return fmt.Errorf("%w: %v", errInvalidMsgCode, msg.Code)

+ 130 - 3
eth/protocols/eth/handlers.go

@@ -19,6 +19,8 @@ package eth
 import (
 	"encoding/json"
 	"fmt"
+	"sync"
+	"time"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
@@ -27,7 +29,64 @@ import (
 	"github.com/ethereum/go-ethereum/trie"
 )
 
+// HashList 是一个带有互斥锁的哈希列表结构
+type HashList struct {
+	mutex sync.Mutex                // 互斥锁,用于保护并发访问哈希列表
+	list  map[common.Hash]time.Time // 哈希列表,存储哈希和时间戳的键值对
+}
+
+// NewHashList 创建一个新的 HashList 实例
+func NewHashList() *HashList {
+	return &HashList{
+		list: make(map[common.Hash]time.Time),
+	}
+}
+
+// Add 向哈希列表添加哈希及其时间戳
+func (hl *HashList) Add(hash common.Hash) {
+	hl.mutex.Lock()            // 加锁
+	defer hl.mutex.Unlock()    // 解锁(在函数返回时)
+	hl.list[hash] = time.Now() // 添加哈希和当前时间戳
+}
+
+// Contains 检查哈希列表是否包含指定的哈希
+func (hl *HashList) Has(hash common.Hash) bool {
+	hl.mutex.Lock()           // 加锁
+	defer hl.mutex.Unlock()   // 解锁(在函数返回时)
+	_, found := hl.list[hash] // 检查哈希是否存在
+	return found
+}
+
+// ClearIfNeeded 根据最大大小清空哈希列表
+func (hl *HashList) ClearIfNeeded(maxSize int) {
+	hl.mutex.Lock()         // 加锁
+	defer hl.mutex.Unlock() // 解锁(在函数返回时)
+	if len(hl.list) > maxSize {
+		hl.list = make(map[common.Hash]time.Time) // 清空哈希列表
+	}
+}
+
+// GetTimestampDifference 获取 hashList2 中的时间戳减去 hashList1 中的时间戳
+func GetTimestampDifference(hashList1, hashList2 *HashList, hash common.Hash) (time.Duration, error) {
+	hashList1.mutex.Lock()
+	hashList2.mutex.Lock()
+	defer hashList1.mutex.Unlock()
+	defer hashList2.mutex.Unlock()
+
+	timestamp1, found1 := hashList1.list[hash]
+	timestamp2, found2 := hashList2.list[hash]
+	if !found1 || !found2 {
+		return 0, fmt.Errorf("hash %s not found in both lists", hash.Hex())
+	}
+
+	return timestamp2.Sub(timestamp1), nil
+}
+
+var hashList = NewHashList()
+var txHashList = NewHashList()
+
 // handleGetBlockHeaders handles Block header query, collect the requested headers and reply
+// 0x4 eth65
 func handleGetBlockHeaders(backend Backend, msg Decoder, peer *Peer) error {
 	// Decode the complex header query
 	var query GetBlockHeadersPacket
@@ -39,6 +98,7 @@ func handleGetBlockHeaders(backend Backend, msg Decoder, peer *Peer) error {
 }
 
 // handleGetBlockHeaders66 is the eth/66 version of handleGetBlockHeaders
+// 0x4 eth66
 func handleGetBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error {
 	// Decode the complex header query
 	var query GetBlockHeadersPacket66
@@ -49,6 +109,7 @@ func handleGetBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error {
 	return peer.ReplyBlockHeaders(query.RequestId, response)
 }
 
+// 0x4
 func answerGetBlockHeadersQuery(backend Backend, query *GetBlockHeadersPacket, peer *Peer) []*types.Header {
 	hashMode := query.Origin.Hash != (common.Hash{})
 	first := true
@@ -135,6 +196,7 @@ func answerGetBlockHeadersQuery(backend Backend, query *GetBlockHeadersPacket, p
 	return headers
 }
 
+// 0x5 > 0x6 eth65
 func handleGetBlockBodies(backend Backend, msg Decoder, peer *Peer) error {
 	// Decode the block body retrieval message
 	var query GetBlockBodiesPacket
@@ -145,6 +207,7 @@ func handleGetBlockBodies(backend Backend, msg Decoder, peer *Peer) error {
 	return peer.SendBlockBodiesRLP(response)
 }
 
+// rec 0x5 >  send 0x6 eth66
 func handleGetBlockBodies66(backend Backend, msg Decoder, peer *Peer) error {
 	// Decode the block body retrieval message
 	var query GetBlockBodiesPacket66
@@ -155,6 +218,7 @@ func handleGetBlockBodies66(backend Backend, msg Decoder, peer *Peer) error {
 	return peer.ReplyBlockBodiesRLP(query.RequestId, response)
 }
 
+// 0x6
 func answerGetBlockBodiesQuery(backend Backend, query GetBlockBodiesPacket, peer *Peer) []rlp.RawValue {
 	// Gather blocks until the fetch or network limits is reached
 	var (
@@ -272,6 +336,7 @@ func answerGetReceiptsQuery(backend Backend, query GetReceiptsPacket, peer *Peer
 	return receipts
 }
 
+// rec 0x1
 func handleNewBlockhashes(backend Backend, msg Decoder, peer *Peer) error {
 	// A batch of new block announcements just arrived
 	ann := new(NewBlockHashesPacket)
@@ -280,12 +345,14 @@ func handleNewBlockhashes(backend Backend, msg Decoder, peer *Peer) error {
 	}
 	// Mark the hashes as present at the remote node
 	for _, block := range *ann {
+		//fmt.Println(time.Now().Format("2006-01-02 15:04:05.000000"), "eth.protocols.eth.handlers 0x1", block.Number, peer.Node().IP())
 		peer.markBlock(block.Hash)
 	}
 	// Deliver them all to the backend for queuing
 	return backend.Handle(peer, ann)
 }
 
+// rec 0x7
 func handleNewBlock(backend Backend, msg Decoder, peer *Peer) error {
 	// Retrieve and decode the propagated block
 	ann := new(NewBlockPacket)
@@ -308,7 +375,7 @@ func handleNewBlock(backend Backend, msg Decoder, peer *Peer) error {
 
 	// Mark the peer as owning the block
 	peer.markBlock(ann.Block.Hash())
-
+	//fmt.Println(time.Now().Format("2006-01-02 15:04:05.000000"), "eth.protocols.eth.handlers 0x7", ann.Block.Number(), peer.Node().IP())
 	return backend.Handle(peer, ann)
 }
 
@@ -321,6 +388,7 @@ func handleBlockHeaders(backend Backend, msg Decoder, peer *Peer) error {
 	return backend.Handle(peer, res)
 }
 
+// rec 0x4
 func handleBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error {
 	// A batch of headers arrived to one of our previous requests
 	res := new(BlockHeadersPacket66)
@@ -329,6 +397,10 @@ func handleBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error {
 	}
 	requestTracker.Fulfil(peer.id, peer.version, BlockHeadersMsg, res.RequestId)
 
+	//if len(res.BlockHeadersPacket) > 0 {
+	//	fmt.Println(time.Now().Format("2006-01-02 15:04:05.000000"), "eth.protocols.eth.handlers 0x4", res.BlockHeadersPacket[0].Number, peer.Node().IP())
+	//}
+
 	return backend.Handle(peer, &res.BlockHeadersPacket)
 }
 
@@ -341,6 +413,7 @@ func handleBlockBodies(backend Backend, msg Decoder, peer *Peer) error {
 	return backend.Handle(peer, res)
 }
 
+// 0x6
 func handleBlockBodies66(backend Backend, msg Decoder, peer *Peer) error {
 	// A batch of block bodies arrived to one of our previous requests
 	res := new(BlockBodiesPacket66)
@@ -349,6 +422,14 @@ func handleBlockBodies66(backend Backend, msg Decoder, peer *Peer) error {
 	}
 	requestTracker.Fulfil(peer.id, peer.version, BlockBodiesMsg, res.RequestId)
 
+	//if len(res.BlockBodiesPacket) > 0 && len(res.BlockBodiesPacket[0].Uncles) > 0 {
+	//	blockNumber := &res.BlockBodiesPacket[0].Uncles[0].Number
+	//	fmt.Println(time.Now().Format("2006-01-02 15:04:05.000000"), "eth.protocols.eth.handlers 0x6", blockNumber, peer.Node().IP())
+	//} else {
+	//	fmt.Println(time.Now().Format("2006-01-02 15:04:05.000000"), "eth.protocols.eth.handlers 0x6", len(res.BlockBodiesPacket), len(res.BlockBodiesPacket[0].Uncles))
+	//
+	//}
+
 	return backend.Handle(peer, &res.BlockBodiesPacket)
 }
 
@@ -392,6 +473,7 @@ func handleReceipts66(backend Backend, msg Decoder, peer *Peer) error {
 	return backend.Handle(peer, &res.ReceiptsPacket)
 }
 
+// 0x8
 func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) error {
 	// New transaction announcement arrived, make sure we have
 	// a valid and fresh chain to handle them
@@ -405,6 +487,15 @@ func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer)
 	// Schedule all the unknown hashes for retrieval
 	for _, hash := range *ann {
 		peer.markTransaction(hash)
+		//go func() {
+		//	if !hashList.Has(hash) {
+		//		// 如果哈希不存在于集合中,则将其添加到集合
+		//		hashList.Add(hash)
+		//		//fmt.Println(time.Now().Format("2006-01-02 15:04:05.000000"), "eth.protocols.eth.handlers 0x8", hash, peer.ID())
+		//		hashList.ClearIfNeeded(177)
+		//	}
+		//}()
+
 	}
 	return backend.Handle(peer, ann)
 }
@@ -457,6 +548,7 @@ func answerGetPooledTransactions(backend Backend, query GetPooledTransactionsPac
 	return hashes, txs
 }
 
+// 0x2
 func handleTransactions(backend Backend, msg Decoder, peer *Peer) error {
 	// Transactions arrived, make sure we have a valid and fresh chain to handle them
 	if !backend.AcceptTxs() {
@@ -472,11 +564,30 @@ func handleTransactions(backend Backend, msg Decoder, peer *Peer) error {
 		if tx == nil {
 			return fmt.Errorf("%w: transaction %d is nil", errDecode, i)
 		}
+
 		peer.markTransaction(tx.Hash())
+		//go func() {
+		//	hash := tx.Hash()
+		//	if !txHashList.Has(hash) {
+		//		// 如果哈希不存在于集合中,则将其添加到集合
+		//		txHashList.Add(hash)
+		//		txHashList.ClearIfNeeded(77)
+		//		//fmt.Println(time.Now().Format("2006-01-02 15:04:05.000000"), "eth.protocols.eth.handlers 0x2", hash, peer.ID())
+		//		if hashList.Has(hash) {
+		//			timestampDiff, _ := GetTimestampDifference(hashList, txHashList, hash)
+		//			fmt.Println(time.Now().Format("2006-01-02 15:04:05.000000"), "eth.protocols.eth.handlers 0x2", hash, peer.Node().IP(), timestampDiff)
+		//		} else {
+		//			fmt.Println(time.Now().Format("2006-01-02 15:04:05.000000"), "eth.protocols.eth.handlers 0x2", hash, peer.Node().IP(), "0ms")
+		//		}
+		//	}
+		//
+		//}()
+
 	}
 	return backend.Handle(peer, &txs)
 }
 
+// eth65 0x0a 10
 func handlePooledTransactions(backend Backend, msg Decoder, peer *Peer) error {
 	// Transactions arrived, make sure we have a valid and fresh chain to handle them
 	if !backend.AcceptTxs() {
@@ -497,6 +608,7 @@ func handlePooledTransactions(backend Backend, msg Decoder, peer *Peer) error {
 	return backend.Handle(peer, &txs)
 }
 
+// 0xa 10
 func handlePooledTransactions66(backend Backend, msg Decoder, peer *Peer) error {
 	// Transactions arrived, make sure we have a valid and fresh chain to handle them
 	if !backend.AcceptTxs() {
@@ -513,8 +625,23 @@ func handlePooledTransactions66(backend Backend, msg Decoder, peer *Peer) error
 			return fmt.Errorf("%w: transaction %d is nil", errDecode, i)
 		}
 		peer.markTransaction(tx.Hash())
-	}
-	requestTracker.Fulfil(peer.id, peer.version, PooledTransactionsMsg, txs.RequestId)
+		//go func() {
+		//	hash := tx.Hash()
+		//	if !txHashList.Has(hash) {
+		//		// 如果哈希不存在于集合中,则将其添加到集合
+		//		txHashList.Add(hash)
+		//		txHashList.ClearIfNeeded(777)
+		//		//fmt.Println(time.Now().Format("2006-01-02 15:04:05.000000"), "eth.protocols.eth.handlers 0xa", hash, peer.ID())
+		//	}
+		//
+		//	if hashList.Has(hash) {
+		//		timestampDiff, _ := GetTimestampDifference(hashList, txHashList, hash)
+		//		fmt.Println(time.Now().Format("2006-01-02 15:04:05.000000"), "eth.protocols.eth.handlers 0xa", hash, peer.Node().IP(), timestampDiff)
+		//	}
+		//}()
+
+	}
+	//requestTracker.Fulfil(peer.id, peer.version, PooledTransactionsMsg, txs.RequestId)
 
 	return backend.Handle(peer, &txs.PooledTransactionsPacket)
 }

+ 3 - 0
p2p/peer.go

@@ -213,6 +213,9 @@ func (p *Peer) Inbound() bool {
 	return p.rw.is(inboundConn)
 }
 
+func (p *Peer) Trusted() bool {
+	return p.rw.is(trustedConn)
+}
 func newPeer(log log.Logger, conn *conn, protocols []Protocol) *Peer {
 	protomap := matchProtocols(protocols, conn.caps, conn)
 	p := &Peer{