Bläddra i källkod

[R4R] add extension in eth protocol handshake to disable tx broadcast (#412)

* add extension for eth protocol handshake

* fix comments
yutianwu 4 år sedan
förälder
incheckning
610f6a5e63

+ 12 - 11
eth/backend.go

@@ -231,17 +231,18 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
 	}
 
 	if eth.handler, err = newHandler(&handlerConfig{
-		Database:        chainDb,
-		Chain:           eth.blockchain,
-		TxPool:          eth.txPool,
-		Network:         config.NetworkId,
-		Sync:            config.SyncMode,
-		BloomCache:      uint64(cacheLimit),
-		EventMux:        eth.eventMux,
-		Checkpoint:      checkpoint,
-		Whitelist:       config.Whitelist,
-		DirectBroadcast: config.DirectBroadcast,
-		DiffSync:        config.DiffSync,
+		Database:               chainDb,
+		Chain:                  eth.blockchain,
+		TxPool:                 eth.txPool,
+		Network:                config.NetworkId,
+		Sync:                   config.SyncMode,
+		BloomCache:             uint64(cacheLimit),
+		EventMux:               eth.eventMux,
+		Checkpoint:             checkpoint,
+		Whitelist:              config.Whitelist,
+		DirectBroadcast:        config.DirectBroadcast,
+		DiffSync:               config.DiffSync,
+		DisablePeerTxBroadcast: config.DisablePeerTxBroadcast,
 	}); err != nil {
 		return nil, err
 	}

+ 4 - 4
eth/downloader/peer.go

@@ -457,7 +457,7 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) {
 		defer p.lock.RUnlock()
 		return p.headerThroughput
 	}
-	return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
+	return ps.idlePeers(eth.ETH65, eth.ETH67, idle, throughput)
 }
 
 // BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
@@ -471,7 +471,7 @@ func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) {
 		defer p.lock.RUnlock()
 		return p.blockThroughput
 	}
-	return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
+	return ps.idlePeers(eth.ETH65, eth.ETH67, idle, throughput)
 }
 
 // ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
@@ -485,7 +485,7 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) {
 		defer p.lock.RUnlock()
 		return p.receiptThroughput
 	}
-	return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
+	return ps.idlePeers(eth.ETH65, eth.ETH67, idle, throughput)
 }
 
 // NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
@@ -499,7 +499,7 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) {
 		defer p.lock.RUnlock()
 		return p.stateThroughput
 	}
-	return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
+	return ps.idlePeers(eth.ETH65, eth.ETH67, idle, throughput)
 }
 
 // idlePeers retrieves a flat list of all currently idle peers satisfying the

+ 3 - 2
eth/ethconfig/config.go

