peer.go 5.5 KB


  1. package p2p
  2. import (
  3. "blockchain-go/common/gopool"
  4. "blockchain-go/common/mclock"
  5. "blockchain-go/p2p/enode"
  6. "blockchain-go/rlp"
  7. "errors"
  8. "fmt"
  9. "net"
  10. "sort"
  11. "sync"
  12. "time"
  13. )
  // ErrShuttingDown is the sentinel error carrying the message "shutting down".
  // NOTE(review): presumably returned while the p2p server is stopping; no use
  // of it is visible in this file — confirm against callers.
  var ErrShuttingDown = errors.New("shutting down")
  // Parameters of the base protocol that runs on every peer connection.
  const (
  	// baseProtocolVersion is the version number of the base protocol.
  	baseProtocolVersion = 5

  	// baseProtocolLength is the size of the message-code space reserved for
  	// the base protocol: handle treats every code below it as a base protocol
  	// message, and matchProtocols assigns sub-protocol offsets starting here.
  	baseProtocolLength uint64 = 16

  	// baseProtocolMaxMsgSize is 2048 (2 * 1024).
  	// NOTE(review): presumably the size limit, in bytes, of a base protocol
  	// message; it is not referenced in this file — confirm.
  	baseProtocolMaxMsgSize = 2 << 10

  	// snappyProtocolVersion is not referenced in this file.
  	// NOTE(review): presumably the protocol version from which snappy
  	// compression is negotiated — confirm.
  	snappyProtocolVersion = 5

  	// pingInterval is the delay pingLoop waits between two pings.
  	pingInterval = 15 * time.Second
  )
  // Message codes of the devp2p base protocol. They are untyped constants so
  // they can be compared directly with a message's code.
  const (
  	// handshakeMsg is the code of the protocol handshake message.
  	handshakeMsg = 0x00

  	// discMsg is the code of the disconnect message; handle decodes its
  	// payload into a DiscReason.
  	discMsg = 0x01

  	// pingMsg is the code of the keep-alive request sent by pingLoop.
  	pingMsg = 0x02

  	// pongMsg is the code of the reply that handle sends for a pingMsg.
  	pongMsg = 0x03
  )
  31. // protoHandshake is the RLP structure of the protocol handshake.
  32. type protoHandshake struct {
  33. Version uint64
  34. Name string
  35. Caps []Cap
  36. ListenPort uint64
  37. ID []byte // secp256k1 public key
  38. // Ignore additional fields (for forward compatibility).
  39. //Rest []rlp.RawValue `rlp:"tail"`
  40. }
  // PeerEventType is the kind of a PeerEvent, expressed as a string.
  type PeerEventType string

  // The event kinds carried by PeerEvent.Type.
  const (
  	// PeerEventTypeAdd is the type of event emitted when a peer is added
  	// to a p2p.Server.
  	PeerEventTypeAdd = PeerEventType("add")

  	// PeerEventTypeDrop is the type of event emitted when a peer is
  	// dropped from a p2p.Server.
  	PeerEventTypeDrop = PeerEventType("drop")

  	// PeerEventTypeMsgSend is the type of event emitted when a
  	// message is successfully sent to a peer.
  	PeerEventTypeMsgSend = PeerEventType("msgsend")

  	// PeerEventTypeMsgRecv is the type of event emitted when a
  	// message is received from a peer.
  	PeerEventTypeMsgRecv = PeerEventType("msgrecv")
  )
  56. // PeerEvent is an event emitted when peers are either added or dropped from
  57. // a p2p.Server or when a message is sent or received on a peer connection
  58. type PeerEvent struct {
  59. Type PeerEventType `json:"type"`
  60. Peer enode.ID `json:"peer"`
  61. Error string `json:"error,omitempty"`
  62. Protocol string `json:"protocol,omitempty"`
  63. MsgCode *uint64 `json:"msg_code,omitempty"`
  64. MsgSize *uint32 `json:"msg_size,omitempty"`
  65. LocalAddress string `json:"local,omitempty"`
  66. RemoteAddress string `json:"remote,omitempty"`
  67. }
  68. // Peer represents a connected remote node.
  69. type Peer struct {
  70. rw *conn
  71. running map[string]*protoRW
  72. created mclock.AbsTime
  73. protoErr chan error
  74. closed chan struct{}
  75. disc chan DiscReason
  76. wg sync.WaitGroup
  77. //log log.Logger
  78. // events receives message send / receive events if set
  79. //events *event.Feed
  80. }
  81. func newPeer(conn *conn, protocols []Protocol) *Peer {
  82. protoMap := matchProtocols(protocols, conn.caps, conn)
  83. p := &Peer{
  84. rw: conn,
  85. running: protoMap,
  86. created: mclock.Now(),
  87. protoErr: make(chan error, len(protoMap)+1),
  88. closed: make(chan struct{}),
  89. }
  90. return p
  91. }
  92. func (p *Peer) Inbound() bool {
  93. return p.rw.is(inboundConn)
  94. }
  95. func (p *Peer) ID() enode.ID {
  96. return p.rw.node.ID()
  97. }
  98. func (p *Peer) RemoteAddr() net.Addr {
  99. return p.rw.fd.RemoteAddr()
  100. }
  101. func (p *Peer) LocalAddr() net.Addr {
  102. return p.rw.fd.LocalAddr()
  103. }
  104. func (p *Peer) run() (remoteRequested bool, err error) {
  105. var (
  106. writeStart = make(chan struct{}, 1)
  107. writeErr = make(chan error, 1)
  108. readErr = make(chan error, 1)
  109. reason DiscReason
  110. )
  111. p.wg.Add(2)
  112. go p.readLoop(readErr)
  113. go p.pingLoop()
  114. writeStart <- struct{}{}
  115. loop:
  116. for {
  117. select {
  118. case err = <-writeErr:
  119. if err != nil {
  120. reason = DiscNetworkError
  121. break loop
  122. }
  123. writeStart <- struct{}{}
  124. case err = <-readErr:
  125. if r, ok := err.(DiscReason); ok {
  126. remoteRequested = true
  127. reason = r
  128. } else {
  129. reason = DiscNetworkError
  130. }
  131. break loop
  132. case err = <-p.protoErr:
  133. reason = discReasonForError(err)
  134. break loop
  135. case err = <-p.disc:
  136. reason = discReasonForError(err)
  137. break loop
  138. }
  139. }
  140. close(p.closed)
  141. p.rw.close(reason)
  142. p.wg.Wait()
  143. return remoteRequested, err
  144. }
  145. func (p *Peer) readLoop(errc chan<- error) {
  146. defer p.wg.Done()
  147. for {
  148. msg, err := p.rw.ReadMsg()
  149. if err != nil {
  150. errc <- err
  151. return
  152. }
  153. msg.ReceivedAt = time.Now()
  154. if err = p.handle(msg); err != nil {
  155. errc <- err
  156. return
  157. }
  158. }
  159. }
  160. func (p *Peer) handle(msg Msg) error {
  161. fmt.Printf("code: %v, msg: %v.\n", msg.Code, msg.Payload)
  162. switch {
  163. case msg.Code == pingMsg:
  164. msg.Discard()
  165. gopool.Submit(func() {
  166. SendItems(p.rw, pongMsg)
  167. })
  168. case msg.Code == discMsg:
  169. var m struct{ R DiscReason }
  170. rlp.Decode(msg.Payload, &m)
  171. return m.R
  172. case msg.Code < baseProtocolLength:
  173. return msg.Discard()
  174. }
  175. return nil
  176. }
  177. func (p *Peer) pingLoop() {
  178. ping := time.NewTimer(pingInterval)
  179. defer p.wg.Done()
  180. defer ping.Stop()
  181. for {
  182. select {
  183. case <-ping.C:
  184. if err := SendItems(p.rw, pingMsg); err != nil {
  185. p.protoErr <- err
  186. return
  187. }
  188. ping.Reset(pingInterval)
  189. case <-p.closed:
  190. return
  191. }
  192. }
  193. }
  194. type protoRW struct {
  195. Protocol
  196. in chan Msg // receives read messages
  197. closed <-chan struct{} // receives when peer is shutting down
  198. wstart <-chan struct{} // receives when write may start
  199. werr chan<- error // for write results
  200. offset uint64
  201. w MsgWriter
  202. }
  203. func matchProtocols(protocols []Protocol, caps []Cap, rw MsgReadWriter) map[string]*protoRW {
  204. sort.Sort(capsByNameAndVersion(caps))
  205. offset := baseProtocolLength
  206. result := make(map[string]*protoRW)
  207. outer:
  208. for _, capability := range caps {
  209. for _, proto := range protocols {
  210. if proto.Name == capability.Name && proto.Version == capability.Version {
  211. if old := result[capability.Name]; old != nil {
  212. offset -= old.Length
  213. }
  214. result[capability.Name] = &protoRW{Protocol: proto, offset: offset, in: make(chan Msg), w: rw}
  215. offset += proto.Length
  216. continue outer
  217. }
  218. }
  219. }
  220. return result
  221. }