protocol.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. package p2p
  2. import (
  3. "bytes"
  4. "time"
  5. )
  6. // Protocol represents a P2P subprotocol implementation.
  7. type Protocol struct {
  8. // Name should contain the official protocol name,
  9. // often a three-letter word.
  10. Name string
  11. // Version should contain the version number of the protocol.
  12. Version uint
  13. // Length should contain the number of message codes used
  14. // by the protocol.
  15. Length uint64
  16. // Run is called in a new groutine when the protocol has been
  17. // negotiated with a peer. It should read and write messages from
  18. // rw. The Payload for each message must be fully consumed.
  19. //
  20. // The peer connection is closed when Start returns. It should return
  21. // any protocol-level error (such as an I/O error) that is
  22. // encountered.
  23. Run func(peer *Peer, rw MsgReadWriter) error
  24. }
  25. func (p Protocol) cap() Cap {
  26. return Cap{p.Name, p.Version}
  27. }
  28. const (
  29. baseProtocolVersion = 2
  30. baseProtocolLength = uint64(16)
  31. baseProtocolMaxMsgSize = 10 * 1024 * 1024
  32. )
  33. const (
  34. // devp2p message codes
  35. handshakeMsg = 0x00
  36. discMsg = 0x01
  37. pingMsg = 0x02
  38. pongMsg = 0x03
  39. getPeersMsg = 0x04
  40. peersMsg = 0x05
  41. )
  42. // handshake is the structure of a handshake list.
  43. type handshake struct {
  44. Version uint64
  45. ID string
  46. Caps []Cap
  47. ListenPort uint64
  48. NodeID []byte
  49. }
  50. func (h *handshake) String() string {
  51. return h.ID
  52. }
  53. func (h *handshake) Pubkey() []byte {
  54. return h.NodeID
  55. }
  56. // Cap is the structure of a peer capability.
  57. type Cap struct {
  58. Name string
  59. Version uint
  60. }
  61. func (cap Cap) RlpData() interface{} {
  62. return []interface{}{cap.Name, cap.Version}
  63. }
  64. type capsByName []Cap
  65. func (cs capsByName) Len() int { return len(cs) }
  66. func (cs capsByName) Less(i, j int) bool { return cs[i].Name < cs[j].Name }
  67. func (cs capsByName) Swap(i, j int) { cs[i], cs[j] = cs[j], cs[i] }
  68. type baseProtocol struct {
  69. rw MsgReadWriter
  70. peer *Peer
  71. }
  72. func runBaseProtocol(peer *Peer, rw MsgReadWriter) error {
  73. bp := &baseProtocol{rw, peer}
  74. errc := make(chan error, 1)
  75. go func() { errc <- rw.WriteMsg(bp.handshakeMsg()) }()
  76. if err := bp.readHandshake(); err != nil {
  77. return err
  78. }
  79. // handle write error
  80. if err := <-errc; err != nil {
  81. return err
  82. }
  83. // run main loop
  84. go func() {
  85. for {
  86. if err := bp.handle(rw); err != nil {
  87. errc <- err
  88. break
  89. }
  90. }
  91. }()
  92. return bp.loop(errc)
  93. }
  94. var pingTimeout = 2 * time.Second
  95. func (bp *baseProtocol) loop(quit <-chan error) error {
  96. ping := time.NewTimer(pingTimeout)
  97. activity := bp.peer.activity.Subscribe(time.Time{})
  98. lastActive := time.Time{}
  99. defer ping.Stop()
  100. defer activity.Unsubscribe()
  101. getPeersTick := time.NewTicker(10 * time.Second)
  102. defer getPeersTick.Stop()
  103. err := EncodeMsg(bp.rw, getPeersMsg)
  104. for err == nil {
  105. select {
  106. case err = <-quit:
  107. return err
  108. case <-getPeersTick.C:
  109. err = EncodeMsg(bp.rw, getPeersMsg)
  110. case event := <-activity.Chan():
  111. ping.Reset(pingTimeout)
  112. lastActive = event.(time.Time)
  113. case t := <-ping.C:
  114. if lastActive.Add(pingTimeout * 2).Before(t) {
  115. err = newPeerError(errPingTimeout, "")
  116. } else if lastActive.Add(pingTimeout).Before(t) {
  117. err = EncodeMsg(bp.rw, pingMsg)
  118. }
  119. }
  120. }
  121. return err
  122. }
  123. func (bp *baseProtocol) handle(rw MsgReadWriter) error {
  124. msg, err := rw.ReadMsg()
  125. if err != nil {
  126. return err
  127. }
  128. if msg.Size > baseProtocolMaxMsgSize {
  129. return newPeerError(errMisc, "message too big")
  130. }
  131. // make sure that the payload has been fully consumed
  132. defer msg.Discard()
  133. switch msg.Code {
  134. case handshakeMsg:
  135. return newPeerError(errProtocolBreach, "extra handshake received")
  136. case discMsg:
  137. var reason [1]DiscReason
  138. if err := msg.Decode(&reason); err != nil {
  139. return err
  140. }
  141. return discRequestedError(reason[0])
  142. case pingMsg:
  143. return EncodeMsg(bp.rw, pongMsg)
  144. case pongMsg:
  145. case getPeersMsg:
  146. peers := bp.peerList()
  147. // this is dangerous. the spec says that we should _delay_
  148. // sending the response if no new information is available.
  149. // this means that would need to send a response later when
  150. // new peers become available.
  151. //
  152. // TODO: add event mechanism to notify baseProtocol for new peers
  153. if len(peers) > 0 {
  154. return EncodeMsg(bp.rw, peersMsg, peers...)
  155. }
  156. case peersMsg:
  157. var peers []*peerAddr
  158. if err := msg.Decode(&peers); err != nil {
  159. return err
  160. }
  161. for _, addr := range peers {
  162. bp.peer.Debugf("received peer suggestion: %v", addr)
  163. bp.peer.newPeerAddr <- addr
  164. }
  165. default:
  166. return newPeerError(errInvalidMsgCode, "unknown message code %v", msg.Code)
  167. }
  168. return nil
  169. }
  170. func (bp *baseProtocol) readHandshake() error {
  171. // read and handle remote handshake
  172. msg, err := bp.rw.ReadMsg()
  173. if err != nil {
  174. return err
  175. }
  176. if msg.Code != handshakeMsg {
  177. return newPeerError(errProtocolBreach, "first message must be handshake, got %x", msg.Code)
  178. }
  179. if msg.Size > baseProtocolMaxMsgSize {
  180. return newPeerError(errMisc, "message too big")
  181. }
  182. var hs handshake
  183. if err := msg.Decode(&hs); err != nil {
  184. return err
  185. }
  186. // validate handshake info
  187. if hs.Version != baseProtocolVersion {
  188. return newPeerError(errP2PVersionMismatch, "Require protocol %d, received %d\n",
  189. baseProtocolVersion, hs.Version)
  190. }
  191. if len(hs.NodeID) == 0 {
  192. return newPeerError(errPubkeyMissing, "")
  193. }
  194. if len(hs.NodeID) != 64 {
  195. return newPeerError(errPubkeyInvalid, "require 512 bit, got %v", len(hs.NodeID)*8)
  196. }
  197. if da := bp.peer.dialAddr; da != nil {
  198. // verify that the peer we wanted to connect to
  199. // actually holds the target public key.
  200. if da.Pubkey != nil && !bytes.Equal(da.Pubkey, hs.NodeID) {
  201. return newPeerError(errPubkeyForbidden, "dial address pubkey mismatch")
  202. }
  203. }
  204. pa := newPeerAddr(bp.peer.conn.RemoteAddr(), hs.NodeID)
  205. if err := bp.peer.pubkeyHook(pa); err != nil {
  206. return newPeerError(errPubkeyForbidden, "%v", err)
  207. }
  208. // TODO: remove Caps with empty name
  209. var addr *peerAddr
  210. if hs.ListenPort != 0 {
  211. addr = newPeerAddr(bp.peer.conn.RemoteAddr(), hs.NodeID)
  212. addr.Port = hs.ListenPort
  213. }
  214. bp.peer.setHandshakeInfo(&hs, addr, hs.Caps)
  215. bp.peer.startSubprotocols(hs.Caps)
  216. return nil
  217. }
  218. func (bp *baseProtocol) handshakeMsg() Msg {
  219. var (
  220. port uint64
  221. caps []interface{}
  222. )
  223. if bp.peer.ourListenAddr != nil {
  224. port = bp.peer.ourListenAddr.Port
  225. }
  226. for _, proto := range bp.peer.protocols {
  227. caps = append(caps, proto.cap())
  228. }
  229. return NewMsg(handshakeMsg,
  230. baseProtocolVersion,
  231. bp.peer.ourID.String(),
  232. caps,
  233. port,
  234. bp.peer.ourID.Pubkey()[1:],
  235. )
  236. }
  237. func (bp *baseProtocol) peerList() []interface{} {
  238. peers := bp.peer.otherPeers()
  239. ds := make([]interface{}, 0, len(peers))
  240. for _, p := range peers {
  241. p.infolock.Lock()
  242. addr := p.listenAddr
  243. p.infolock.Unlock()
  244. // filter out this peer and peers that are not listening or
  245. // have not completed the handshake.
  246. // TODO: track previously sent peers and exclude them as well.
  247. if p == bp.peer || addr == nil {
  248. continue
  249. }
  250. ds = append(ds, addr)
  251. }
  252. ourAddr := bp.peer.ourListenAddr
  253. if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() {
  254. ds = append(ds, ourAddr)
  255. }
  256. return ds
  257. }