| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- package diff
- import (
- "math/rand"
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/p2p"
- "github.com/ethereum/go-ethereum/rlp"
- )
- const maxQueuedDiffLayers = 12
- // Peer is a collection of relevant information we have about a `diff` peer.
- type Peer struct {
- id string // Unique ID for the peer, cached
- diffSync bool // whether the peer can diff sync
- queuedDiffLayers chan []rlp.RawValue // Queue of diff layers to broadcast to the peer
- *p2p.Peer // The embedded P2P package peer
- rw p2p.MsgReadWriter // Input/output streams for diff
- version uint // Protocol version negotiated
- logger log.Logger // Contextual logger with the peer id injected
- term chan struct{} // Termination channel to stop the broadcasters
- }
- // NewPeer create a wrapper for a network connection and negotiated protocol
- // version.
- func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter) *Peer {
- id := p.ID().String()
- peer := &Peer{
- id: id,
- Peer: p,
- rw: rw,
- diffSync: false,
- version: version,
- logger: log.New("peer", id[:8]),
- queuedDiffLayers: make(chan []rlp.RawValue, maxQueuedDiffLayers),
- term: make(chan struct{}),
- }
- go peer.broadcastDiffLayers()
- return peer
- }
- func (p *Peer) broadcastDiffLayers() {
- for {
- select {
- case prop := <-p.queuedDiffLayers:
- if err := p.SendDiffLayers(prop); err != nil {
- p.Log().Error("Failed to propagated diff layer", "err", err)
- return
- }
- case <-p.term:
- return
- }
- }
- }
- // ID retrieves the peer's unique identifier.
- func (p *Peer) ID() string {
- return p.id
- }
- // Version retrieves the peer's negoatiated `diff` protocol version.
- func (p *Peer) Version() uint {
- return p.version
- }
- func (p *Peer) DiffSync() bool {
- return p.diffSync
- }
- // Log overrides the P2P logget with the higher level one containing only the id.
- func (p *Peer) Log() log.Logger {
- return p.logger
- }
- // Close signals the broadcast goroutine to terminate. Only ever call this if
- // you created the peer yourself via NewPeer. Otherwise let whoever created it
- // clean it up!
- func (p *Peer) Close() {
- close(p.term)
- }
- // RequestDiffLayers fetches a batch of diff layers corresponding to the hashes
- // specified.
- func (p *Peer) RequestDiffLayers(hashes []common.Hash) error {
- id := rand.Uint64()
- requestTracker.Track(p.id, p.version, GetDiffLayerMsg, FullDiffLayerMsg, id)
- return p2p.Send(p.rw, GetDiffLayerMsg, GetDiffLayersPacket{
- RequestId: id,
- BlockHashes: hashes,
- })
- }
- func (p *Peer) SendDiffLayers(diffs []rlp.RawValue) error {
- return p2p.Send(p.rw, DiffLayerMsg, diffs)
- }
- func (p *Peer) AsyncSendDiffLayer(diffLayers []rlp.RawValue) {
- select {
- case p.queuedDiffLayers <- diffLayers:
- default:
- p.Log().Debug("Dropping diff layers propagation")
- }
- }
|