peer.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. package p2p
  2. import (
  3. "blockchain-go/common/mclock"
  4. "blockchain-go/p2p/enode"
  5. "errors"
  6. "sort"
  7. "sync"
  8. "time"
  9. )
  // ErrShuttingDown is a sentinel error whose message is "shutting down".
  // Compare against it with errors.Is rather than by message text.
  var ErrShuttingDown = errors.New("shutting down")
  // Parameters of the base protocol.
  const (
  	// baseProtocolVersion is the version number of the base protocol.
  	baseProtocolVersion = 5

  	// baseProtocolLength is the first message-code offset handed out to a
  	// matched subprotocol by matchProtocols.
  	baseProtocolLength uint64 = 16

  	// baseProtocolMaxMsgSize is 2 KiB. Judging by the name it caps the size
  	// of base-protocol messages; it is not referenced in this file.
  	baseProtocolMaxMsgSize = 2 * 1024

  	// snappyProtocolVersion is presumably the protocol version from which
  	// snappy compression applies; TODO confirm against the handshake code.
  	snappyProtocolVersion = 5

  	// pingInterval is 15 seconds; presumably the period between keep-alive
  	// pings (no ping loop is visible in this file).
  	pingInterval = 15 * time.Second
  )
  // Message codes of the devp2p base protocol.
  const (
  	handshakeMsg = 0x00 // protocol handshake
  	discMsg      = 0x01 // disconnect
  	pingMsg      = 0x02 // ping
  	pongMsg      = 0x03 // pong
  )
  27. // protoHandshake is the RLP structure of the protocol handshake.
  28. type protoHandshake struct {
  29. Version uint64
  30. Name string
  31. Caps []Cap
  32. ListenPort uint64
  33. ID []byte // secp256k1 public key
  34. // Ignore additional fields (for forward compatibility).
  35. //Rest []rlp.RawValue `rlp:"tail"`
  36. }
  // PeerEventType names the kind of event carried by a PeerEvent.
  type PeerEventType string

  // The event kinds emitted for a peer.
  const (
  	// PeerEventTypeAdd is emitted when a peer is added to a p2p.Server.
  	PeerEventTypeAdd PeerEventType = "add"

  	// PeerEventTypeDrop is emitted when a peer is dropped from a p2p.Server.
  	PeerEventTypeDrop PeerEventType = "drop"

  	// PeerEventTypeMsgSend is emitted when a message is successfully sent
  	// to a peer.
  	PeerEventTypeMsgSend PeerEventType = "msgsend"

  	// PeerEventTypeMsgRecv is emitted when a message is received from a
  	// peer.
  	PeerEventTypeMsgRecv PeerEventType = "msgrecv"
  )
  52. // PeerEvent is an event emitted when peers are either added or dropped from
  53. // a p2p.Server or when a message is sent or received on a peer connection
  54. type PeerEvent struct {
  55. Type PeerEventType `json:"type"`
  56. Peer enode.ID `json:"peer"`
  57. Error string `json:"error,omitempty"`
  58. Protocol string `json:"protocol,omitempty"`
  59. MsgCode *uint64 `json:"msg_code,omitempty"`
  60. MsgSize *uint32 `json:"msg_size,omitempty"`
  61. LocalAddress string `json:"local,omitempty"`
  62. RemoteAddress string `json:"remote,omitempty"`
  63. }
  64. // Peer represents a connected remote node.
  65. type Peer struct {
  66. rw *conn
  67. running map[string]*protoRW
  68. created mclock.AbsTime
  69. protoErr chan error
  70. closed chan struct{}
  71. disc chan DiscReason
  72. wg sync.WaitGroup
  73. //log log.Logger
  74. // events receives message send / receive events if set
  75. //events *event.Feed
  76. }
  77. func newPeer(conn *conn, protocols []Protocol) *Peer {
  78. protoMap := matchProtocols(protocols, conn.caps, conn)
  79. p := &Peer{
  80. rw: conn,
  81. running: protoMap,
  82. created: mclock.Now(),
  83. protoErr: make(chan error, len(protoMap)+1),
  84. closed: make(chan struct{}),
  85. }
  86. return p
  87. }
  88. func (p *Peer) Inbound() bool {
  89. return p.rw.is(inboundConn)
  90. }
  91. func (p *Peer) run() (remoteRequested bool, err error) {
  92. var (
  93. writeStart = make(chan struct{}, 1)
  94. writeErr = make(chan error, 1)
  95. readErr = make(chan error, 1)
  96. reason DiscReason
  97. )
  98. p.wg.Add(2)
  99. writeStart <- struct{}{}
  100. loop:
  101. for {
  102. select {
  103. case err = <-writeErr:
  104. if err != nil {
  105. reason = DiscNetworkError
  106. break loop
  107. }
  108. writeStart <- struct{}{}
  109. case err = <-readErr:
  110. if r, ok := err.(DiscReason); ok {
  111. remoteRequested = true
  112. reason = r
  113. } else {
  114. reason = DiscNetworkError
  115. }
  116. break loop
  117. case err = <-p.protoErr:
  118. reason = discReasonForError(err)
  119. break loop
  120. case err = <-p.disc:
  121. reason = discReasonForError(err)
  122. break loop
  123. }
  124. }
  125. close(p.closed)
  126. p.rw.close(reason)
  127. p.wg.Wait()
  128. return remoteRequested, err
  129. }
  130. type protoRW struct {
  131. Protocol
  132. in chan Msg // receives read messages
  133. closed <-chan struct{} // receives when peer is shutting down
  134. wstart <-chan struct{} // receives when write may start
  135. werr chan<- error // for write results
  136. offset uint64
  137. w MsgWriter
  138. }
  139. func matchProtocols(protocols []Protocol, caps []Cap, rw MsgReadWriter) map[string]*protoRW {
  140. sort.Sort(capsByNameAndVersion(caps))
  141. offset := baseProtocolLength
  142. result := make(map[string]*protoRW)
  143. outer:
  144. for _, capability := range caps {
  145. for _, proto := range protocols {
  146. if proto.Name == capability.Name && proto.Version == capability.Version {
  147. if old := result[capability.Name]; old != nil {
  148. offset -= old.Length
  149. }
  150. result[capability.Name] = &protoRW{Protocol: proto, offset: offset, in: make(chan Msg), w: rw}
  151. offset += proto.Length
  152. continue outer
  153. }
  154. }
  155. }
  156. return result
  157. }