@@ -124,8 +124,9 @@ type Config struct {
 	Genesis *core.Genesis `toml:",omitempty"`
 
 	// Protocol options
-	NetworkId uint64 // Network ID to use for selecting peers to connect to
-	SyncMode  downloader.SyncMode
+	NetworkId              uint64 // Network ID to use for selecting peers to connect to
+	SyncMode               downloader.SyncMode
+	DisablePeerTxBroadcast bool
 
 	// This can be set to list of enrtree:// URLs which will be queried for
 	// for nodes to connect to.

+ 6 - 0
eth/ethconfig/gen_config.go

@@ -20,6 +20,7 @@ func (c Config) MarshalTOML() (interface{}, error) {
 		Genesis                 *core.Genesis `toml:",omitempty"`
 		NetworkId               uint64
 		SyncMode                downloader.SyncMode
+		DisablePeerTxBroadcast  bool
 		EthDiscoveryURLs        []string
 		SnapDiscoveryURLs       []string
 		NoPruning               bool
@@ -68,6 +69,7 @@ func (c Config) MarshalTOML() (interface{}, error) {
 	enc.Genesis = c.Genesis
 	enc.NetworkId = c.NetworkId
 	enc.SyncMode = c.SyncMode
+	enc.DisablePeerTxBroadcast = c.DisablePeerTxBroadcast
 	enc.EthDiscoveryURLs = c.EthDiscoveryURLs
 	enc.SnapDiscoveryURLs = c.SnapDiscoveryURLs
 	enc.NoPruning = c.NoPruning
@@ -119,6 +121,7 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
 		Genesis                 *core.Genesis `toml:",omitempty"`
 		NetworkId               *uint64
 		SyncMode                *downloader.SyncMode
+		DisablePeerTxBroadcast  *bool
 		EthDiscoveryURLs        []string
 		SnapDiscoveryURLs       []string
 		NoPruning               *bool
@@ -176,6 +179,9 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
 	if dec.SyncMode != nil {
 		c.SyncMode = *dec.SyncMode
 	}
+	if dec.DisablePeerTxBroadcast != nil {
+		c.DisablePeerTxBroadcast = *dec.DisablePeerTxBroadcast
+	}
 	if dec.EthDiscoveryURLs != nil {
 		c.EthDiscoveryURLs = dec.EthDiscoveryURLs
 	}

+ 29 - 26
eth/handler.go

@@ -78,22 +78,24 @@ type txPool interface {
 // handlerConfig is the collection of initialization parameters to create a full
 // node network handler.
 type handlerConfig struct {
-	Database        ethdb.Database            // Database for direct sync insertions
-	Chain           *core.BlockChain          // Blockchain to serve data from
-	TxPool          txPool                    // Transaction pool to propagate from
-	Network         uint64                    // Network identifier to adfvertise
-	Sync            downloader.SyncMode       // Whether to fast or full sync
-	DiffSync        bool                      // Whether to diff sync
-	BloomCache      uint64                    // Megabytes to alloc for fast sync bloom
-	EventMux        *event.TypeMux            // Legacy event mux, deprecate for `feed`
-	Checkpoint      *params.TrustedCheckpoint // Hard coded checkpoint for sync challenges
-	Whitelist       map[uint64]common.Hash    // Hard coded whitelist for sync challenged
-	DirectBroadcast bool
+	Database               ethdb.Database            // Database for direct sync insertions
+	Chain                  *core.BlockChain          // Blockchain to serve data from
+	TxPool                 txPool                    // Transaction pool to propagate from
+	Network                uint64                    // Network identifier to adfvertise
+	Sync                   downloader.SyncMode       // Whether to fast or full sync
+	DiffSync               bool                      // Whether to diff sync
+	BloomCache             uint64                    // Megabytes to alloc for fast sync bloom
+	EventMux               *event.TypeMux            // Legacy event mux, deprecate for `feed`
+	Checkpoint             *params.TrustedCheckpoint // Hard coded checkpoint for sync challenges
+	Whitelist              map[uint64]common.Hash    // Hard coded whitelist for sync challenged
+	DirectBroadcast        bool
+	DisablePeerTxBroadcast bool
 }
 
 type handler struct {
-	networkID  uint64
-	forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node
+	networkID              uint64
+	forkFilter             forkid.Filter // Fork ID filter, constant across the lifetime of the node
+	disablePeerTxBroadcast bool
 
 	fastSync        uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
 	snapSync        uint32 // Flag whether fast sync should operate on top of the snap protocol
@@ -138,18 +140,19 @@ func newHandler(config *handlerConfig) (*handler, error) {
 		config.EventMux = new(event.TypeMux) // Nicety initialization for tests
 	}
 	h := &handler{
-		networkID:       config.Network,
-		forkFilter:      forkid.NewFilter(config.Chain),
-		eventMux:        config.EventMux,
-		database:        config.Database,
-		txpool:          config.TxPool,
-		chain:           config.Chain,
-		peers:           newPeerSet(),
-		whitelist:       config.Whitelist,
-		directBroadcast: config.DirectBroadcast,
-		diffSync:        config.DiffSync,
-		txsyncCh:        make(chan *txsync),
-		quitSync:        make(chan struct{}),
+		networkID:              config.Network,
+		forkFilter:             forkid.NewFilter(config.Chain),
+		disablePeerTxBroadcast: config.DisablePeerTxBroadcast,
+		eventMux:               config.EventMux,
+		database:               config.Database,
+		txpool:                 config.TxPool,
+		chain:                  config.Chain,
+		peers:                  newPeerSet(),
+		whitelist:              config.Whitelist,
+		directBroadcast:        config.DirectBroadcast,
+		diffSync:               config.DiffSync,
+		txsyncCh:               make(chan *txsync),
+		quitSync:               make(chan struct{}),
 	}
 	if config.Sync == downloader.FullSync {
 		// The database seems empty as the current block is the genesis. Yet the fast
@@ -276,7 +279,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
 		td      = h.chain.GetTd(hash, number)
 	)
 	forkID := forkid.NewID(h.chain.Config(), h.chain.Genesis().Hash(), h.chain.CurrentHeader().Number.Uint64())
-	if err := peer.Handshake(h.networkID, td, hash, genesis.Hash(), forkID, h.forkFilter); err != nil {
+	if err := peer.Handshake(h.networkID, td, hash, genesis.Hash(), forkID, h.forkFilter, &eth.UpgradeStatusExtension{DisablePeerTxBroadcast: h.disablePeerTxBroadcast}); err != nil {
 		peer.Log().Debug("Ethereum handshake failed", "err", err)
 		return err
 	}

+ 5 - 5
eth/handler_eth_test.go

@@ -271,7 +271,7 @@ func testRecvTransactions(t *testing.T, protocol uint) {
 		head    = handler.chain.CurrentBlock()
 		td      = handler.chain.GetTd(head.Hash(), head.NumberU64())
 	)
-	if err := src.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil {
+	if err := src.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
 		t.Fatalf("failed to run protocol handshake")
 	}
 	// Send the transaction to the sink and verify that it's added to the tx pool
@@ -333,7 +333,7 @@ func testSendTransactions(t *testing.T, protocol uint) {
 		head    = handler.chain.CurrentBlock()
 		td      = handler.chain.GetTd(head.Hash(), head.NumberU64())
 	)
-	if err := sink.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil {
+	if err := sink.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
 		t.Fatalf("failed to run protocol handshake")
 	}
 	// After the handshake completes, the source handler should stream the sink
@@ -532,7 +532,7 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
 		head    = handler.chain.CurrentBlock()
 		td      = handler.chain.GetTd(head.Hash(), head.NumberU64())
 	)
-	if err := remote.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil {
+	if err := remote.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
 		t.Fatalf("failed to run protocol handshake")
 	}
 	// Connect a new peer and check that we receive the checkpoint challenge
@@ -616,7 +616,7 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) {
 		go source.handler.runEthPeer(sourcePeer, func(peer *eth.Peer) error {
 			return eth.Handle((*ethHandler)(source.handler), peer)
 		})
-		if err := sinkPeer.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain)); err != nil {
+		if err := sinkPeer.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), nil); err != nil {
 			t.Fatalf("failed to run protocol handshake")
 		}
 		go eth.Handle(sink, sinkPeer)
@@ -689,7 +689,7 @@ func testBroadcastMalformedBlock(t *testing.T, protocol uint) {
 		genesis = source.chain.Genesis()
 		td      = source.chain.GetTd(genesis.Hash(), genesis.NumberU64())
 	)
-	if err := sink.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain)); err != nil {
+	if err := sink.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), nil); err != nil {
 		t.Fatalf("failed to run protocol handshake")
 	}
 	// After the handshake completes, the source handler should stream the sink

