peer.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. package p2p
  2. import (
  3. "blockchain-go/common/mclock"
  4. "blockchain-go/p2p/enode"
  5. "errors"
  6. "net"
  7. "sort"
  8. "sync"
  9. "time"
  10. )
  11. var (
  12. ErrShuttingDown = errors.New("shutting down")
  13. )
  14. const (
  15. baseProtocolVersion = 5
  16. baseProtocolLength = uint64(16)
  17. baseProtocolMaxMsgSize = 2 * 1024
  18. snappyProtocolVersion = 5
  19. pingInterval = 15 * time.Second
  20. )
  21. const (
  22. // devp2p message codes
  23. handshakeMsg = 0x00
  24. discMsg = 0x01
  25. pingMsg = 0x02
  26. pongMsg = 0x03
  27. )
  28. // protoHandshake is the RLP structure of the protocol handshake.
  29. type protoHandshake struct {
  30. Version uint64
  31. Name string
  32. Caps []Cap
  33. ListenPort uint64
  34. ID []byte // secp256k1 public key
  35. // Ignore additional fields (for forward compatibility).
  36. //Rest []rlp.RawValue `rlp:"tail"`
  37. }
  38. type PeerEventType string
  39. const (
  40. // PeerEventTypeAdd is the type of event emitted when a peer is added
  41. // to a p2p.Server
  42. PeerEventTypeAdd PeerEventType = "add"
  43. // PeerEventTypeDrop is the type of event emitted when a peer is
  44. // dropped from a p2p.Server
  45. PeerEventTypeDrop PeerEventType = "drop"
  46. // PeerEventTypeMsgSend is the type of event emitted when a
  47. // message is successfully sent to a peer
  48. PeerEventTypeMsgSend PeerEventType = "msgsend"
  49. // PeerEventTypeMsgRecv is the type of event emitted when a
  50. // message is received from a peer
  51. PeerEventTypeMsgRecv PeerEventType = "msgrecv"
  52. )
  53. // PeerEvent is an event emitted when peers are either added or dropped from
  54. // a p2p.Server or when a message is sent or received on a peer connection
  55. type PeerEvent struct {
  56. Type PeerEventType `json:"type"`
  57. Peer enode.ID `json:"peer"`
  58. Error string `json:"error,omitempty"`
  59. Protocol string `json:"protocol,omitempty"`
  60. MsgCode *uint64 `json:"msg_code,omitempty"`
  61. MsgSize *uint32 `json:"msg_size,omitempty"`
  62. LocalAddress string `json:"local,omitempty"`
  63. RemoteAddress string `json:"remote,omitempty"`
  64. }
  65. // Peer represents a connected remote node.
  66. type Peer struct {
  67. rw *conn
  68. running map[string]*protoRW
  69. created mclock.AbsTime
  70. protoErr chan error
  71. closed chan struct{}
  72. disc chan DiscReason
  73. wg sync.WaitGroup
  74. //log log.Logger
  75. // events receives message send / receive events if set
  76. //events *event.Feed
  77. }
  78. func newPeer(conn *conn, protocols []Protocol) *Peer {
  79. protoMap := matchProtocols(protocols, conn.caps, conn)
  80. p := &Peer{
  81. rw: conn,
  82. running: protoMap,
  83. created: mclock.Now(),
  84. protoErr: make(chan error, len(protoMap)+1),
  85. closed: make(chan struct{}),
  86. }
  87. return p
  88. }
  89. func (p *Peer) Inbound() bool {
  90. return p.rw.is(inboundConn)
  91. }
  92. func (p *Peer) ID() enode.ID {
  93. return p.rw.node.ID()
  94. }
  95. func (p *Peer) RemoteAddr() net.Addr {
  96. return p.rw.fd.RemoteAddr()
  97. }
  98. func (p *Peer) LocalAddr() net.Addr {
  99. return p.rw.fd.LocalAddr()
  100. }
  101. func (p *Peer) run() (remoteRequested bool, err error) {
  102. var (
  103. writeStart = make(chan struct{}, 1)
  104. writeErr = make(chan error, 1)
  105. readErr = make(chan error, 1)
  106. reason DiscReason
  107. )
  108. p.wg.Add(2)
  109. go p.readLoop(readErr)
  110. go p.pingLoop()
  111. writeStart <- struct{}{}
  112. loop:
  113. for {
  114. select {
  115. case err = <-writeErr:
  116. if err != nil {
  117. reason = DiscNetworkError
  118. break loop
  119. }
  120. writeStart <- struct{}{}
  121. case err = <-readErr:
  122. if r, ok := err.(DiscReason); ok {
  123. remoteRequested = true
  124. reason = r
  125. } else {
  126. reason = DiscNetworkError
  127. }
  128. break loop
  129. case err = <-p.protoErr:
  130. reason = discReasonForError(err)
  131. break loop
  132. case err = <-p.disc:
  133. reason = discReasonForError(err)
  134. break loop
  135. }
  136. }
  137. close(p.closed)
  138. p.rw.close(reason)
  139. p.wg.Wait()
  140. return remoteRequested, err
  141. }
  142. func (p *Peer) readLoop(errc chan<- error) {
  143. defer p.wg.Done()
  144. // TODO 实现readLoop
  145. }
  146. func (p *Peer) pingLoop() {
  147. // TODO 实现pingLoop
  148. }
  149. type protoRW struct {
  150. Protocol
  151. in chan Msg // receives read messages
  152. closed <-chan struct{} // receives when peer is shutting down
  153. wstart <-chan struct{} // receives when write may start
  154. werr chan<- error // for write results
  155. offset uint64
  156. w MsgWriter
  157. }
  158. func matchProtocols(protocols []Protocol, caps []Cap, rw MsgReadWriter) map[string]*protoRW {
  159. sort.Sort(capsByNameAndVersion(caps))
  160. offset := baseProtocolLength
  161. result := make(map[string]*protoRW)
  162. outer:
  163. for _, capability := range caps {
  164. for _, proto := range protocols {
  165. if proto.Name == capability.Name && proto.Version == capability.Version {
  166. if old := result[capability.Name]; old != nil {
  167. offset -= old.Length
  168. }
  169. result[capability.Name] = &protoRW{Protocol: proto, offset: offset, in: make(chan Msg), w: rw}
  170. offset += proto.Length
  171. continue outer
  172. }
  173. }
  174. }
  175. return result
  176. }