Browse Source

add directbroadcast flag (#99)

zjubfd 4 years ago
parent
commit
7f7d1bb027
10 changed files with 47 additions and 24 deletions
  1. 1 0
      cmd/geth/main.go
  2. 1 0
      cmd/geth/usage.go
  3. 12 2
      cmd/utils/flags.go
  4. 1 1
      eth/backend.go
  5. 3 2
      eth/config.go
  6. 20 13
      eth/handler.go
  7. 3 3
      eth/handler_test.go
  8. 1 1
      eth/helper_test.go
  9. 2 2
      eth/protocol_test.go
  10. 3 0
      node/config.go

+ 1 - 0
cmd/geth/main.go

@@ -68,6 +68,7 @@ var (
 		utils.KeyStoreDirFlag,
 		utils.ExternalSignerFlag,
 		utils.NoUSBFlag,
+		utils.DirectBroadcastFlag,
 		utils.SmartCardDaemonPathFlag,
 		utils.OverrideIstanbulFlag,
 		utils.OverrideMuirGlacierFlag,

+ 1 - 0
cmd/geth/usage.go

@@ -70,6 +70,7 @@ var AppHelpFlagGroups = []flagGroup{
 			utils.AncientFlag,
 			utils.KeyStoreDirFlag,
 			utils.NoUSBFlag,
+			utils.DirectBroadcastFlag,
 			utils.SmartCardDaemonPathFlag,
 			utils.NetworkIdFlag,
 			utils.GoerliFlag,

+ 12 - 2
cmd/utils/flags.go

@@ -143,6 +143,10 @@ var (
 		Usage: "Data directory for the databases and keystore",
 		Value: DirectoryString(node.DefaultDataDir()),
 	}
+	DirectBroadcastFlag = cli.BoolFlag{
+		Name:  "directbroadcast",
+		Usage: "Enable directly broadcast mined block to all peers",
+	}
 	AncientFlag = DirectoryFlag{
 		Name:  "datadir.ancient",
 		Usage: "Data directory for ancient chain segments (default = inside chaindata)",
@@ -787,13 +791,13 @@ var (
 		Value: "",
 	}
 
-	InitNetworkIps= cli.StringFlag{
+	InitNetworkIps = cli.StringFlag{
 		Name:  "init.ips",
 		Usage: "the ips of each node in the network, example '192.168.0.1,192.168.0.2'",
 		Value: "",
 	}
 
-	InitNetworkPort= cli.IntFlag{
+	InitNetworkPort = cli.IntFlag{
 		Name:  "init.p2p-port",
 		Usage: "the p2p port of the nodes in the network",
 		Value: 30311,
@@ -1243,6 +1247,9 @@ func SetNodeConfig(ctx *cli.Context, cfg *node.Config) {
 	if ctx.GlobalIsSet(NoUSBFlag.Name) {
 		cfg.NoUSB = ctx.GlobalBool(NoUSBFlag.Name)
 	}
+	if ctx.GlobalIsSet(DirectBroadcastFlag.Name) {
+		cfg.DirectBroadcast = ctx.GlobalBool(DirectBroadcastFlag.Name)
+	}
 	if ctx.GlobalIsSet(InsecureUnlockAllowedFlag.Name) {
 		cfg.InsecureUnlockAllowed = ctx.GlobalBool(InsecureUnlockAllowedFlag.Name)
 	}
@@ -1519,6 +1526,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
 	if ctx.GlobalIsSet(GCModeFlag.Name) {
 		cfg.NoPruning = ctx.GlobalString(GCModeFlag.Name) == "archive"
 	}
+	if ctx.GlobalIsSet(DirectBroadcastFlag.Name) {
+		cfg.DirectBroadcast = ctx.GlobalBool(DirectBroadcastFlag.Name)
+	}
 	if ctx.GlobalIsSet(CacheNoPrefetchFlag.Name) {
 		cfg.NoPrefetch = ctx.GlobalBool(CacheNoPrefetchFlag.Name)
 	}

+ 1 - 1
eth/backend.go

@@ -213,7 +213,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
 	if checkpoint == nil {
 		checkpoint = params.TrustedCheckpoints[genesisHash]
 	}
-	if eth.protocolManager, err = NewProtocolManager(chainConfig, checkpoint, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb, cacheLimit, config.Whitelist); err != nil {
+	if eth.protocolManager, err = NewProtocolManager(chainConfig, checkpoint, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb, cacheLimit, config.Whitelist, config.DirectBroadcast); err != nil {
 		return nil, err
 	}
 

+ 3 - 2
eth/config.go

@@ -103,8 +103,9 @@ type Config struct {
 	// for nodes to connect to.
 	DiscoveryURLs []string
 
-	NoPruning  bool // Whether to disable pruning and flush everything to disk
-	NoPrefetch bool // Whether to disable prefetching and only load state on demand
+	NoPruning       bool // Whether to disable pruning and flush everything to disk
+	NoPrefetch      bool // Whether to disable prefetching and only load state on demand
+	DirectBroadcast bool
 
 	// Whitelist of required block number -> hash values to accept
 	Whitelist map[uint64]common.Hash `toml:"-"`

+ 20 - 13
eth/handler.go

@@ -64,8 +64,9 @@ type ProtocolManager struct {
 	networkID  uint64
 	forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node
 
-	fastSync  uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
-	acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
+	fastSync        uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
+	acceptTxs       uint32 // Flag whether we're considered synchronised (enables transaction processing)
+	directBroadcast bool
 
 	checkpointNumber uint64      // Block number for the sync progress validator to cross reference
 	checkpointHash   common.Hash // Block hash for the sync progress validator to cross reference
@@ -100,18 +101,19 @@ type ProtocolManager struct {
 
 // NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
 // with the Ethereum network.
-func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCheckpoint, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, cacheLimit int, whitelist map[uint64]common.Hash) (*ProtocolManager, error) {
+func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCheckpoint, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, cacheLimit int, whitelist map[uint64]common.Hash, directBroadcast bool) (*ProtocolManager, error) {
 	// Create the protocol manager with the base fields
 	manager := &ProtocolManager{
-		networkID:  networkID,
-		forkFilter: forkid.NewFilter(blockchain),
-		eventMux:   mux,
-		txpool:     txpool,
-		blockchain: blockchain,
-		peers:      newPeerSet(),
-		whitelist:  whitelist,
-		txsyncCh:   make(chan *txsync),
-		quitSync:   make(chan struct{}),
+		directBroadcast: directBroadcast,
+		networkID:       networkID,
+		forkFilter:      forkid.NewFilter(blockchain),
+		eventMux:        mux,
+		txpool:          txpool,
+		blockchain:      blockchain,
+		peers:           newPeerSet(),
+		whitelist:       whitelist,
+		txsyncCh:        make(chan *txsync),
+		quitSync:        make(chan struct{}),
 	}
 
 	if mode == downloader.FullSync {
@@ -821,7 +823,12 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
 			return
 		}
 		// Send the block to a subset of our peers
-		transfer := peers[:int(math.Sqrt(float64(len(peers))))]
+		var transfer []*peer
+		if pm.directBroadcast {
+			transfer = peers[:int(len(peers))]
+		} else {
+			transfer = peers[:int(math.Sqrt(float64(len(peers))))]
+		}
 		for _, peer := range transfer {
 			peer.AsyncSendNewBlock(block, td)
 		}

+ 3 - 3
eth/handler_test.go

@@ -495,7 +495,7 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
 	if err != nil {
 		t.Fatalf("failed to create new blockchain: %v", err)
 	}
-	pm, err := NewProtocolManager(config, cht, syncmode, DefaultConfig.NetworkId, new(event.TypeMux), &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, ethash.NewFaker(), blockchain, db, 1, nil)
+	pm, err := NewProtocolManager(config, cht, syncmode, DefaultConfig.NetworkId, new(event.TypeMux), &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, ethash.NewFaker(), blockchain, db, 1, nil, false)
 	if err != nil {
 		t.Fatalf("failed to start test protocol manager: %v", err)
 	}
@@ -582,7 +582,7 @@ func testBroadcastBlock(t *testing.T, totalPeers, broadcastExpected int) {
 	if err != nil {
 		t.Fatalf("failed to create new blockchain: %v", err)
 	}
-	pm, err := NewProtocolManager(config, nil, downloader.FullSync, DefaultConfig.NetworkId, evmux, &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, pow, blockchain, db, 1, nil)
+	pm, err := NewProtocolManager(config, nil, downloader.FullSync, DefaultConfig.NetworkId, evmux, &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, pow, blockchain, db, 1, nil, false)
 	if err != nil {
 		t.Fatalf("failed to start test protocol manager: %v", err)
 	}
@@ -643,7 +643,7 @@ func TestBroadcastMalformedBlock(t *testing.T) {
 	if err != nil {
 		t.Fatalf("failed to create new blockchain: %v", err)
 	}
-	pm, err := NewProtocolManager(config, nil, downloader.FullSync, DefaultConfig.NetworkId, new(event.TypeMux), new(testTxPool), engine, blockchain, db, 1, nil)
+	pm, err := NewProtocolManager(config, nil, downloader.FullSync, DefaultConfig.NetworkId, new(event.TypeMux), new(testTxPool), engine, blockchain, db, 1, nil, false)
 	if err != nil {
 		t.Fatalf("failed to start test protocol manager: %v", err)
 	}

+ 1 - 1
eth/helper_test.go

@@ -68,7 +68,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func
 	if _, err := blockchain.InsertChain(chain); err != nil {
 		panic(err)
 	}
-	pm, err := NewProtocolManager(gspec.Config, nil, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx, pool: make(map[common.Hash]*types.Transaction)}, engine, blockchain, db, 1, nil)
+	pm, err := NewProtocolManager(gspec.Config, nil, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx, pool: make(map[common.Hash]*types.Transaction)}, engine, blockchain, db, 1, nil, false)
 	if err != nil {
 		return nil, nil, err
 	}

+ 2 - 2
eth/protocol_test.go

@@ -181,8 +181,8 @@ func TestForkIDSplit(t *testing.T) {
 		blocksNoFork, _  = core.GenerateChain(configNoFork, genesisNoFork, engine, dbNoFork, 2, nil)
 		blocksProFork, _ = core.GenerateChain(configProFork, genesisProFork, engine, dbProFork, 2, nil)
 
-		ethNoFork, _  = NewProtocolManager(configNoFork, nil, downloader.FullSync, 1, new(event.TypeMux), &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, engine, chainNoFork, dbNoFork, 1, nil)
-		ethProFork, _ = NewProtocolManager(configProFork, nil, downloader.FullSync, 1, new(event.TypeMux), &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, engine, chainProFork, dbProFork, 1, nil)
+		ethNoFork, _  = NewProtocolManager(configNoFork, nil, downloader.FullSync, 1, new(event.TypeMux), &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, engine, chainNoFork, dbNoFork, 1, nil, false)
+		ethProFork, _ = NewProtocolManager(configProFork, nil, downloader.FullSync, 1, new(event.TypeMux), &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, engine, chainProFork, dbProFork, 1, nil, false)
 	)
 	ethNoFork.Start(1000)
 	ethProFork.Start(1000)

+ 3 - 0
node/config.go

@@ -95,6 +95,9 @@ type Config struct {
 	// NoUSB disables hardware wallet monitoring and connectivity.
 	NoUSB bool `toml:",omitempty"`
 
+	// DirectBroadcast enable directly broadcast mined block to all peers
+	DirectBroadcast bool `toml:",omitempty"`
+
 	// SmartCardDaemonPath is the path to the smartcard daemon's socket
 	SmartCardDaemonPath string `toml:",omitempty"`