Bläddra i källkod

eth, metrics, p2p: prepare metrics and net packets to eth/62

Péter Szilágyi 10 år sedan
förälder
incheckning
c51e153b5c
7 ändrade filer med 149 tillägg och 48 borttagningar
  1. 2 1
      eth/downloader/downloader.go
  2. 1 14
      eth/handler.go
  3. 84 0
      eth/metrics.go
  4. 30 20
      eth/peer.go
  5. 19 6
      eth/protocol.go
  6. 6 6
      metrics/metrics.go
  7. 7 1
      p2p/metrics.go

+ 2 - 1
eth/downloader/downloader.go

@@ -39,6 +39,7 @@ import (
 const (
 	eth60 = 60 // Constant to check for old protocol support
 	eth61 = 61 // Constant to check for new protocol support
+	eth62 = 62 // Constant to check for experimental protocol support
 )
 
 var (
@@ -329,7 +330,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
 		if err = d.fetchBlocks60(); err != nil {
 			return err
 		}
-	case eth61:
+	case eth61, eth62:
 		// New eth/61, use forward, concurrent hash and block retrieval algorithm
 		number, err := d.findAncestor(p)
 		if err != nil {

+ 1 - 14
eth/handler.go

@@ -176,7 +176,7 @@ func (pm *ProtocolManager) Stop() {
 }
 
 func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
-	return newPeer(pv, nv, p, rw)
+	return newPeer(pv, nv, p, newMeteredMsgWriter(rw))
 }
 
 // handle is the callback invoked to manage the life cycle of an eth peer. When
@@ -281,14 +281,11 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 	case BlockHashesMsg:
 		// A batch of hashes arrived to one of our previous requests
 		msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
-		reqHashInPacketsMeter.Mark(1)
 
 		var hashes []common.Hash
 		if err := msgStream.Decode(&hashes); err != nil {
 			break
 		}
-		reqHashInTrafficMeter.Mark(int64(32 * len(hashes)))
-
 		// Deliver them all to the downloader for queuing
 		err := pm.downloader.DeliverHashes(p.id, hashes)
 		if err != nil {
@@ -340,7 +337,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 	case BlocksMsg:
 		// Decode the arrived block message
 		msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
-		reqBlockInPacketsMeter.Mark(1)
 
 		var blocks []*types.Block
 		if err := msgStream.Decode(&blocks); err != nil {
@@ -349,7 +345,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 		}
 		// Update the receive timestamp of each block
 		for _, block := range blocks {
-			reqBlockInTrafficMeter.Mark(block.Size().Int64())
 			block.ReceivedAt = msg.ReceivedAt
 		}
 		// Filter out any explicitly requested blocks, deliver the rest to the downloader
@@ -365,9 +360,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 		if err := msgStream.Decode(&hashes); err != nil {
 			break
 		}
-		propHashInPacketsMeter.Mark(1)
-		propHashInTrafficMeter.Mark(int64(32 * len(hashes)))
-
 		// Mark the hashes as present at the remote node
 		for _, hash := range hashes {
 			p.MarkBlock(hash)
@@ -390,9 +382,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 		if err := msg.Decode(&request); err != nil {
 			return errResp(ErrDecode, "%v: %v", msg, err)
 		}
-		propBlockInPacketsMeter.Mark(1)
-		propBlockInTrafficMeter.Mark(request.Block.Size().Int64())
-
 		if err := request.Block.ValidateFields(); err != nil {
 			return errResp(ErrDecode, "block validation %v: %v", msg, err)
 		}
@@ -427,7 +416,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 		if err := msg.Decode(&txs); err != nil {
 			return errResp(ErrDecode, "msg %v: %v", msg, err)
 		}
-		propTxnInPacketsMeter.Mark(1)
 		for i, tx := range txs {
 			// Validate and mark the remote transaction
 			if tx == nil {
@@ -436,7 +424,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 			p.MarkTransaction(tx.Hash())
 
 			// Log it's arrival for later analysis
-			propTxnInTrafficMeter.Mark(tx.Size().Int64())
 			jsonlogger.LogJson(&logger.EthTxReceived{
 				TxHash:   tx.Hash().Hex(),
 				RemoteId: p.ID().String(),

+ 84 - 0
eth/metrics.go

@@ -18,6 +18,7 @@ package eth
 
 import (
 	"github.com/ethereum/go-ethereum/metrics"
+	"github.com/ethereum/go-ethereum/p2p"
 )
 
 var (
@@ -41,4 +42,87 @@ var (
 	reqBlockInTrafficMeter   = metrics.NewMeter("eth/req/blocks/in/traffic")
 	reqBlockOutPacketsMeter  = metrics.NewMeter("eth/req/blocks/out/packets")
 	reqBlockOutTrafficMeter  = metrics.NewMeter("eth/req/blocks/out/traffic")
+	reqHeaderInPacketsMeter  = metrics.NewMeter("eth/req/header/in/packets")
+	reqHeaderInTrafficMeter  = metrics.NewMeter("eth/req/header/in/traffic")
+	reqHeaderOutPacketsMeter = metrics.NewMeter("eth/req/header/out/packets")
+	reqHeaderOutTrafficMeter = metrics.NewMeter("eth/req/header/out/traffic")
+	reqStateInPacketsMeter   = metrics.NewMeter("eth/req/state/in/packets")
+	reqStateInTrafficMeter   = metrics.NewMeter("eth/req/state/in/traffic")
+	reqStateOutPacketsMeter  = metrics.NewMeter("eth/req/state/out/packets")
+	reqStateOutTrafficMeter  = metrics.NewMeter("eth/req/state/out/traffic")
+	miscInPacketsMeter       = metrics.NewMeter("eth/misc/in/packets")
+	miscInTrafficMeter       = metrics.NewMeter("eth/misc/in/traffic")
+	miscOutPacketsMeter      = metrics.NewMeter("eth/misc/out/packets")
+	miscOutTrafficMeter      = metrics.NewMeter("eth/misc/out/traffic")
 )
+
+// meteredMsgReadWriter is a wrapper around a p2p.MsgReadWriter, capable of
+// accumulating the above defined metrics based on the data stream contents.
+type meteredMsgReadWriter struct {
+	p2p.MsgReadWriter
+}
+
+// newMeteredMsgWriter wraps a p2p MsgReadWriter with metering support. If the
+// metrics system is disabled, this fucntion returns the original object.
+func newMeteredMsgWriter(rw p2p.MsgReadWriter) p2p.MsgReadWriter {
+	if !metrics.Enabled {
+		return rw
+	}
+	return &meteredMsgReadWriter{rw}
+}
+
+func (rw *meteredMsgReadWriter) ReadMsg() (p2p.Msg, error) {
+	// Read the message and short circuit in case of an error
+	msg, err := rw.MsgReadWriter.ReadMsg()
+	if err != nil {
+		return msg, err
+	}
+	// Account for the data traffic
+	packets, traffic := miscInPacketsMeter, miscInTrafficMeter
+	switch msg.Code {
+	case BlockHashesMsg:
+		packets, traffic = reqHashInPacketsMeter, reqHashInTrafficMeter
+	case BlocksMsg:
+		packets, traffic = reqBlockInPacketsMeter, reqBlockInTrafficMeter
+	case BlockHeadersMsg:
+		packets, traffic = reqHeaderInPacketsMeter, reqHeaderInTrafficMeter
+	case NodeDataMsg:
+		packets, traffic = reqStateInPacketsMeter, reqStateInTrafficMeter
+	case NewBlockHashesMsg:
+		packets, traffic = propHashInPacketsMeter, propHashInTrafficMeter
+	case NewBlockMsg:
+		packets, traffic = propBlockInPacketsMeter, propBlockInTrafficMeter
+	case TxMsg:
+		packets, traffic = propTxnInPacketsMeter, propTxnInTrafficMeter
+	}
+	packets.Mark(1)
+	traffic.Mark(int64(msg.Size))
+
+	return msg, err
+}
+
+func (rw *meteredMsgReadWriter) WriteMsg(msg p2p.Msg) error {
+	// Account for the data traffic
+	packets, traffic := miscOutPacketsMeter, miscOutTrafficMeter
+	switch msg.Code {
+	case BlockHashesMsg:
+		packets, traffic = reqHashOutPacketsMeter, reqHashOutTrafficMeter
+	case BlocksMsg:
+		packets, traffic = reqBlockOutPacketsMeter, reqBlockOutTrafficMeter
+	case BlockHeadersMsg:
+		packets, traffic = reqHeaderOutPacketsMeter, reqHeaderOutTrafficMeter
+	case NodeDataMsg:
+		packets, traffic = reqStateOutPacketsMeter, reqStateOutTrafficMeter
+	case NewBlockHashesMsg:
+		packets, traffic = propHashOutPacketsMeter, propHashOutTrafficMeter
+	case NewBlockMsg:
+		packets, traffic = propBlockOutPacketsMeter, propBlockOutTrafficMeter
+	case TxMsg:
+		packets, traffic = propTxnOutPacketsMeter, propTxnOutTrafficMeter
+	}
+	packets.Mark(1)
+	traffic.Mark(int64(msg.Size))
+
+	// Send the packet to the p2p layer
+	return rw.MsgReadWriter.WriteMsg(msg)
+}

+ 30 - 20
eth/peer.go

@@ -129,9 +129,7 @@ func (p *peer) MarkTransaction(hash common.Hash) {
 // SendTransactions sends transactions to the peer and includes the hashes
 // in its transaction hash set for future reference.
 func (p *peer) SendTransactions(txs types.Transactions) error {
-	propTxnOutPacketsMeter.Mark(1)
 	for _, tx := range txs {
-		propTxnOutTrafficMeter.Mark(tx.Size().Int64())
 		p.knownTxs.Add(tx.Hash())
 	}
 	return p2p.Send(p.rw, TxMsg, txs)
@@ -139,27 +137,17 @@ func (p *peer) SendTransactions(txs types.Transactions) error {
 
 // SendBlockHashes sends a batch of known hashes to the remote peer.
 func (p *peer) SendBlockHashes(hashes []common.Hash) error {
-	reqHashOutPacketsMeter.Mark(1)
-	reqHashOutTrafficMeter.Mark(int64(32 * len(hashes)))
-
 	return p2p.Send(p.rw, BlockHashesMsg, hashes)
 }
 
 // SendBlocks sends a batch of blocks to the remote peer.
 func (p *peer) SendBlocks(blocks []*types.Block) error {
-	reqBlockOutPacketsMeter.Mark(1)
-	for _, block := range blocks {
-		reqBlockOutTrafficMeter.Mark(block.Size().Int64())
-	}
 	return p2p.Send(p.rw, BlocksMsg, blocks)
 }
 
 // SendNewBlockHashes announces the availability of a number of blocks through
 // a hash notification.
 func (p *peer) SendNewBlockHashes(hashes []common.Hash) error {
-	propHashOutPacketsMeter.Mark(1)
-	propHashOutTrafficMeter.Mark(int64(32 * len(hashes)))
-
 	for _, hash := range hashes {
 		p.knownBlocks.Add(hash)
 	}
@@ -168,33 +156,55 @@ func (p *peer) SendNewBlockHashes(hashes []common.Hash) error {
 
 // SendNewBlock propagates an entire block to a remote peer.
 func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error {
-	propBlockOutPacketsMeter.Mark(1)
-	propBlockOutTrafficMeter.Mark(block.Size().Int64())
-
 	p.knownBlocks.Add(block.Hash())
 	return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td})
 }
 
+// SendBlockHeaders sends a batch of block headers to the remote peer.
+func (p *peer) SendBlockHeaders(headers []*types.Header) error {
+	return p2p.Send(p.rw, BlockHeadersMsg, headers)
+}
+
+// SendNodeData sends a batch of arbitrary internal data, corresponding to the
+// hashes requested.
+func (p *peer) SendNodeData(data [][]byte) error {
+	return p2p.Send(p.rw, NodeDataMsg, data)
+}
+
 // RequestHashes fetches a batch of hashes from a peer, starting at from, going
 // towards the genesis block.
 func (p *peer) RequestHashes(from common.Hash) error {
-	glog.V(logger.Debug).Infof("Peer [%s] fetching hashes (%d) from %x...\n", p.id, downloader.MaxHashFetch, from[:4])
+	glog.V(logger.Debug).Infof("%v fetching hashes (%d) from %x...\n", p, downloader.MaxHashFetch, from[:4])
 	return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesData{from, uint64(downloader.MaxHashFetch)})
 }
 
-// RequestHashesFromNumber fetches a batch of hashes from a peer, starting at the
-// requested block number, going upwards towards the genesis block.
+// RequestHashesFromNumber fetches a batch of hashes from a peer, starting at
+// the requested block number, going upwards towards the genesis block.
 func (p *peer) RequestHashesFromNumber(from uint64, count int) error {
-	glog.V(logger.Debug).Infof("Peer [%s] fetching hashes (%d) from #%d...\n", p.id, count, from)
+	glog.V(logger.Debug).Infof("%v fetching hashes (%d) from #%d...\n", p, count, from)
 	return p2p.Send(p.rw, GetBlockHashesFromNumberMsg, getBlockHashesFromNumberData{from, uint64(count)})
 }
 
 // RequestBlocks fetches a batch of blocks corresponding to the specified hashes.
 func (p *peer) RequestBlocks(hashes []common.Hash) error {
-	glog.V(logger.Debug).Infof("[%s] fetching %v blocks\n", p.id, len(hashes))
+	glog.V(logger.Debug).Infof("%v fetching %v blocks\n", p, len(hashes))
 	return p2p.Send(p.rw, GetBlocksMsg, hashes)
 }
 
+// RequestHeaders fetches a batch of blocks' headers corresponding to the
+// specified hashes.
+func (p *peer) RequestHeaders(hashes []common.Hash) error {
+	glog.V(logger.Debug).Infof("%v fetching %v headers\n", p, len(hashes))
+	return p2p.Send(p.rw, GetBlockHeadersMsg, hashes)
+}
+
+// RequestNodeData fetches a batch of arbitrary data from a node's known state
+// data, corresponding to the specified hashes.
+func (p *peer) RequestNodeData(hashes []common.Hash) error {
+	glog.V(logger.Debug).Infof("%v fetching %v state data\n", p, len(hashes))
+	return p2p.Send(p.rw, GetNodeDataMsg, hashes)
+}
+
 // Handshake executes the eth protocol handshake, negotiating version number,
 // network IDs, difficulties, head and genesis blocks.
 func (p *peer) Handshake(td *big.Int, head common.Hash, genesis common.Hash) error {

+ 19 - 6
eth/protocol.go

@@ -24,10 +24,10 @@ import (
 )
 
 // Supported versions of the eth protocol (first is primary).
-var ProtocolVersions = []uint{61, 60}
+var ProtocolVersions = []uint{62, 61, 60}
 
 // Number of implemented message corresponding to different protocol versions.
-var ProtocolLengths = []uint64{9, 8}
+var ProtocolLengths = []uint64{13, 9, 8}
 
 const (
 	NetworkId          = 1
@@ -36,6 +36,7 @@ const (
 
 // eth protocol message codes
 const (
+	// Protocol messages belonging to eth/60
 	StatusMsg = iota
 	NewBlockHashesMsg
 	TxMsg
@@ -44,7 +45,15 @@ const (
 	GetBlocksMsg
 	BlocksMsg
 	NewBlockMsg
+
+	// Protocol messages belonging to eth/61
 	GetBlockHashesFromNumberMsg
+
+	// Protocol messages belonging to eth/62
+	GetBlockHeadersMsg
+	BlockHeadersMsg
+	GetNodeDataMsg
+	NodeDataMsg
 )
 
 type errCode int
@@ -102,15 +111,14 @@ type statusData struct {
 	GenesisBlock    common.Hash
 }
 
-// getBlockHashesData is the network packet for the hash based block retrieval
-// message.
+// getBlockHashesData is the network packet for the hash based hash retrieval.
 type getBlockHashesData struct {
 	Hash   common.Hash
 	Amount uint64
 }
 
-// getBlockHashesFromNumberData is the network packet for the number based block
-// retrieval message.
+// getBlockHashesFromNumberData is the network packet for the number based hash
+// retrieval.
 type getBlockHashesFromNumberData struct {
 	Number uint64
 	Amount uint64
@@ -121,3 +129,8 @@ type newBlockData struct {
 	Block *types.Block
 	TD    *big.Int
 }
+
+// nodeDataData is the network response packet for a node data retrieval.
+type nodeDataData []struct {
+	Value []byte
+}

+ 6 - 6
metrics/metrics.go

@@ -31,8 +31,8 @@ import (
 // MetricsEnabledFlag is the CLI flag name to use to enable metrics collections.
 var MetricsEnabledFlag = "metrics"
 
-// enabled is the flag specifying if metrics are enable or not.
-var enabled = false
+// Enabled is the flag specifying if metrics are enable or not.
+var Enabled = false
 
 // Init enables or disables the metrics system. Since we need this to run before
 // any other code gets to create meters and timers, we'll actually do an ugly hack
@@ -41,7 +41,7 @@ func init() {
 	for _, arg := range os.Args {
 		if strings.TrimLeft(arg, "-") == MetricsEnabledFlag {
 			glog.V(logger.Info).Infof("Enabling metrics collection")
-			enabled = true
+			Enabled = true
 		}
 	}
 }
@@ -49,7 +49,7 @@ func init() {
 // NewMeter create a new metrics Meter, either a real one of a NOP stub depending
 // on the metrics flag.
 func NewMeter(name string) metrics.Meter {
-	if !enabled {
+	if !Enabled {
 		return new(metrics.NilMeter)
 	}
 	return metrics.GetOrRegisterMeter(name, metrics.DefaultRegistry)
@@ -58,7 +58,7 @@ func NewMeter(name string) metrics.Meter {
 // NewTimer create a new metrics Timer, either a real one of a NOP stub depending
 // on the metrics flag.
 func NewTimer(name string) metrics.Timer {
-	if !enabled {
+	if !Enabled {
 		return new(metrics.NilTimer)
 	}
 	return metrics.GetOrRegisterTimer(name, metrics.DefaultRegistry)
@@ -68,7 +68,7 @@ func NewTimer(name string) metrics.Timer {
 // process.
 func CollectProcessMetrics(refresh time.Duration) {
 	// Short circuit if the metrics system is disabled
-	if !enabled {
+	if !Enabled {
 		return
 	}
 	// Create the various data collectors

+ 7 - 1
p2p/metrics.go

@@ -38,8 +38,14 @@ type meteredConn struct {
 }
 
 // newMeteredConn creates a new metered connection, also bumping the ingress or
-// egress connection meter.
+// egress connection meter. If the metrics system is disabled, this function
+// returns the original object.
 func newMeteredConn(conn net.Conn, ingress bool) net.Conn {
+	// Short circuit if metrics are disabled
+	if !metrics.Enabled {
+		return conn
+	}
+	// Otherwise bump the connection counters and wrap the connection
 	if ingress {
 		ingressConnectMeter.Mark(1)
 	} else {