handler.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package diff
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/ethereum/go-ethereum/core"
  6. "github.com/ethereum/go-ethereum/metrics"
  7. "github.com/ethereum/go-ethereum/p2p"
  8. "github.com/ethereum/go-ethereum/p2p/enode"
  9. "github.com/ethereum/go-ethereum/p2p/enr"
  10. "github.com/ethereum/go-ethereum/rlp"
  11. )
  12. const (
  13. // softResponseLimit is the target maximum size of replies to data retrievals.
  14. softResponseLimit = 2 * 1024 * 1024
  15. // maxDiffLayerServe is the maximum number of diff layers to serve.
  16. maxDiffLayerServe = 1024
  17. )
  18. var requestTracker = NewTracker(time.Minute)
  19. // Handler is a callback to invoke from an outside runner after the boilerplate
  20. // exchanges have passed.
  21. type Handler func(peer *Peer) error
  22. type Backend interface {
  23. // Chain retrieves the blockchain object to serve data.
  24. Chain() *core.BlockChain
  25. // RunPeer is invoked when a peer joins on the `eth` protocol. The handler
  26. // should do any peer maintenance work, handshakes and validations. If all
  27. // is passed, control should be given back to the `handler` to process the
  28. // inbound messages going forward.
  29. RunPeer(peer *Peer, handler Handler) error
  30. PeerInfo(id enode.ID) interface{}
  31. Handle(peer *Peer, packet Packet) error
  32. }
  33. // MakeProtocols constructs the P2P protocol definitions for `diff`.
  34. func MakeProtocols(backend Backend, dnsdisc enode.Iterator) []p2p.Protocol {
  35. // Filter the discovery iterator for nodes advertising diff support.
  36. dnsdisc = enode.Filter(dnsdisc, func(n *enode.Node) bool {
  37. var diff enrEntry
  38. return n.Load(&diff) == nil
  39. })
  40. protocols := make([]p2p.Protocol, len(ProtocolVersions))
  41. for i, version := range ProtocolVersions {
  42. version := version // Closure
  43. protocols[i] = p2p.Protocol{
  44. Name: ProtocolName,
  45. Version: version,
  46. Length: protocolLengths[version],
  47. Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  48. return backend.RunPeer(NewPeer(version, p, rw), func(peer *Peer) error {
  49. defer peer.Close()
  50. return Handle(backend, peer)
  51. })
  52. },
  53. NodeInfo: func() interface{} {
  54. return nodeInfo(backend.Chain())
  55. },
  56. PeerInfo: func(id enode.ID) interface{} {
  57. return backend.PeerInfo(id)
  58. },
  59. Attributes: []enr.Entry{&enrEntry{}},
  60. DialCandidates: dnsdisc,
  61. }
  62. }
  63. return protocols
  64. }
  65. // Handle is the callback invoked to manage the life cycle of a `diff` peer.
  66. // When this function terminates, the peer is disconnected.
  67. func Handle(backend Backend, peer *Peer) error {
  68. for {
  69. if err := handleMessage(backend, peer); err != nil {
  70. peer.Log().Debug("Message handling failed in `diff`", "err", err)
  71. return err
  72. }
  73. }
  74. }
  75. // handleMessage is invoked whenever an inbound message is received from a
  76. // remote peer on the `diff` protocol. The remote connection is torn down upon
  77. // returning any error.
  78. func handleMessage(backend Backend, peer *Peer) error {
  79. // Read the next message from the remote peer, and ensure it's fully consumed
  80. msg, err := peer.rw.ReadMsg()
  81. if err != nil {
  82. return err
  83. }
  84. if msg.Size > maxMessageSize {
  85. return fmt.Errorf("%w: %v > %v", errMsgTooLarge, msg.Size, maxMessageSize)
  86. }
  87. defer msg.Discard()
  88. start := time.Now()
  89. // Track the emount of time it takes to serve the request and run the handler
  90. if metrics.Enabled {
  91. h := fmt.Sprintf("%s/%s/%d/%#02x", p2p.HandleHistName, ProtocolName, peer.Version(), msg.Code)
  92. defer func(start time.Time) {
  93. sampler := func() metrics.Sample {
  94. return metrics.ResettingSample(
  95. metrics.NewExpDecaySample(1028, 0.015),
  96. )
  97. }
  98. metrics.GetOrRegisterHistogramLazy(h, nil, sampler).Update(time.Since(start).Microseconds())
  99. }(start)
  100. }
  101. // Handle the message depending on its contents
  102. switch {
  103. case msg.Code == GetDiffLayerMsg:
  104. res := new(GetDiffLayersPacket)
  105. if err := msg.Decode(res); err != nil {
  106. return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
  107. }
  108. diffs := answerDiffLayersQuery(backend, res)
  109. p2p.Send(peer.rw, FullDiffLayerMsg, &FullDiffLayersPacket{
  110. RequestId: res.RequestId,
  111. DiffLayersPacket: diffs,
  112. })
  113. return nil
  114. case msg.Code == DiffLayerMsg:
  115. // A batch of trie nodes arrived to one of our previous requests
  116. res := new(DiffLayersPacket)
  117. if err := msg.Decode(res); err != nil {
  118. return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
  119. }
  120. return backend.Handle(peer, res)
  121. case msg.Code == FullDiffLayerMsg:
  122. // A batch of trie nodes arrived to one of our previous requests
  123. res := new(FullDiffLayersPacket)
  124. if err := msg.Decode(res); err != nil {
  125. return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
  126. }
  127. if fulfilled := requestTracker.Fulfil(peer.id, peer.version, FullDiffLayerMsg, res.RequestId); fulfilled {
  128. return backend.Handle(peer, res)
  129. }
  130. return fmt.Errorf("%w: %v", errUnexpectedMsg, msg.Code)
  131. default:
  132. return fmt.Errorf("%w: %v", errInvalidMsgCode, msg.Code)
  133. }
  134. }
  135. func answerDiffLayersQuery(backend Backend, query *GetDiffLayersPacket) []rlp.RawValue {
  136. // Gather blocks until the fetch or network limits is reached
  137. var (
  138. bytes int
  139. diffLayers []rlp.RawValue
  140. )
  141. // Need avoid transfer huge package
  142. for lookups, hash := range query.BlockHashes {
  143. if bytes >= softResponseLimit || len(diffLayers) >= maxDiffLayerServe ||
  144. lookups >= 2*maxDiffLayerServe {
  145. break
  146. }
  147. if data := backend.Chain().GetDiffLayerRLP(hash); len(data) != 0 {
  148. diffLayers = append(diffLayers, data)
  149. bytes += len(data)
  150. }
  151. }
  152. return diffLayers
  153. }
  154. // NodeInfo represents a short summary of the `diff` sub-protocol metadata
  155. // known about the host peer.
  156. type NodeInfo struct{}
  157. // nodeInfo retrieves some `diff` protocol metadata about the running host node.
  158. func nodeInfo(_ *core.BlockChain) *NodeInfo {
  159. return &NodeInfo{}
  160. }