peer.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package whisperv5
  17. import (
  18. "fmt"
  19. "time"
  20. "github.com/ethereum/go-ethereum/common"
  21. "github.com/ethereum/go-ethereum/logger"
  22. "github.com/ethereum/go-ethereum/logger/glog"
  23. "github.com/ethereum/go-ethereum/p2p"
  24. "github.com/ethereum/go-ethereum/rlp"
  25. set "gopkg.in/fatih/set.v0"
  26. )
  27. // peer represents a whisper protocol peer connection.
  28. type Peer struct {
  29. host *Whisper
  30. peer *p2p.Peer
  31. ws p2p.MsgReadWriter
  32. trusted bool
  33. known *set.Set // Messages already known by the peer to avoid wasting bandwidth
  34. quit chan struct{}
  35. }
  36. // newPeer creates a new whisper peer object, but does not run the handshake itself.
  37. func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *Peer {
  38. return &Peer{
  39. host: host,
  40. peer: remote,
  41. ws: rw,
  42. trusted: false,
  43. known: set.New(),
  44. quit: make(chan struct{}),
  45. }
  46. }
  47. // start initiates the peer updater, periodically broadcasting the whisper packets
  48. // into the network.
  49. func (p *Peer) start() {
  50. go p.update()
  51. glog.V(logger.Debug).Infof("%v: whisper started", p.peer)
  52. }
  53. // stop terminates the peer updater, stopping message forwarding to it.
  54. func (p *Peer) stop() {
  55. close(p.quit)
  56. glog.V(logger.Debug).Infof("%v: whisper stopped", p.peer)
  57. }
  58. // handshake sends the protocol initiation status message to the remote peer and
  59. // verifies the remote status too.
  60. func (p *Peer) handshake() error {
  61. // Send the handshake status message asynchronously
  62. errc := make(chan error, 1)
  63. go func() {
  64. errc <- p2p.Send(p.ws, statusCode, ProtocolVersion)
  65. }()
  66. // Fetch the remote status packet and verify protocol match
  67. packet, err := p.ws.ReadMsg()
  68. if err != nil {
  69. return err
  70. }
  71. if packet.Code != statusCode {
  72. return fmt.Errorf("peer sent %x before status packet", packet.Code)
  73. }
  74. s := rlp.NewStream(packet.Payload, uint64(packet.Size))
  75. peerVersion, err := s.Uint()
  76. if err != nil {
  77. return fmt.Errorf("bad status message: %v", err)
  78. }
  79. if peerVersion != ProtocolVersion {
  80. return fmt.Errorf("protocol version mismatch %d != %d", peerVersion, ProtocolVersion)
  81. }
  82. // Wait until out own status is consumed too
  83. if err := <-errc; err != nil {
  84. return fmt.Errorf("failed to send status packet: %v", err)
  85. }
  86. return nil
  87. }
  88. // update executes periodic operations on the peer, including message transmission
  89. // and expiration.
  90. func (p *Peer) update() {
  91. // Start the tickers for the updates
  92. expire := time.NewTicker(expirationCycle)
  93. transmit := time.NewTicker(transmissionCycle)
  94. // Loop and transmit until termination is requested
  95. for {
  96. select {
  97. case <-expire.C:
  98. p.expire()
  99. case <-transmit.C:
  100. if err := p.broadcast(); err != nil {
  101. glog.V(logger.Info).Infof("%v: broadcast failed: %v", p.peer, err)
  102. return
  103. }
  104. case <-p.quit:
  105. return
  106. }
  107. }
  108. }
  109. // mark marks an envelope known to the peer so that it won't be sent back.
  110. func (peer *Peer) mark(envelope *Envelope) {
  111. peer.known.Add(envelope.Hash())
  112. }
  113. // marked checks if an envelope is already known to the remote peer.
  114. func (peer *Peer) marked(envelope *Envelope) bool {
  115. return peer.known.Has(envelope.Hash())
  116. }
  117. // expire iterates over all the known envelopes in the host and removes all
  118. // expired (unknown) ones from the known list.
  119. func (peer *Peer) expire() {
  120. // Assemble the list of available envelopes
  121. available := set.NewNonTS()
  122. for _, envelope := range peer.host.Envelopes() {
  123. available.Add(envelope.Hash())
  124. }
  125. // Cross reference availability with known status
  126. unmark := make(map[common.Hash]struct{})
  127. peer.known.Each(func(v interface{}) bool {
  128. if !available.Has(v.(common.Hash)) {
  129. unmark[v.(common.Hash)] = struct{}{}
  130. }
  131. return true
  132. })
  133. // Dump all known but unavailable
  134. for hash := range unmark {
  135. peer.known.Remove(hash)
  136. }
  137. }
  138. // broadcast iterates over the collection of envelopes and transmits yet unknown
  139. // ones over the network.
  140. func (p *Peer) broadcast() error {
  141. // Fetch the envelopes and collect the unknown ones
  142. envelopes := p.host.Envelopes()
  143. transmit := make([]*Envelope, 0, len(envelopes))
  144. for _, envelope := range envelopes {
  145. if !p.marked(envelope) {
  146. transmit = append(transmit, envelope)
  147. p.mark(envelope)
  148. }
  149. }
  150. if len(transmit) == 0 {
  151. return nil
  152. }
  153. // Transmit the unknown batch (potentially empty)
  154. if err := p2p.Send(p.ws, messagesCode, transmit); err != nil {
  155. return err
  156. }
  157. glog.V(logger.Detail).Infoln(p.peer, "broadcasted", len(transmit), "message(s)")
  158. return nil
  159. }
  160. func (p *Peer) ID() []byte {
  161. id := p.peer.ID()
  162. return id[:]
  163. }