peer.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. // Copyright 2014 The go-ethereum Authors
  2. // This file is part of go-ethereum.
  3. //
  4. // go-ethereum is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // go-ethereum is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
  16. package whisper
  17. import (
  18. "fmt"
  19. "time"
  20. "github.com/ethereum/go-ethereum/common"
  21. "github.com/ethereum/go-ethereum/logger"
  22. "github.com/ethereum/go-ethereum/logger/glog"
  23. "github.com/ethereum/go-ethereum/p2p"
  24. "github.com/ethereum/go-ethereum/rlp"
  25. "gopkg.in/fatih/set.v0"
  26. )
  27. // peer represents a whisper protocol peer connection.
  28. type peer struct {
  29. host *Whisper
  30. peer *p2p.Peer
  31. ws p2p.MsgReadWriter
  32. known *set.Set // Messages already known by the peer to avoid wasting bandwidth
  33. quit chan struct{}
  34. }
  35. // newPeer creates a new whisper peer object, but does not run the handshake itself.
  36. func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *peer {
  37. return &peer{
  38. host: host,
  39. peer: remote,
  40. ws: rw,
  41. known: set.New(),
  42. quit: make(chan struct{}),
  43. }
  44. }
  45. // start initiates the peer updater, periodically broadcasting the whisper packets
  46. // into the network.
  47. func (self *peer) start() {
  48. go self.update()
  49. glog.V(logger.Debug).Infof("%v: whisper started", self.peer)
  50. }
  51. // stop terminates the peer updater, stopping message forwarding to it.
  52. func (self *peer) stop() {
  53. close(self.quit)
  54. glog.V(logger.Debug).Infof("%v: whisper stopped", self.peer)
  55. }
  56. // handshake sends the protocol initiation status message to the remote peer and
  57. // verifies the remote status too.
  58. func (self *peer) handshake() error {
  59. // Send the handshake status message asynchronously
  60. errc := make(chan error, 1)
  61. go func() {
  62. errc <- p2p.SendItems(self.ws, statusCode, protocolVersion)
  63. }()
  64. // Fetch the remote status packet and verify protocol match
  65. packet, err := self.ws.ReadMsg()
  66. if err != nil {
  67. return err
  68. }
  69. if packet.Code != statusCode {
  70. return fmt.Errorf("peer sent %x before status packet", packet.Code)
  71. }
  72. s := rlp.NewStream(packet.Payload, uint64(packet.Size))
  73. if _, err := s.List(); err != nil {
  74. return fmt.Errorf("bad status message: %v", err)
  75. }
  76. peerVersion, err := s.Uint()
  77. if err != nil {
  78. return fmt.Errorf("bad status message: %v", err)
  79. }
  80. if peerVersion != protocolVersion {
  81. return fmt.Errorf("protocol version mismatch %d != %d", peerVersion, protocolVersion)
  82. }
  83. // Wait until out own status is consumed too
  84. if err := <-errc; err != nil {
  85. return fmt.Errorf("failed to send status packet: %v", err)
  86. }
  87. return nil
  88. }
  89. // update executes periodic operations on the peer, including message transmission
  90. // and expiration.
  91. func (self *peer) update() {
  92. // Start the tickers for the updates
  93. expire := time.NewTicker(expirationCycle)
  94. transmit := time.NewTicker(transmissionCycle)
  95. // Loop and transmit until termination is requested
  96. for {
  97. select {
  98. case <-expire.C:
  99. self.expire()
  100. case <-transmit.C:
  101. if err := self.broadcast(); err != nil {
  102. glog.V(logger.Info).Infof("%v: broadcast failed: %v", self.peer, err)
  103. return
  104. }
  105. case <-self.quit:
  106. return
  107. }
  108. }
  109. }
  110. // mark marks an envelope known to the peer so that it won't be sent back.
  111. func (self *peer) mark(envelope *Envelope) {
  112. self.known.Add(envelope.Hash())
  113. }
  114. // marked checks if an envelope is already known to the remote peer.
  115. func (self *peer) marked(envelope *Envelope) bool {
  116. return self.known.Has(envelope.Hash())
  117. }
  118. // expire iterates over all the known envelopes in the host and removes all
  119. // expired (unknown) ones from the known list.
  120. func (self *peer) expire() {
  121. // Assemble the list of available envelopes
  122. available := set.NewNonTS()
  123. for _, envelope := range self.host.envelopes() {
  124. available.Add(envelope.Hash())
  125. }
  126. // Cross reference availability with known status
  127. unmark := make(map[common.Hash]struct{})
  128. self.known.Each(func(v interface{}) bool {
  129. if !available.Has(v.(common.Hash)) {
  130. unmark[v.(common.Hash)] = struct{}{}
  131. }
  132. return true
  133. })
  134. // Dump all known but unavailable
  135. for hash, _ := range unmark {
  136. self.known.Remove(hash)
  137. }
  138. }
  139. // broadcast iterates over the collection of envelopes and transmits yet unknown
  140. // ones over the network.
  141. func (self *peer) broadcast() error {
  142. // Fetch the envelopes and collect the unknown ones
  143. envelopes := self.host.envelopes()
  144. transmit := make([]*Envelope, 0, len(envelopes))
  145. for _, envelope := range envelopes {
  146. if !self.marked(envelope) {
  147. transmit = append(transmit, envelope)
  148. self.mark(envelope)
  149. }
  150. }
  151. // Transmit the unknown batch (potentially empty)
  152. if err := p2p.Send(self.ws, messagesCode, transmit); err != nil {
  153. return err
  154. }
  155. glog.V(logger.Detail).Infoln(self.peer, "broadcasted", len(transmit), "message(s)")
  156. return nil
  157. }