+ 6 - 0
eth/protocols/eth/broadcast.go

@@ -122,6 +122,9 @@ func (p *Peer) broadcastTransactions() {
 		case <-fail:
 			failed = true
 
+		case <-p.txTerm:
+			return
+
 		case <-p.term:
 			return
 		}
@@ -189,6 +192,9 @@ func (p *Peer) announceTransactions() {
 		case <-fail:
 			failed = true
 
+		case <-p.txTerm:
+			return
+
 		case <-p.term:
 			return
 		}

+ 61 - 1
eth/protocols/eth/handshake.go

@@ -35,7 +35,7 @@ const (
 
 // Handshake executes the eth protocol handshake, negotiating version number,
 // network IDs, difficulties, head and genesis blocks.
-func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis common.Hash, forkID forkid.ID, forkFilter forkid.Filter) error {
+func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis common.Hash, forkID forkid.ID, forkFilter forkid.Filter, extension *UpgradeStatusExtension) error {
 	// Send out own handshake in a new thread
 	errc := make(chan error, 2)
 
@@ -68,6 +68,49 @@ func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis
 	}
 	p.td, p.head = status.TD, status.Head
 
+	if p.version >= ETH67 {
+		var upgradeStatus UpgradeStatusPacket // safe to read after two values have been received from errc
+		if extension == nil {
+			extension = &UpgradeStatusExtension{}
+		}
+		extensionRaw, err := extension.Encode()
+		if err != nil {
+			return err
+		}
+
+		gopool.Submit(func() {
+			errc <- p2p.Send(p.rw, UpgradeStatusMsg, &UpgradeStatusPacket{
+				Extension: extensionRaw,
+			})
+		})
+		gopool.Submit(func() {
+			errc <- p.readUpgradeStatus(&upgradeStatus)
+		})
+		timeout := time.NewTimer(handshakeTimeout)
+		defer timeout.Stop()
+		for i := 0; i < 2; i++ {
+			select {
+			case err := <-errc:
+				if err != nil {
+					return err
+				}
+			case <-timeout.C:
+				return p2p.DiscReadTimeout
+			}
+		}
+
+		extension, err := upgradeStatus.GetExtension()
+		if err != nil {
+			return err
+		}
+		p.statusExtension = extension
+
+		if p.statusExtension.DisablePeerTxBroadcast {
+			p.Log().Debug("peer does not need broadcast txs, closing broadcast routines")
+			p.CloseTxBroadcast()
+		}
+	}
+
 	// TD at mainnet block #7753254 is 76 bits. If it becomes 100 million times
 	// larger, it will still fit within 100 bits
 	if tdlen := p.td.BitLen(); tdlen > 100 {
@@ -106,3 +149,20 @@ func (p *Peer) readStatus(network uint64, status *StatusPacket, genesis common.H
 	}
 	return nil
 }
