peer.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package diff
  2. import (
  3. "math/rand"
  4. "github.com/ethereum/go-ethereum/common"
  5. "github.com/ethereum/go-ethereum/log"
  6. "github.com/ethereum/go-ethereum/p2p"
  7. "github.com/ethereum/go-ethereum/rlp"
  8. )
  9. const maxQueuedDiffLayers = 12
  10. // Peer is a collection of relevant information we have about a `diff` peer.
  11. type Peer struct {
  12. id string // Unique ID for the peer, cached
  13. diffSync bool // whether the peer can diff sync
  14. queuedDiffLayers chan []rlp.RawValue // Queue of diff layers to broadcast to the peer
  15. *p2p.Peer // The embedded P2P package peer
  16. rw p2p.MsgReadWriter // Input/output streams for diff
  17. version uint // Protocol version negotiated
  18. logger log.Logger // Contextual logger with the peer id injected
  19. term chan struct{} // Termination channel to stop the broadcasters
  20. }
  21. // NewPeer create a wrapper for a network connection and negotiated protocol
  22. // version.
  23. func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter) *Peer {
  24. id := p.ID().String()
  25. peer := &Peer{
  26. id: id,
  27. Peer: p,
  28. rw: rw,
  29. diffSync: false,
  30. version: version,
  31. logger: log.New("peer", id[:8]),
  32. queuedDiffLayers: make(chan []rlp.RawValue, maxQueuedDiffLayers),
  33. term: make(chan struct{}),
  34. }
  35. go peer.broadcastDiffLayers()
  36. return peer
  37. }
  38. func (p *Peer) broadcastDiffLayers() {
  39. for {
  40. select {
  41. case prop := <-p.queuedDiffLayers:
  42. if err := p.SendDiffLayers(prop); err != nil {
  43. p.Log().Error("Failed to propagated diff layer", "err", err)
  44. return
  45. }
  46. case <-p.term:
  47. return
  48. }
  49. }
  50. }
  51. // ID retrieves the peer's unique identifier.
  52. func (p *Peer) ID() string {
  53. return p.id
  54. }
  55. // Version retrieves the peer's negoatiated `diff` protocol version.
  56. func (p *Peer) Version() uint {
  57. return p.version
  58. }
  59. func (p *Peer) DiffSync() bool {
  60. return p.diffSync
  61. }
  62. // Log overrides the P2P logget with the higher level one containing only the id.
  63. func (p *Peer) Log() log.Logger {
  64. return p.logger
  65. }
  66. // Close signals the broadcast goroutine to terminate. Only ever call this if
  67. // you created the peer yourself via NewPeer. Otherwise let whoever created it
  68. // clean it up!
  69. func (p *Peer) Close() {
  70. close(p.term)
  71. }
  72. // RequestDiffLayers fetches a batch of diff layers corresponding to the hashes
  73. // specified.
  74. func (p *Peer) RequestDiffLayers(hashes []common.Hash) error {
  75. id := rand.Uint64()
  76. requestTracker.Track(p.id, p.version, GetDiffLayerMsg, FullDiffLayerMsg, id)
  77. return p2p.Send(p.rw, GetDiffLayerMsg, GetDiffLayersPacket{
  78. RequestId: id,
  79. BlockHashes: hashes,
  80. })
  81. }
  82. func (p *Peer) SendDiffLayers(diffs []rlp.RawValue) error {
  83. return p2p.Send(p.rw, DiffLayerMsg, diffs)
  84. }
  85. func (p *Peer) AsyncSendDiffLayer(diffLayers []rlp.RawValue) {
  86. select {
  87. case p.queuedDiffLayers <- diffLayers:
  88. default:
  89. p.Log().Debug("Dropping diff layers propagation")
  90. }
  91. }