peer.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. package p2p
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "net"
  7. "sort"
  8. "sync"
  9. "time"
  10. "github.com/ethereum/go-ethereum/logger"
  11. "github.com/ethereum/go-ethereum/p2p/discover"
  12. "github.com/ethereum/go-ethereum/rlp"
  13. )
  // Parameters of the devp2p base protocol.
  const (
  	// baseProtocolVersion is the version number of the base protocol.
  	baseProtocolVersion = 3
  	// baseProtocolLength is the number of message codes reserved for the
  	// base protocol. Subprotocol code offsets start right after this range.
  	baseProtocolLength uint64 = 16
  	// baseProtocolMaxMsgSize equals 10 * 1024 * 1024. It is presumably a size
  	// limit in bytes for base protocol messages; it is not enforced in this file.
  	baseProtocolMaxMsgSize = 10 << 20
  	// pingInterval is the period of the ticker that drives pingLoop.
  	pingInterval = 15 * time.Second
  )
  // Message codes of the devp2p base protocol. The values are sequential,
  // starting at zero.
  const (
  	handshakeMsg = iota // 0x00
  	discMsg             // 0x01
  	pingMsg             // 0x02
  	pongMsg             // 0x03
  	getPeersMsg         // 0x04
  	peersMsg            // 0x05
  )
  29. // Peer represents a connected remote node.
  30. type Peer struct {
  31. // Peers have all the log methods.
  32. // Use them to display messages related to the peer.
  33. *logger.Logger
  34. conn net.Conn
  35. rw *conn
  36. running map[string]*protoRW
  37. wg sync.WaitGroup
  38. protoErr chan error
  39. closed chan struct{}
  40. disc chan DiscReason
  41. }
  42. // NewPeer returns a peer for testing purposes.
  43. func NewPeer(id discover.NodeID, name string, caps []Cap) *Peer {
  44. pipe, _ := net.Pipe()
  45. msgpipe, _ := MsgPipe()
  46. conn := &conn{msgpipe, &protoHandshake{ID: id, Name: name, Caps: caps}}
  47. peer := newPeer(pipe, conn, nil)
  48. close(peer.closed) // ensures Disconnect doesn't block
  49. return peer
  50. }
  51. // ID returns the node's public key.
  52. func (p *Peer) ID() discover.NodeID {
  53. return p.rw.ID
  54. }
  55. // Name returns the node name that the remote node advertised.
  56. func (p *Peer) Name() string {
  57. return p.rw.Name
  58. }
  59. // Caps returns the capabilities (supported subprotocols) of the remote peer.
  60. func (p *Peer) Caps() []Cap {
  61. // TODO: maybe return copy
  62. return p.rw.Caps
  63. }
  64. // RemoteAddr returns the remote address of the network connection.
  65. func (p *Peer) RemoteAddr() net.Addr {
  66. return p.conn.RemoteAddr()
  67. }
  68. // LocalAddr returns the local address of the network connection.
  69. func (p *Peer) LocalAddr() net.Addr {
  70. return p.conn.LocalAddr()
  71. }
  72. // Disconnect terminates the peer connection with the given reason.
  73. // It returns immediately and does not wait until the connection is closed.
  74. func (p *Peer) Disconnect(reason DiscReason) {
  75. select {
  76. case p.disc <- reason:
  77. case <-p.closed:
  78. }
  79. }
  80. // String implements fmt.Stringer.
  81. func (p *Peer) String() string {
  82. return fmt.Sprintf("Peer %.8x %v", p.rw.ID[:], p.RemoteAddr())
  83. }
  84. func newPeer(fd net.Conn, conn *conn, protocols []Protocol) *Peer {
  85. logtag := fmt.Sprintf("Peer %.8x %v", conn.ID[:], fd.RemoteAddr())
  86. protomap := matchProtocols(protocols, conn.Caps, conn)
  87. p := &Peer{
  88. Logger: logger.NewLogger(logtag),
  89. conn: fd,
  90. rw: conn,
  91. running: protomap,
  92. disc: make(chan DiscReason),
  93. protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop
  94. closed: make(chan struct{}),
  95. }
  96. return p
  97. }
  98. func (p *Peer) run() DiscReason {
  99. readErr := make(chan error, 1)
  100. p.wg.Add(2)
  101. go p.readLoop(readErr)
  102. go p.pingLoop()
  103. p.startProtocols()
  104. // Wait for an error or disconnect.
  105. var reason DiscReason
  106. select {
  107. case err := <-readErr:
  108. if r, ok := err.(DiscReason); ok {
  109. reason = r
  110. } else {
  111. // Note: We rely on protocols to abort if there is a write
  112. // error. It might be more robust to handle them here as well.
  113. p.DebugDetailf("Read error: %v\n", err)
  114. reason = DiscNetworkError
  115. }
  116. case err := <-p.protoErr:
  117. reason = discReasonForError(err)
  118. case reason = <-p.disc:
  119. }
  120. close(p.closed)
  121. p.politeDisconnect(reason)
  122. p.wg.Wait()
  123. p.Debugf("Disconnected: %v\n", reason)
  124. return reason
  125. }
  126. func (p *Peer) politeDisconnect(reason DiscReason) {
  127. if reason != DiscNetworkError {
  128. SendItems(p.rw, discMsg, uint(reason))
  129. }
  130. p.conn.Close()
  131. }
  132. func (p *Peer) pingLoop() {
  133. ping := time.NewTicker(pingInterval)
  134. defer p.wg.Done()
  135. defer ping.Stop()
  136. for {
  137. select {
  138. case <-ping.C:
  139. if err := SendItems(p.rw, pingMsg); err != nil {
  140. p.protoErr <- err
  141. return
  142. }
  143. case <-p.closed:
  144. return
  145. }
  146. }
  147. }
  148. func (p *Peer) readLoop(errc chan<- error) {
  149. defer p.wg.Done()
  150. for {
  151. msg, err := p.rw.ReadMsg()
  152. if err != nil {
  153. errc <- err
  154. return
  155. }
  156. msg.ReceivedAt = time.Now()
  157. if err = p.handle(msg); err != nil {
  158. errc <- err
  159. return
  160. }
  161. }
  162. }
  163. func (p *Peer) handle(msg Msg) error {
  164. switch {
  165. case msg.Code == pingMsg:
  166. msg.Discard()
  167. go SendItems(p.rw, pongMsg)
  168. case msg.Code == discMsg:
  169. var reason [1]DiscReason
  170. // This is the last message. We don't need to discard or
  171. // check errors because, the connection will be closed after it.
  172. rlp.Decode(msg.Payload, &reason)
  173. p.Debugf("Disconnect requested: %v\n", reason[0])
  174. return DiscRequested
  175. case msg.Code < baseProtocolLength:
  176. // ignore other base protocol messages
  177. return msg.Discard()
  178. default:
  179. // it's a subprotocol message
  180. proto, err := p.getProto(msg.Code)
  181. if err != nil {
  182. return fmt.Errorf("msg code out of range: %v", msg.Code)
  183. }
  184. select {
  185. case proto.in <- msg:
  186. return nil
  187. case <-p.closed:
  188. return io.EOF
  189. }
  190. }
  191. return nil
  192. }
  193. // matchProtocols creates structures for matching named subprotocols.
  194. func matchProtocols(protocols []Protocol, caps []Cap, rw MsgReadWriter) map[string]*protoRW {
  195. sort.Sort(capsByName(caps))
  196. offset := baseProtocolLength
  197. result := make(map[string]*protoRW)
  198. outer:
  199. for _, cap := range caps {
  200. for _, proto := range protocols {
  201. if proto.Name == cap.Name && proto.Version == cap.Version && result[cap.Name] == nil {
  202. result[cap.Name] = &protoRW{Protocol: proto, offset: offset, in: make(chan Msg), w: rw}
  203. offset += proto.Length
  204. continue outer
  205. }
  206. }
  207. }
  208. return result
  209. }
  210. func (p *Peer) startProtocols() {
  211. p.wg.Add(len(p.running))
  212. for _, proto := range p.running {
  213. proto := proto
  214. proto.closed = p.closed
  215. p.DebugDetailf("Starting protocol %s/%d\n", proto.Name, proto.Version)
  216. go func() {
  217. err := proto.Run(p, proto)
  218. if err == nil {
  219. p.DebugDetailf("Protocol %s/%d returned\n", proto.Name, proto.Version)
  220. err = errors.New("protocol returned")
  221. } else {
  222. p.DebugDetailf("Protocol %s/%d error: %v\n", proto.Name, proto.Version, err)
  223. }
  224. p.protoErr <- err
  225. p.wg.Done()
  226. }()
  227. }
  228. }
  229. // getProto finds the protocol responsible for handling
  230. // the given message code.
  231. func (p *Peer) getProto(code uint64) (*protoRW, error) {
  232. for _, proto := range p.running {
  233. if code >= proto.offset && code < proto.offset+proto.Length {
  234. return proto, nil
  235. }
  236. }
  237. return nil, newPeerError(errInvalidMsgCode, "%d", code)
  238. }
  239. // writeProtoMsg sends the given message on behalf of the given named protocol.
  240. // this exists because of Server.Broadcast.
  241. func (p *Peer) writeProtoMsg(protoName string, msg Msg) error {
  242. proto, ok := p.running[protoName]
  243. if !ok {
  244. return fmt.Errorf("protocol %s not handled by peer", protoName)
  245. }
  246. if msg.Code >= proto.Length {
  247. return newPeerError(errInvalidMsgCode, "code %x is out of range for protocol %q", msg.Code, protoName)
  248. }
  249. msg.Code += proto.offset
  250. return p.rw.WriteMsg(msg)
  251. }
  252. type protoRW struct {
  253. Protocol
  254. in chan Msg
  255. closed <-chan struct{}
  256. offset uint64
  257. w MsgWriter
  258. }
  259. func (rw *protoRW) WriteMsg(msg Msg) error {
  260. if msg.Code >= rw.Length {
  261. return newPeerError(errInvalidMsgCode, "not handled")
  262. }
  263. msg.Code += rw.offset
  264. return rw.w.WriteMsg(msg)
  265. }
  266. func (rw *protoRW) ReadMsg() (Msg, error) {
  267. select {
  268. case msg := <-rw.in:
  269. msg.Code -= rw.offset
  270. return msg, nil
  271. case <-rw.closed:
  272. return Msg{}, io.EOF
  273. }
  274. }