+
+func (p *Peer) readUpgradeStatus(status *UpgradeStatusPacket) error {
+	msg, err := p.rw.ReadMsg()
+	if err != nil {
+		return err
+	}
+	if msg.Code != UpgradeStatusMsg {
+		return fmt.Errorf("%w: upgrade status msg has code %x (!= %x)", errNoStatusMsg, msg.Code, UpgradeStatusMsg)
+	}
+	if msg.Size > maxMessageSize {
+		return fmt.Errorf("%w: %v > %v", errMsgTooLarge, msg.Size, maxMessageSize)
+	}
+	if err := msg.Decode(&status); err != nil {
+		return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
+	}
+	return nil
+}

+ 1 - 1
eth/protocols/eth/handshake_test.go

@@ -81,7 +81,7 @@ func testHandshake(t *testing.T, protocol uint) {
 		// Send the junk test with one peer, check the handshake failure
 		go p2p.Send(app, test.code, test.data)
 
-		err := peer.Handshake(1, td, head.Hash(), genesis.Hash(), forkID, forkid.NewFilter(backend.chain))
+		err := peer.Handshake(1, td, head.Hash(), genesis.Hash(), forkID, forkid.NewFilter(backend.chain), nil)
 		if err == nil {
 			t.Errorf("test %d: protocol returned nil error, want %q", i, test.want)
 		} else if !errors.Is(err, test.want) {

+ 28 - 5
eth/protocols/eth/peer.go

@@ -22,6 +22,7 @@ import (
 	"sync"
 
 	mapset "github.com/deckarep/golang-set"
+
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/p2p"
@@ -68,9 +69,10 @@ func max(a, b int) int {
 type Peer struct {
 	id string // Unique ID for the peer, cached
 
-	*p2p.Peer                   // The embedded P2P package peer
-	rw        p2p.MsgReadWriter // Input/output streams for snap
-	version   uint              // Protocol version negotiated
+	*p2p.Peer                         // The embedded P2P package peer
+	rw              p2p.MsgReadWriter // Input/output streams for snap
+	version         uint              // Protocol version negotiated
+	statusExtension *UpgradeStatusExtension
 
 	head common.Hash // Latest advertised head block hash
 	td   *big.Int    // Latest advertised head block total difficulty
@@ -84,8 +86,9 @@ type Peer struct {
 	txBroadcast chan []common.Hash // Channel used to queue transaction propagation requests
 	txAnnounce  chan []common.Hash // Channel used to queue transaction announcement requests
 
-	term chan struct{} // Termination channel to stop the broadcasters
-	lock sync.RWMutex  // Mutex protecting the internal fields
+	term   chan struct{} // Termination channel to stop the broadcasters
+	txTerm chan struct{} // Termination channel to stop the tx broadcasters
+	lock   sync.RWMutex  // Mutex protecting the internal fields
 }
 
 // NewPeer create a wrapper for a network connection and negotiated  protocol
@@ -104,6 +107,7 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe
 		txAnnounce:      make(chan []common.Hash),
 		txpool:          txpool,
 		term:            make(chan struct{}),
+		txTerm:          make(chan struct{}),
 	}
 	// Start up all the broadcasters
 	go peer.broadcastBlocks()
@@ -119,6 +123,17 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe
 // clean it up!
 func (p *Peer) Close() {
 	close(p.term)
+
+	p.CloseTxBroadcast()
+}
+
+// CloseTxBroadcast signals the tx broadcast goroutine to terminate.
+func (p *Peer) CloseTxBroadcast() {
+	select {
+	case <-p.txTerm:
+	default:
+		close(p.txTerm)
+	}
 }
 
 // ID retrieves the peer's unique identifier.
@@ -212,6 +227,10 @@ func (p *Peer) AsyncSendTransactions(hashes []common.Hash) {
 		for _, hash := range hashes {
 			p.knownTxs.Add(hash)
 		}
+
+	case <-p.txTerm:
+		p.Log().Debug("Dropping transaction propagation", "count", len(hashes))
+
 	case <-p.term:
 		p.Log().Debug("Dropping transaction propagation", "count", len(hashes))
 	}
@@ -247,6 +266,10 @@ func (p *Peer) AsyncSendPooledTransactionHashes(hashes []common.Hash) {
 		for _, hash := range hashes {
 			p.knownTxs.Add(hash)
 		}
+
+	case <-p.txTerm:
+		p.Log().Debug("Dropping transaction announcement", "count", len(hashes))
+
 	case <-p.term:
 		p.Log().Debug("Dropping transaction announcement", "count", len(hashes))
 	}

+ 38 - 2
eth/protocols/eth/protocol.go

@@ -32,6 +32,7 @@ import (
 const (
 	ETH65 = 65
 	ETH66 = 66
+	ETH67 = 67
 )
 
 // ProtocolName is the official short name of the `eth` protocol used during
@@ -40,11 +41,11 @@ const ProtocolName = "eth"
 
 // ProtocolVersions are the supported versions of the `eth` protocol (first
 // is primary).
-var ProtocolVersions = []uint{ETH66, ETH65}
+var ProtocolVersions = []uint{ETH67, ETH66, ETH65}
 
 // protocolLengths are the number of implemented message corresponding to
 // different protocol versions.
-var protocolLengths = map[uint]uint64{ETH66: 17, ETH65: 17}
+var protocolLengths = map[uint]uint64{ETH67: 18, ETH66: 17, ETH65: 17}
 
 // maxMessageSize is the maximum cap on the size of a protocol message.
 const maxMessageSize = 10 * 1024 * 1024
@@ -68,6 +69,9 @@ const (
 	NewPooledTransactionHashesMsg = 0x08
 	GetPooledTransactionsMsg      = 0x09
 	PooledTransactionsMsg         = 0x0a
+
+	// Protocol messages overloaded in eth/66
+	UpgradeStatusMsg = 0x0b
 )
 
 var (
@@ -97,6 +101,35 @@ type StatusPacket struct {
 	ForkID          forkid.ID
 }
 
+type UpgradeStatusExtension struct {
+	DisablePeerTxBroadcast bool
+}
+
+func (e *UpgradeStatusExtension) Encode() (*rlp.RawValue, error) {
+	rawBytes, err := rlp.EncodeToBytes(e)
+	if err != nil {
+		return nil, err
+	}
+	raw := rlp.RawValue(rawBytes)
+	return &raw, nil
+}
+
+type UpgradeStatusPacket struct {
+	Extension *rlp.RawValue `rlp:"nil"`
+}
+
+func (p *UpgradeStatusPacket) GetExtension() (*UpgradeStatusExtension, error) {
+	extension := &UpgradeStatusExtension{}
+	if p.Extension == nil {
+		return extension, nil
+	}
+	err := rlp.DecodeBytes(*p.Extension, extension)
+	if err != nil {
+		return nil, err
+	}
+	return extension, nil
+}
+
 // NewBlockHashesPacket is the network packet for the block announcements.
 type NewBlockHashesPacket []struct {
 	Hash   common.Hash // Hash of one particular block being announced
@@ -324,6 +357,9 @@ type PooledTransactionsRLPPacket66 struct {
 func (*StatusPacket) Name() string { return "Status" }
 func (*StatusPacket) Kind() byte   { return StatusMsg }
 
+func (*UpgradeStatusPacket) Name() string { return "UpgradeStatus" }
+func (*UpgradeStatusPacket) Kind() byte   { return UpgradeStatusMsg }
+
 func (*NewBlockHashesPacket) Name() string { return "NewBlockHashes" }
 func (*NewBlockHashesPacket) Kind() byte   { return NewBlockHashesMsg }