peer.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. package p2p
  2. import (
  3. "blockchain-go/common/mclock"
  4. "blockchain-go/p2p/enode"
  5. "errors"
  6. "net"
  7. "sort"
  8. "sync"
  9. "time"
  10. )
  11. var (
  12. ErrShuttingDown = errors.New("shutting down")
  13. )
  14. const (
  15. baseProtocolVersion = 5
  16. baseProtocolLength = uint64(16)
  17. baseProtocolMaxMsgSize = 2 * 1024
  18. snappyProtocolVersion = 5
  19. pingInterval = 15 * time.Second
  20. )
  21. const (
  22. // devp2p message codes
  23. handshakeMsg = 0x00
  24. discMsg = 0x01
  25. pingMsg = 0x02
  26. pongMsg = 0x03
  27. )
  28. // protoHandshake is the RLP structure of the protocol handshake.
  29. type protoHandshake struct {
  30. Version uint64
  31. Name string
  32. Caps []Cap
  33. ListenPort uint64
  34. ID []byte // secp256k1 public key
  35. // Ignore additional fields (for forward compatibility).
  36. //Rest []rlp.RawValue `rlp:"tail"`
  37. }
  38. type PeerEventType string
  39. const (
  40. // PeerEventTypeAdd is the type of event emitted when a peer is added
  41. // to a p2p.Server
  42. PeerEventTypeAdd PeerEventType = "add"
  43. // PeerEventTypeDrop is the type of event emitted when a peer is
  44. // dropped from a p2p.Server
  45. PeerEventTypeDrop PeerEventType = "drop"
  46. // PeerEventTypeMsgSend is the type of event emitted when a
  47. // message is successfully sent to a peer
  48. PeerEventTypeMsgSend PeerEventType = "msgsend"
  49. // PeerEventTypeMsgRecv is the type of event emitted when a
  50. // message is received from a peer
  51. PeerEventTypeMsgRecv PeerEventType = "msgrecv"
  52. )
  53. // PeerEvent is an event emitted when peers are either added or dropped from
  54. // a p2p.Server or when a message is sent or received on a peer connection
  55. type PeerEvent struct {
  56. Type PeerEventType `json:"type"`
  57. Peer enode.ID `json:"peer"`
  58. Error string `json:"error,omitempty"`
  59. Protocol string `json:"protocol,omitempty"`
  60. MsgCode *uint64 `json:"msg_code,omitempty"`
  61. MsgSize *uint32 `json:"msg_size,omitempty"`
  62. LocalAddress string `json:"local,omitempty"`
  63. RemoteAddress string `json:"remote,omitempty"`
  64. }
  65. // Peer represents a connected remote node.
  66. type Peer struct {
  67. rw *conn
  68. running map[string]*protoRW
  69. created mclock.AbsTime
  70. protoErr chan error
  71. closed chan struct{}
  72. disc chan DiscReason
  73. wg sync.WaitGroup
  74. //log log.Logger
  75. // events receives message send / receive events if set
  76. //events *event.Feed
  77. }
  78. func newPeer(conn *conn, protocols []Protocol) *Peer {
  79. protoMap := matchProtocols(protocols, conn.caps, conn)
  80. p := &Peer{
  81. rw: conn,
  82. running: protoMap,
  83. created: mclock.Now(),
  84. protoErr: make(chan error, len(protoMap)+1),
  85. closed: make(chan struct{}),
  86. }
  87. return p
  88. }
  89. func (p *Peer) Inbound() bool {
  90. return p.rw.is(inboundConn)
  91. }
  92. func (p *Peer) ID() enode.ID {
  93. return p.rw.node.ID()
  94. }
  95. func (p *Peer) RemoteAddr() net.Addr {
  96. return p.rw.fd.RemoteAddr()
  97. }
  98. func (p *Peer) LocalAddr() net.Addr {
  99. return p.rw.fd.LocalAddr()
  100. }
  101. func (p *Peer) run() (remoteRequested bool, err error) {
  102. var (
  103. writeStart = make(chan struct{}, 1)
  104. writeErr = make(chan error, 1)
  105. readErr = make(chan error, 1)
  106. reason DiscReason
  107. )
  108. p.wg.Add(2)
  109. writeStart <- struct{}{}
  110. loop:
  111. for {
  112. select {
  113. case err = <-writeErr:
  114. if err != nil {
  115. reason = DiscNetworkError
  116. break loop
  117. }
  118. writeStart <- struct{}{}
  119. case err = <-readErr:
  120. if r, ok := err.(DiscReason); ok {
  121. remoteRequested = true
  122. reason = r
  123. } else {
  124. reason = DiscNetworkError
  125. }
  126. break loop
  127. case err = <-p.protoErr:
  128. reason = discReasonForError(err)
  129. break loop
  130. case err = <-p.disc:
  131. reason = discReasonForError(err)
  132. break loop
  133. }
  134. }
  135. close(p.closed)
  136. p.rw.close(reason)
  137. p.wg.Wait()
  138. return remoteRequested, err
  139. }
  140. type protoRW struct {
  141. Protocol
  142. in chan Msg // receives read messages
  143. closed <-chan struct{} // receives when peer is shutting down
  144. wstart <-chan struct{} // receives when write may start
  145. werr chan<- error // for write results
  146. offset uint64
  147. w MsgWriter
  148. }
  149. func matchProtocols(protocols []Protocol, caps []Cap, rw MsgReadWriter) map[string]*protoRW {
  150. sort.Sort(capsByNameAndVersion(caps))
  151. offset := baseProtocolLength
  152. result := make(map[string]*protoRW)
  153. outer:
  154. for _, capability := range caps {
  155. for _, proto := range protocols {
  156. if proto.Name == capability.Name && proto.Version == capability.Version {
  157. if old := result[capability.Name]; old != nil {
  158. offset -= old.Length
  159. }
  160. result[capability.Name] = &protoRW{Protocol: proto, offset: offset, in: make(chan Msg), w: rw}
  161. offset += proto.Length
  162. continue outer
  163. }
  164. }
  165. }
  166. return result
  167. }