handler.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package eth
  17. import (
  18. "fmt"
  19. "math/big"
  20. "time"
  21. "github.com/ethereum/go-ethereum/common"
  22. "github.com/ethereum/go-ethereum/core"
  23. "github.com/ethereum/go-ethereum/core/types"
  24. "github.com/ethereum/go-ethereum/metrics"
  25. "github.com/ethereum/go-ethereum/p2p"
  26. "github.com/ethereum/go-ethereum/p2p/enode"
  27. "github.com/ethereum/go-ethereum/p2p/enr"
  28. "github.com/ethereum/go-ethereum/params"
  29. "github.com/ethereum/go-ethereum/trie"
  30. )
  31. const (
  32. // softResponseLimit is the target maximum size of replies to data retrievals.
  33. softResponseLimit = 2 * 1024 * 1024
  34. // estHeaderSize is the approximate size of an RLP encoded block header.
  35. estHeaderSize = 500
  36. // maxHeadersServe is the maximum number of block headers to serve. This number
  37. // is there to limit the number of disk lookups.
  38. maxHeadersServe = 1024
  39. // maxBodiesServe is the maximum number of block bodies to serve. This number
  40. // is mostly there to limit the number of disk lookups. With 24KB block sizes
  41. // nowadays, the practical limit will always be softResponseLimit.
  42. maxBodiesServe = 1024
  43. // maxNodeDataServe is the maximum number of state trie nodes to serve. This
  44. // number is there to limit the number of disk lookups.
  45. maxNodeDataServe = 1024
  46. // maxReceiptsServe is the maximum number of block receipts to serve. This
  47. // number is mostly there to limit the number of disk lookups. With block
  48. // containing 200+ transactions nowadays, the practical limit will always
  49. // be softResponseLimit.
  50. maxReceiptsServe = 1024
  51. )
  52. // Handler is a callback to invoke from an outside runner after the boilerplate
  53. // exchanges have passed.
  54. type Handler func(peer *Peer) error
  55. // Backend defines the data retrieval methods to serve remote requests and the
  56. // callback methods to invoke on remote deliveries.
  57. type Backend interface {
  58. // Chain retrieves the blockchain object to serve data.
  59. Chain() *core.BlockChain
  60. // StateBloom retrieves the bloom filter - if any - for state trie nodes.
  61. StateBloom() *trie.SyncBloom
  62. // TxPool retrieves the transaction pool object to serve data.
  63. TxPool() TxPool
  64. // AcceptTxs retrieves whether transaction processing is enabled on the node
  65. // or if inbound transactions should simply be dropped.
  66. AcceptTxs() bool
  67. // RunPeer is invoked when a peer joins on the `eth` protocol. The handler
  68. // should do any peer maintenance work, handshakes and validations. If all
  69. // is passed, control should be given back to the `handler` to process the
  70. // inbound messages going forward.
  71. RunPeer(peer *Peer, handler Handler) error
  72. // PeerInfo retrieves all known `eth` information about a peer.
  73. PeerInfo(id enode.ID) interface{}
  74. // Handle is a callback to be invoked when a data packet is received from
  75. // the remote peer. Only packets not consumed by the protocol handler will
  76. // be forwarded to the backend.
  77. Handle(peer *Peer, packet Packet) error
  78. }
  79. // TxPool defines the methods needed by the protocol handler to serve transactions.
  80. type TxPool interface {
  81. // Get retrieves the the transaction from the local txpool with the given hash.
  82. Get(hash common.Hash) *types.Transaction
  83. }
  84. // MakeProtocols constructs the P2P protocol definitions for `eth`.
  85. func MakeProtocols(backend Backend, network uint64, dnsdisc enode.Iterator) []p2p.Protocol {
  86. protocols := make([]p2p.Protocol, len(ProtocolVersions))
  87. for i, version := range ProtocolVersions {
  88. version := version // Closure
  89. protocols[i] = p2p.Protocol{
  90. Name: ProtocolName,
  91. Version: version,
  92. Length: protocolLengths[version],
  93. Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  94. peer := NewPeer(version, p, rw, backend.TxPool())
  95. defer peer.Close()
  96. return backend.RunPeer(peer, func(peer *Peer) error {
  97. return Handle(backend, peer)
  98. })
  99. },
  100. NodeInfo: func() interface{} {
  101. return nodeInfo(backend.Chain(), network)
  102. },
  103. PeerInfo: func(id enode.ID) interface{} {
  104. return backend.PeerInfo(id)
  105. },
  106. Attributes: []enr.Entry{currentENREntry(backend.Chain())},
  107. DialCandidates: dnsdisc,
  108. }
  109. }
  110. return protocols
  111. }
  112. // NodeInfo represents a short summary of the `eth` sub-protocol metadata
  113. // known about the host peer.
  114. type NodeInfo struct {
  115. Network uint64 `json:"network"` // Ethereum network ID (1=Frontier, 2=Morden, Ropsten=3, Rinkeby=4)
  116. Difficulty *big.Int `json:"difficulty"` // Total difficulty of the host's blockchain
  117. Genesis common.Hash `json:"genesis"` // SHA3 hash of the host's genesis block
  118. Config *params.ChainConfig `json:"config"` // Chain configuration for the fork rules
  119. Head common.Hash `json:"head"` // Hex hash of the host's best owned block
  120. }
  121. // nodeInfo retrieves some `eth` protocol metadata about the running host node.
  122. func nodeInfo(chain *core.BlockChain, network uint64) *NodeInfo {
  123. head := chain.CurrentBlock()
  124. return &NodeInfo{
  125. Network: network,
  126. Difficulty: chain.GetTd(head.Hash(), head.NumberU64()),
  127. Genesis: chain.Genesis().Hash(),
  128. Config: chain.Config(),
  129. Head: head.Hash(),
  130. }
  131. }
  132. // Handle is invoked whenever an `eth` connection is made that successfully passes
  133. // the protocol handshake. This method will keep processing messages until the
  134. // connection is torn down.
  135. func Handle(backend Backend, peer *Peer) error {
  136. for {
  137. if err := handleMessage(backend, peer); err != nil {
  138. peer.Log().Debug("Message handling failed in `eth`", "err", err)
  139. return err
  140. }
  141. }
  142. }
  143. type msgHandler func(backend Backend, msg Decoder, peer *Peer) error
  144. type Decoder interface {
  145. Decode(val interface{}) error
  146. Time() time.Time
  147. }
  148. var eth65 = map[uint64]msgHandler{
  149. GetBlockHeadersMsg: handleGetBlockHeaders,
  150. BlockHeadersMsg: handleBlockHeaders,
  151. GetBlockBodiesMsg: handleGetBlockBodies,
  152. BlockBodiesMsg: handleBlockBodies,
  153. GetNodeDataMsg: handleGetNodeData,
  154. NodeDataMsg: handleNodeData,
  155. GetReceiptsMsg: handleGetReceipts,
  156. ReceiptsMsg: handleReceipts,
  157. NewBlockHashesMsg: handleNewBlockhashes,
  158. NewBlockMsg: handleNewBlock,
  159. TransactionsMsg: handleTransactions,
  160. NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes,
  161. GetPooledTransactionsMsg: handleGetPooledTransactions,
  162. PooledTransactionsMsg: handlePooledTransactions,
  163. }
  164. var eth66 = map[uint64]msgHandler{
  165. NewBlockHashesMsg: handleNewBlockhashes,
  166. NewBlockMsg: handleNewBlock,
  167. TransactionsMsg: handleTransactions,
  168. NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes,
  169. // eth66 messages with request-id
  170. GetBlockHeadersMsg: handleGetBlockHeaders66,
  171. BlockHeadersMsg: handleBlockHeaders66,
  172. GetBlockBodiesMsg: handleGetBlockBodies66,
  173. BlockBodiesMsg: handleBlockBodies66,
  174. GetNodeDataMsg: handleGetNodeData66,
  175. NodeDataMsg: handleNodeData66,
  176. GetReceiptsMsg: handleGetReceipts66,
  177. ReceiptsMsg: handleReceipts66,
  178. GetPooledTransactionsMsg: handleGetPooledTransactions66,
  179. PooledTransactionsMsg: handlePooledTransactions66,
  180. }
  181. // handleMessage is invoked whenever an inbound message is received from a remote
  182. // peer. The remote connection is torn down upon returning any error.
  183. func handleMessage(backend Backend, peer *Peer) error {
  184. // Read the next message from the remote peer, and ensure it's fully consumed
  185. msg, err := peer.rw.ReadMsg()
  186. if err != nil {
  187. return err
  188. }
  189. if msg.Size > maxMessageSize {
  190. return fmt.Errorf("%w: %v > %v", errMsgTooLarge, msg.Size, maxMessageSize)
  191. }
  192. defer msg.Discard()
  193. var handlers = eth65
  194. if peer.Version() >= ETH66 {
  195. handlers = eth66
  196. }
  197. // Track the amount of time it takes to serve the request and run the handler
  198. if metrics.Enabled {
  199. h := fmt.Sprintf("%s/%s/%d/%#02x", p2p.HandleHistName, ProtocolName, peer.Version(), msg.Code)
  200. defer func(start time.Time) {
  201. sampler := func() metrics.Sample {
  202. return metrics.ResettingSample(
  203. metrics.NewExpDecaySample(1028, 0.015),
  204. )
  205. }
  206. metrics.GetOrRegisterHistogramLazy(h, nil, sampler).Update(time.Since(start).Microseconds())
  207. }(time.Now())
  208. }
  209. if handler := handlers[msg.Code]; handler != nil {
  210. return handler(backend, msg, peer)
  211. }
  212. return fmt.Errorf("%w: %v", errInvalidMsgCode, msg.Code)
  213. }