protocol.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. package p2p
  2. import (
  3. "bytes"
  4. "time"
  5. )
  6. // Protocol represents a P2P subprotocol implementation.
  7. type Protocol struct {
  8. // Name should contain the official protocol name,
  9. // often a three-letter word.
  10. Name string
  11. // Version should contain the version number of the protocol.
  12. Version uint
  13. // Length should contain the number of message codes used
  14. // by the protocol.
  15. Length uint64
  16. // Run is called in a new groutine when the protocol has been
  17. // negotiated with a peer. It should read and write messages from
  18. // rw. The Payload for each message must be fully consumed.
  19. //
  20. // The peer connection is closed when Start returns. It should return
  21. // any protocol-level error (such as an I/O error) that is
  22. // encountered.
  23. Run func(peer *Peer, rw MsgReadWriter) error
  24. }
  25. func (p Protocol) cap() Cap {
  26. return Cap{p.Name, p.Version}
  27. }
  28. const (
  29. baseProtocolVersion = 2
  30. baseProtocolLength = uint64(16)
  31. baseProtocolMaxMsgSize = 10 * 1024 * 1024
  32. )
  33. const (
  34. // devp2p message codes
  35. handshakeMsg = 0x00
  36. discMsg = 0x01
  37. pingMsg = 0x02
  38. pongMsg = 0x03
  39. getPeersMsg = 0x04
  40. peersMsg = 0x05
  41. )
  42. // handshake is the structure of a handshake list.
  43. type handshake struct {
  44. Version uint64
  45. ID string
  46. Caps []Cap
  47. ListenPort uint64
  48. NodeID []byte
  49. }
  50. func (h *handshake) String() string {
  51. return h.ID
  52. }
  53. func (h *handshake) Pubkey() []byte {
  54. return h.NodeID
  55. }
  56. func (h *handshake) PrivKey() []byte {
  57. return nil
  58. }
  59. // Cap is the structure of a peer capability.
  60. type Cap struct {
  61. Name string
  62. Version uint
  63. }
  64. func (cap Cap) RlpData() interface{} {
  65. return []interface{}{cap.Name, cap.Version}
  66. }
  67. type capsByName []Cap
  68. func (cs capsByName) Len() int { return len(cs) }
  69. func (cs capsByName) Less(i, j int) bool { return cs[i].Name < cs[j].Name }
  70. func (cs capsByName) Swap(i, j int) { cs[i], cs[j] = cs[j], cs[i] }
  71. type baseProtocol struct {
  72. rw MsgReadWriter
  73. peer *Peer
  74. }
  75. func runBaseProtocol(peer *Peer, rw MsgReadWriter) error {
  76. bp := &baseProtocol{rw, peer}
  77. errc := make(chan error, 1)
  78. go func() { errc <- rw.WriteMsg(bp.handshakeMsg()) }()
  79. if err := bp.readHandshake(); err != nil {
  80. return err
  81. }
  82. // handle write error
  83. if err := <-errc; err != nil {
  84. return err
  85. }
  86. // run main loop
  87. go func() {
  88. for {
  89. if err := bp.handle(rw); err != nil {
  90. errc <- err
  91. break
  92. }
  93. }
  94. }()
  95. return bp.loop(errc)
  96. }
  97. var pingTimeout = 2 * time.Second
  98. func (bp *baseProtocol) loop(quit <-chan error) error {
  99. ping := time.NewTimer(pingTimeout)
  100. activity := bp.peer.activity.Subscribe(time.Time{})
  101. lastActive := time.Time{}
  102. defer ping.Stop()
  103. defer activity.Unsubscribe()
  104. getPeersTick := time.NewTicker(10 * time.Second)
  105. defer getPeersTick.Stop()
  106. err := EncodeMsg(bp.rw, getPeersMsg)
  107. for err == nil {
  108. select {
  109. case err = <-quit:
  110. return err
  111. case <-getPeersTick.C:
  112. err = EncodeMsg(bp.rw, getPeersMsg)
  113. case event := <-activity.Chan():
  114. ping.Reset(pingTimeout)
  115. lastActive = event.(time.Time)
  116. case t := <-ping.C:
  117. if lastActive.Add(pingTimeout * 2).Before(t) {
  118. err = newPeerError(errPingTimeout, "")
  119. } else if lastActive.Add(pingTimeout).Before(t) {
  120. err = EncodeMsg(bp.rw, pingMsg)
  121. }
  122. }
  123. }
  124. return err
  125. }
  126. func (bp *baseProtocol) handle(rw MsgReadWriter) error {
  127. msg, err := rw.ReadMsg()
  128. if err != nil {
  129. return err
  130. }
  131. if msg.Size > baseProtocolMaxMsgSize {
  132. return newPeerError(errMisc, "message too big")
  133. }
  134. // make sure that the payload has been fully consumed
  135. defer msg.Discard()
  136. switch msg.Code {
  137. case handshakeMsg:
  138. return newPeerError(errProtocolBreach, "extra handshake received")
  139. case discMsg:
  140. var reason [1]DiscReason
  141. if err := msg.Decode(&reason); err != nil {
  142. return err
  143. }
  144. return discRequestedError(reason[0])
  145. case pingMsg:
  146. return EncodeMsg(bp.rw, pongMsg)
  147. case pongMsg:
  148. case getPeersMsg:
  149. peers := bp.peerList()
  150. // this is dangerous. the spec says that we should _delay_
  151. // sending the response if no new information is available.
  152. // this means that would need to send a response later when
  153. // new peers become available.
  154. //
  155. // TODO: add event mechanism to notify baseProtocol for new peers
  156. if len(peers) > 0 {
  157. return EncodeMsg(bp.rw, peersMsg, peers...)
  158. }
  159. case peersMsg:
  160. var peers []*peerAddr
  161. if err := msg.Decode(&peers); err != nil {
  162. return err
  163. }
  164. for _, addr := range peers {
  165. bp.peer.Debugf("received peer suggestion: %v", addr)
  166. bp.peer.newPeerAddr <- addr
  167. }
  168. default:
  169. return newPeerError(errInvalidMsgCode, "unknown message code %v", msg.Code)
  170. }
  171. return nil
  172. }
  173. func (bp *baseProtocol) readHandshake() error {
  174. // read and handle remote handshake
  175. msg, err := bp.rw.ReadMsg()
  176. if err != nil {
  177. return err
  178. }
  179. if msg.Code != handshakeMsg {
  180. return newPeerError(errProtocolBreach, "first message must be handshake, got %x", msg.Code)
  181. }
  182. if msg.Size > baseProtocolMaxMsgSize {
  183. return newPeerError(errMisc, "message too big")
  184. }
  185. var hs handshake
  186. if err := msg.Decode(&hs); err != nil {
  187. return err
  188. }
  189. // validate handshake info
  190. if hs.Version != baseProtocolVersion {
  191. return newPeerError(errP2PVersionMismatch, "Require protocol %d, received %d\n",
  192. baseProtocolVersion, hs.Version)
  193. }
  194. if len(hs.NodeID) == 0 {
  195. return newPeerError(errPubkeyMissing, "")
  196. }
  197. if len(hs.NodeID) != 64 {
  198. return newPeerError(errPubkeyInvalid, "require 512 bit, got %v", len(hs.NodeID)*8)
  199. }
  200. if da := bp.peer.dialAddr; da != nil {
  201. // verify that the peer we wanted to connect to
  202. // actually holds the target public key.
  203. if da.Pubkey != nil && !bytes.Equal(da.Pubkey, hs.NodeID) {
  204. return newPeerError(errPubkeyForbidden, "dial address pubkey mismatch")
  205. }
  206. }
  207. pa := newPeerAddr(bp.peer.conn.RemoteAddr(), hs.NodeID)
  208. if err := bp.peer.pubkeyHook(pa); err != nil {
  209. return newPeerError(errPubkeyForbidden, "%v", err)
  210. }
  211. // TODO: remove Caps with empty name
  212. var addr *peerAddr
  213. if hs.ListenPort != 0 {
  214. addr = newPeerAddr(bp.peer.conn.RemoteAddr(), hs.NodeID)
  215. addr.Port = hs.ListenPort
  216. }
  217. bp.peer.setHandshakeInfo(&hs, addr, hs.Caps)
  218. bp.peer.startSubprotocols(hs.Caps)
  219. return nil
  220. }
  221. func (bp *baseProtocol) handshakeMsg() Msg {
  222. var (
  223. port uint64
  224. caps []interface{}
  225. )
  226. if bp.peer.ourListenAddr != nil {
  227. port = bp.peer.ourListenAddr.Port
  228. }
  229. for _, proto := range bp.peer.protocols {
  230. caps = append(caps, proto.cap())
  231. }
  232. return NewMsg(handshakeMsg,
  233. baseProtocolVersion,
  234. bp.peer.ourID.String(),
  235. caps,
  236. port,
  237. bp.peer.ourID.Pubkey()[1:],
  238. )
  239. }
  240. func (bp *baseProtocol) peerList() []interface{} {
  241. peers := bp.peer.otherPeers()
  242. ds := make([]interface{}, 0, len(peers))
  243. for _, p := range peers {
  244. p.infolock.Lock()
  245. addr := p.listenAddr
  246. p.infolock.Unlock()
  247. // filter out this peer and peers that are not listening or
  248. // have not completed the handshake.
  249. // TODO: track previously sent peers and exclude them as well.
  250. if p == bp.peer || addr == nil {
  251. continue
  252. }
  253. ds = append(ds, addr)
  254. }
  255. ourAddr := bp.peer.ourListenAddr
  256. if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() {
  257. ds = append(ds, ourAddr)
  258. }
  259. return ds
  260. }