Procházet zdrojové kódy

进行节点P2P调式的一个过渡版本,添加大量注释 大量打印和参数修改。

410 před 2 roky
rodič
revize
efa97dd161
6 změnil soubory, kde provedl 68 přidání a 115 odebrání
  1. 2 81
      cmd/geth/main.go
  2. 1 1
      core/tx_pool.go
  3. 29 3
      eth/protocols/eth/peer.go
  4. 8 8
      p2p/dial.go
  5. 25 20
      p2p/discover/table.go
  6. 3 2
      p2p/enode/iter.go

+ 2 - 81
cmd/geth/main.go

@@ -19,21 +19,16 @@ package main
 
 import (
 	"fmt"
-	"github.com/ethereum/go-ethereum/arbitrage"
 	"os"
 	"sort"
 	"strconv"
-	"strings"
 	"time"
 
-	"github.com/ethereum/go-ethereum/accounts"
-	"github.com/ethereum/go-ethereum/accounts/keystore"
 	"github.com/ethereum/go-ethereum/cmd/utils"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/console/prompt"
 	"github.com/ethereum/go-ethereum/eth"
 	"github.com/ethereum/go-ethereum/eth/downloader"
-	"github.com/ethereum/go-ethereum/ethclient"
 	"github.com/ethereum/go-ethereum/internal/debug"
 	"github.com/ethereum/go-ethereum/internal/ethapi"
 	"github.com/ethereum/go-ethereum/internal/flags"
@@ -336,9 +331,6 @@ func geth(ctx *cli.Context) error {
 	stack, backend := makeFullNode(ctx)
 	defer stack.Close()
 
-	// 注入套利系统
-	arbitrage.RegisterHistoryArbitrage(stack, ctx, backend)
-
 	startNode(ctx, stack, backend)
 	stack.Wait()
 	return nil
@@ -353,53 +345,7 @@ func startNode(ctx *cli.Context, stack *node.Node, backend ethapi.Backend) {
 	// Start up the node itself
 	utils.StartNode(ctx, stack)
 
-	// Unlock any account specifically requested
-	unlockAccounts(ctx, stack)
-
-	// Register wallet event handlers to open and auto-derive wallets
-	events := make(chan accounts.WalletEvent, 16)
-	stack.AccountManager().Subscribe(events)
-
-	// Create a client to interact with local geth node.
-	rpcClient, err := stack.Attach()
-	if err != nil {
-		utils.Fatalf("Failed to attach to self: %v", err)
-	}
-	ethClient := ethclient.NewClient(rpcClient)
-
-	go func() {
-		// Open any wallets already attached
-		for _, wallet := range stack.AccountManager().Wallets() {
-			if err := wallet.Open(""); err != nil {
-				log.Warn("Failed to open wallet", "url", wallet.URL(), "err", err)
-			}
-		}
-		// Listen for wallet event till termination
-		for event := range events {
-			switch event.Kind {
-			case accounts.WalletArrived:
-				if err := event.Wallet.Open(""); err != nil {
-					log.Warn("New wallet appeared, failed to open", "url", event.Wallet.URL(), "err", err)
-				}
-			case accounts.WalletOpened:
-				status, _ := event.Wallet.Status()
-				log.Info("New wallet appeared", "url", event.Wallet.URL(), "status", status)
-
-				var derivationPaths []accounts.DerivationPath
-				if event.Wallet.URL().Scheme == "ledger" {
-					derivationPaths = append(derivationPaths, accounts.LegacyLedgerBaseDerivationPath)
-				}
-				derivationPaths = append(derivationPaths, accounts.DefaultBaseDerivationPath)
-
-				event.Wallet.SelfDerive(derivationPaths, ethClient)
-
-			case accounts.WalletDropped:
-				log.Info("Old wallet dropped", "url", event.Wallet.URL())
-				event.Wallet.Close()
-			}
-		}
-	}()
-
+	//不需要钱包了,手动签名
 	// Spawn a standalone goroutine for status synchronization monitoring,
 	// close the node when synchronization is complete if user required.
 	if ctx.GlobalBool(utils.ExitWhenSyncedFlag.Name) {
@@ -432,7 +378,7 @@ func startNode(ctx *cli.Context, stack *node.Node, backend ethapi.Backend) {
 		}
 		ethBackend, ok := backend.(*eth.EthAPIBackend)
 		if !ok {
-			utils.Fatalf("Ethereum service not running: %v", err)
+			utils.Fatalf("Ethereum service not running: %v")
 		}
 		// Set the gas price to the limits from the CLI and start mining
 		gasprice := utils.GlobalBig(ctx, utils.MinerGasPriceFlag.Name)
@@ -444,28 +390,3 @@ func startNode(ctx *cli.Context, stack *node.Node, backend ethapi.Backend) {
 		}
 	}
 }
-
-// unlockAccounts unlocks any account specifically requested.
-func unlockAccounts(ctx *cli.Context, stack *node.Node) {
-	var unlocks []string
-	inputs := strings.Split(ctx.GlobalString(utils.UnlockedAccountFlag.Name), ",")
-	for _, input := range inputs {
-		if trimmed := strings.TrimSpace(input); trimmed != "" {
-			unlocks = append(unlocks, trimmed)
-		}
-	}
-	// Short circuit if there is no account to unlock.
-	if len(unlocks) == 0 {
-		return
-	}
-	// If insecure account unlocking is not allowed if node's APIs are exposed to external.
-	// Print warning log to user and skip unlocking.
-	if !stack.Config().InsecureUnlockAllowed && stack.Config().ExtRPCEnabled() {
-		utils.Fatalf("Account unlock with HTTP access is forbidden!")
-	}
-	ks := stack.AccountManager().Backends(keystore.KeyStoreType)[0].(*keystore.KeyStore)
-	passwords := utils.MakePasswordList(ctx)
-	for i, account := range unlocks {
-		unlockAccount(ks, account, i, passwords)
-	}
-}

+ 1 - 1
core/tx_pool.go

@@ -846,7 +846,7 @@ func (pool *TxPool) AddLocalFast(tx *types.Transaction) error {
 	//这里进行tx的添加插播
 	pool.mu.Lock()
 	pool.all.Add(tx, true)
-	pool.queueTxEvent(tx)
+	pool.txFeed.Send(NewTxsEvent{[]*types.Transaction{tx}})
 	pool.mu.Unlock()
 	return nil
 }

+ 29 - 3
eth/protocols/eth/peer.go

@@ -17,12 +17,11 @@
 package eth
 
 import (
+	mapset "github.com/deckarep/golang-set"
 	"math/big"
 	"math/rand"
 	"sync"
 
-	mapset "github.com/deckarep/golang-set"
-
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/p2p"
@@ -141,6 +140,10 @@ func (p *Peer) ID() string {
 	return p.id
 }
 
+func (p *Peer) RW() p2p.MsgReadWriter {
+	return p.rw
+}
+
 // Version retrieves the peer's negoatiated `eth` protocol version.
 func (p *Peer) Version() uint {
 	return p.version
@@ -194,6 +197,10 @@ func (p *Peer) markTransaction(hash common.Hash) {
 	p.knownTxs.Add(hash)
 }
 
+func (p *Peer) MarkTransaction(hash common.Hash) {
+	p.knownTxs.Add(hash)
+}
+
 // SendTransactions sends transactions to the peer and includes the hashes
 // in its transaction hash set for future reference.
 //
@@ -211,12 +218,14 @@ func (p *Peer) SendTransactions(txs types.Transactions) error {
 	for _, tx := range txs {
 		p.knownTxs.Add(tx.Hash())
 	}
+
 	return p2p.Send(p.rw, TransactionsMsg, txs)
 }
 
 // AsyncSendTransactions queues a list of transactions (by hash) to eventually
 // propagate to a remote peer. The number of pending sends are capped (new ones
 // will force old sends to be dropped)
+// 0x2 >eth/protocols/eth/broadcast.go broadcastTransactions()
 func (p *Peer) AsyncSendTransactions(hashes []common.Hash) {
 	select {
 	case p.txBroadcast <- hashes:
@@ -242,6 +251,7 @@ func (p *Peer) AsyncSendTransactions(hashes []common.Hash) {
 // This method is a helper used by the async transaction announcer. Don't call it
 // directly as the queueing (memory) and transmission (bandwidth) costs should
 // not be managed directly.
+// 0x8
 func (p *Peer) sendPooledTransactionHashes(hashes []common.Hash) error {
 	// Mark all the transactions as known, but ensure we don't overflow our limits
 	for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
@@ -256,6 +266,7 @@ func (p *Peer) sendPooledTransactionHashes(hashes []common.Hash) error {
 // AsyncSendPooledTransactionHashes queues a list of transactions hashes to eventually
 // announce to a remote peer.  The number of pending sends are capped (new ones
 // will force old sends to be dropped)
+// 0x8 > eth/protocols/eth/broadcast.go announceTransactions
 func (p *Peer) AsyncSendPooledTransactionHashes(hashes []common.Hash) {
 	select {
 	case p.txAnnounce <- hashes:
@@ -280,6 +291,7 @@ func (p *Peer) AsyncSendPooledTransactionHashes(hashes []common.Hash) {
 //
 // Note, the method assumes the hashes are correct and correspond to the list of
 // transactions being sent.
+// 0xa eth65
 func (p *Peer) SendPooledTransactionsRLP(hashes []common.Hash, txs []rlp.RawValue) error {
 	// Mark all the transactions as known, but ensure we don't overflow our limits
 	for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
@@ -292,6 +304,7 @@ func (p *Peer) SendPooledTransactionsRLP(hashes []common.Hash, txs []rlp.RawValu
 }
 
 // ReplyPooledTransactionsRLP is the eth/66 version of SendPooledTransactionsRLP.
+// 0xa  eth66
 func (p *Peer) ReplyPooledTransactionsRLP(id uint64, hashes []common.Hash, txs []rlp.RawValue) error {
 	// Mark all the transactions as known, but ensure we don't overflow our limits
 	for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
@@ -309,6 +322,7 @@ func (p *Peer) ReplyPooledTransactionsRLP(id uint64, hashes []common.Hash, txs [
 
 // SendNewBlockHashes announces the availability of a number of blocks through
 // a hash notification.
+// 0x1
 func (p *Peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error {
 	// Mark all the block hashes as known, but ensure we don't overflow our limits
 	for p.knownBlocks.Cardinality() > max(0, maxKnownBlocks-len(hashes)) {
@@ -328,6 +342,7 @@ func (p *Peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error
 // AsyncSendNewBlockHash queues the availability of a block for propagation to a
 // remote peer. If the peer's broadcast queue is full, the event is silently
 // dropped.
+// 0x1 > broadcastBlocks > queuedBlockAnns > this.SendNewBlockHashes
 func (p *Peer) AsyncSendNewBlockHash(block *types.Block) {
 	select {
 	case p.queuedBlockAnns <- block:
@@ -342,6 +357,7 @@ func (p *Peer) AsyncSendNewBlockHash(block *types.Block) {
 }
 
 // SendNewBlock propagates an entire block to a remote peer.
+// 0x7
 func (p *Peer) SendNewBlock(block *types.Block, td *big.Int) error {
 	// Mark all the block hash as known, but ensure we don't overflow our limits
 	for p.knownBlocks.Cardinality() >= maxKnownBlocks {
@@ -356,6 +372,7 @@ func (p *Peer) SendNewBlock(block *types.Block, td *big.Int) error {
 
 // AsyncSendNewBlock queues an entire block for propagation to a remote peer. If
 // the peer's broadcast queue is full, the event is silently dropped.
+// 0x7  queuedBlocks > broadcastBlocks > queuedBlocks > this.SendNewBlock
 func (p *Peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
 	select {
 	case p.queuedBlocks <- &blockPropagation{block: block, td: td}:
@@ -370,11 +387,13 @@ func (p *Peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
 }
 
 // SendBlockHeaders sends a batch of block headers to the remote peer.
+// 0x4 eth65
 func (p *Peer) SendBlockHeaders(headers []*types.Header) error {
 	return p2p.Send(p.rw, BlockHeadersMsg, BlockHeadersPacket(headers))
 }
 
 // ReplyBlockHeaders is the eth/66 version of SendBlockHeaders.
+// 0x4 eth66
 func (p *Peer) ReplyBlockHeaders(id uint64, headers []*types.Header) error {
 	return p2p.Send(p.rw, BlockHeadersMsg, BlockHeadersPacket66{
 		RequestId:          id,
@@ -384,11 +403,13 @@ func (p *Peer) ReplyBlockHeaders(id uint64, headers []*types.Header) error {
 
 // SendBlockBodiesRLP sends a batch of block contents to the remote peer from
 // an already RLP encoded format.
+// 0x6 eth65
 func (p *Peer) SendBlockBodiesRLP(bodies []rlp.RawValue) error {
 	return p2p.Send(p.rw, BlockBodiesMsg, bodies) // Not packed into BlockBodiesPacket to avoid RLP decoding
 }
 
 // ReplyBlockBodiesRLP is the eth/66 version of SendBlockBodiesRLP.
+// 0x6 eth66
 func (p *Peer) ReplyBlockBodiesRLP(id uint64, bodies []rlp.RawValue) error {
 	// Not packed into BlockBodiesPacket to avoid RLP decoding
 	return p2p.Send(p.rw, BlockBodiesMsg, BlockBodiesRLPPacket66{
@@ -413,11 +434,13 @@ func (p *Peer) ReplyNodeData(id uint64, data [][]byte) error {
 
 // SendReceiptsRLP sends a batch of transaction receipts, corresponding to the
 // ones requested from an already RLP encoded format.
+// 0x10 eth65
 func (p *Peer) SendReceiptsRLP(receipts []rlp.RawValue) error {
 	return p2p.Send(p.rw, ReceiptsMsg, receipts) // Not packed into ReceiptsPacket to avoid RLP decoding
 }
 
 // ReplyReceiptsRLP is the eth/66 response to GetReceipts.
+// 0x10 eth66
 func (p *Peer) ReplyReceiptsRLP(id uint64, receipts []rlp.RawValue) error {
 	return p2p.Send(p.rw, ReceiptsMsg, ReceiptsRLPPacket66{
 		RequestId:         id,
@@ -427,6 +450,7 @@ func (p *Peer) ReplyReceiptsRLP(id uint64, receipts []rlp.RawValue) error {
 
 // RequestOneHeader is a wrapper around the header query functions to fetch a
 // single header. It is used solely by the fetcher.
+// 0x3 > get 0x4
 func (p *Peer) RequestOneHeader(hash common.Hash) error {
 	p.Log().Debug("Fetching single header", "hash", hash)
 	query := GetBlockHeadersPacket{
@@ -505,6 +529,7 @@ func (p *Peer) ExpectRequestHeadersByNumber(origin uint64, amount int, skip int,
 
 // RequestBodies fetches a batch of blocks' bodies corresponding to the hashes
 // specified.
+// 0x5  > get 0x6
 func (p *Peer) RequestBodies(hashes []common.Hash) error {
 	p.Log().Debug("Fetching batch of block bodies", "count", len(hashes))
 	if p.Version() >= ETH66 {
@@ -551,12 +576,13 @@ func (p *Peer) RequestReceipts(hashes []common.Hash) error {
 }
 
 // RequestTxs fetches a batch of transactions from a remote node.
+// 0x9 > 0x10
 func (p *Peer) RequestTxs(hashes []common.Hash) error {
 	p.Log().Debug("Fetching batch of transactions", "count", len(hashes))
 	if p.Version() >= ETH66 {
 		id := rand.Uint64()
 
-		requestTracker.Track(p.id, p.version, GetPooledTransactionsMsg, PooledTransactionsMsg, id)
+		//requestTracker.Track(p.id, p.version, GetPooledTransactionsMsg, PooledTransactionsMsg, id)
 		return p2p.Send(p.rw, GetPooledTransactionsMsg, &GetPooledTransactionsPacket66{
 			RequestId:                   id,
 			GetPooledTransactionsPacket: hashes,

+ 8 - 8
p2p/dial.go

@@ -42,7 +42,7 @@ const (
 
 	// Config for the "Looking for peers" message.
 	dialStatsLogInterval = 10 * time.Second // printed at most this often
-	dialStatsPeerLimit   = 3                // but not if more than this many dialed peers
+	dialStatsPeerLimit   = 60               // but not if more than this many dialed peers
 
 	// Endpoint resolution is throttled with bounded backoff.
 	initialResolveDelay = 60 * time.Second
@@ -85,13 +85,12 @@ var (
 // dialer creates outbound connections and submits them into Server.
 // Two types of peer connections can be created:
 //
-//  - static dials are pre-configured connections. The dialer attempts
-//    keep these nodes connected at all times.
-//
-//  - dynamic dials are created from node discovery results. The dialer
-//    continuously reads candidate nodes from its input iterator and attempts
-//    to create peer connections to nodes arriving through the iterator.
+//   - static dials are pre-configured connections. The dialer attempts
+//     keep these nodes connected at all times.
 //
+//   - dynamic dials are created from node discovery results. The dialer
+//     continuously reads candidate nodes from its input iterator and attempts
+//     to create peer connections to nodes arriving through the iterator.
 type dialScheduler struct {
 	dialConfig
 	setupFunc   dialSetupFunc
@@ -344,7 +343,8 @@ func (d *dialScheduler) logStats() {
 	if d.dialPeers < dialStatsPeerLimit && d.dialPeers < d.maxDialPeers {
 		d.log.Info("Looking for peers", "peercount", len(d.peers), "tried", d.doneSinceLastLog, "static", len(d.static))
 	}
-	d.doneSinceLastLog = 0
+	fmt.Println(time.Now().Format("2006-01-02 15:04:05.000000"), "Looking for peers", "peercount", len(d.peers), "tried", d.doneSinceLastLog, "static", len(d.static))
+	//d.doneSinceLastLog = 0
 	d.lastStatsLog = now
 }
 

+ 25 - 20
p2p/discover/table.go

@@ -40,26 +40,31 @@ import (
 )
 
 const (
-	alpha           = 3  // Kademlia concurrency factor
-	bucketSize      = 16 // Kademlia bucket size
-	maxReplacements = 10 // Size of per-bucket replacement list
-
-	// We keep buckets for the upper 1/15 of distances because
-	// it's very unlikely we'll ever encounter a node that's closer.
-	hashBits          = len(common.Hash{}) * 8
-	nBuckets          = hashBits / 15       // Number of buckets
-	bucketMinDistance = hashBits - nBuckets // Log distance of closest bucket
-
-	// IP address limits.
-	bucketIPLimit, bucketSubnet = 2, 24 // at most 2 addresses from the same /24
-	tableIPLimit, tableSubnet   = 10, 24
-
-	refreshInterval    = 30 * time.Minute
-	revalidateInterval = 10 * time.Second
-	copyNodesInterval  = 30 * time.Second
-	seedMinTableTime   = 5 * time.Minute
-	seedCount          = 30
-	seedMaxAge         = 5 * 24 * time.Hour
+	// 32, 256*256, 256*32, 32, 2, 12 24, 12 24 = 18000
+	// 32, 256*32, 256*32, 32, 2, 12 24, 12 24 = 9466
+	// 32, 256*32, 256*32, 32, 15, 12 24, 12 24 = 10136
+	// 32, 256*2, 256*2, 32, 15, 12 24, 12 24 = 2345
+	// 32, 256*1024, 256*32, 32, 15, 12 24, 12 24 = 21000
+
+	alpha           = 3        // Kademlia 并发因子,表示每一轮查找中并行发送的请求数量。
+	bucketSize      = 256 * 32 // Kademlia 桶的大小,表示每个桶中能够容纳的最大节点数。
+	maxReplacements = 256 * 16 // 每个桶中的替换列表的最大大小,用于存储替换节点。
+
+	// 我们保留上半部分 1/15 的距离的桶,因为几乎不可能遇到更接近的节点。
+	hashBits          = len(common.Hash{}) * 32
+	nBuckets          = hashBits / 15       // 桶的数量
+	bucketMinDistance = hashBits - nBuckets // 最近桶的距离的对数值
+
+	// IP 地址限制。
+	bucketIPLimit, bucketSubnet = 12, 24 // 每个桶中来自同一个 /24 子网的 IP 地址数量上限
+	tableIPLimit, tableSubnet   = 12, 24
+
+	refreshInterval    = 5 * time.Minute    // 刷新路由表的时间间隔
+	revalidateInterval = 10 * time.Second   // 重新验证节点的时间间隔
+	copyNodesInterval  = 30 * time.Second   // 复制节点的时间间隔
+	seedMinTableTime   = 5 * time.Minute    // 种子节点的最小保留时间
+	seedCount          = 10                 // 种子节点的数量
+	seedMaxAge         = 5 * 24 * time.Hour // 种子节点的最大存活时间
 )
 
 // Table is the 'node table', a Kademlia-like index of neighbor nodes. The table keeps

+ 3 - 2
p2p/enode/iter.go

@@ -17,10 +17,10 @@
 package enode
 
 import (
+	"fmt"
+	"github.com/ethereum/go-ethereum/common/gopool"
 	"sync"
 	"time"
-
-	"github.com/ethereum/go-ethereum/common/gopool"
 )
 
 // Iterator represents a sequence of nodes. The Next method moves to the next node in the
@@ -280,6 +280,7 @@ func (m *FairMix) deleteSource(s *mixSource) {
 func (m *FairMix) runSource(closed chan struct{}, s *mixSource) {
 	defer m.wg.Done()
 	defer close(s.next)
+	fmt.Println(time.Now().Format("2006-01-02 15:04:05.000000"), "p2p/server.go runSource")
 	for s.it.Next() {
 		n := s.it.Node()
 		select {