protocol.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. package p2p
  2. import (
  3. "bytes"
  4. "net"
  5. "time"
  6. "github.com/ethereum/go-ethereum/ethutil"
  7. )
// Protocol represents a P2P subprotocol implementation.
type Protocol struct {
	// Name should contain the official protocol name,
	// often a three-letter word.
	Name string

	// Version should contain the version number of the protocol.
	Version uint

	// Length should contain the number of message codes used
	// by the protocol.
	Length uint64

	// Run is called in a new goroutine when the protocol has been
	// negotiated with a peer. It should read and write messages from
	// rw. The Payload for each message must be fully consumed.
	//
	// The peer connection is closed when Run returns. It should return
	// any protocol-level error (such as an I/O error) that is
	// encountered.
	Run func(peer *Peer, rw MsgReadWriter) error
}
  27. func (p Protocol) cap() Cap {
  28. return Cap{p.Name, p.Version}
  29. }
// Parameters of the built-in base protocol.
const (
	// baseProtocolVersion is the version number sent in our handshake.
	// A remote handshake carrying any other version is rejected.
	baseProtocolVersion = 2
	// baseProtocolLength is presumably the number of message codes
	// reserved for the base protocol (compare Protocol.Length); it is not
	// referenced by the code in this part of the file. TODO confirm.
	baseProtocolLength = uint64(16)
	// baseProtocolMaxMsgSize is handed to MsgLoop when the base protocol
	// message loop is started; presumably the maximum accepted message
	// size in bytes (10 MiB). TODO confirm against MsgLoop.
	baseProtocolMaxMsgSize = 10 * 1024 * 1024
)
const (
	// devp2p message codes

	// handshakeMsg must be the first message received from a peer;
	// receiving it again later is treated as a protocol breach.
	handshakeMsg = 0x00
	// discMsg asks for a disconnect. The first payload element is
	// the disconnect reason.
	discMsg = 0x01
	// pingMsg is sent when the connection has been idle and is
	// answered with pongMsg.
	pingMsg = 0x02
	// pongMsg is the reply to pingMsg. It requires no further action.
	pongMsg = 0x03
	// getPeersMsg requests the list of known peers. It is answered
	// with peersMsg when there is at least one peer to report.
	getPeersMsg = 0x04
	// peersMsg carries a list of peer entries, each holding an IP
	// address, a port and a public key.
	peersMsg = 0x05
)
// handshake is the structure of a handshake list.
//
// The field order mirrors the positions at which handleHandshake
// reads the elements of the received list (0 through 4).
type handshake struct {
	// Version is the base protocol version announced by the peer.
	Version uint64
	// ID is the identification string of the peer; String returns it.
	ID string
	// Caps lists the capabilities announced by the peer.
	Caps []Cap
	// ListenPort is the port the peer says it listens on.
	// Zero means no listening address is recorded for the peer.
	ListenPort uint64
	// NodeID is the public key of the peer. handleHandshake
	// requires it to be exactly 64 bytes long.
	NodeID []byte
}

// String returns the ID string sent in the handshake.
func (h *handshake) String() string {
	return h.ID
}

// Pubkey returns the NodeID sent in the handshake.
func (h *handshake) Pubkey() []byte {
	return h.NodeID
}
  58. // Cap is the structure of a peer capability.
  59. type Cap struct {
  60. Name string
  61. Version uint
  62. }
  63. func (cap Cap) RlpData() interface{} {
  64. return []interface{}{cap.Name, cap.Version}
  65. }
  66. type capsByName []Cap
  67. func (cs capsByName) Len() int { return len(cs) }
  68. func (cs capsByName) Less(i, j int) bool { return cs[i].Name < cs[j].Name }
  69. func (cs capsByName) Swap(i, j int) { cs[i], cs[j] = cs[j], cs[i] }
// baseProtocol is the state of the built-in base protocol running on
// a single peer connection. It performs the handshake and handles the
// disconnect, ping/pong and peer exchange messages.
type baseProtocol struct {
	rw   MsgReadWriter // message stream used to talk to the peer
	peer *Peer         // the peer this protocol instance belongs to
}
  74. func runBaseProtocol(peer *Peer, rw MsgReadWriter) error {
  75. bp := &baseProtocol{rw, peer}
  76. // do handshake
  77. if err := rw.WriteMsg(bp.handshakeMsg()); err != nil {
  78. return err
  79. }
  80. msg, err := rw.ReadMsg()
  81. if err != nil {
  82. return err
  83. }
  84. if msg.Code != handshakeMsg {
  85. return newPeerError(errProtocolBreach, "first message must be handshake, got %x", msg.Code)
  86. }
  87. data, err := msg.Data()
  88. if err != nil {
  89. return newPeerError(errInvalidMsg, "%v", err)
  90. }
  91. if err := bp.handleHandshake(data); err != nil {
  92. return err
  93. }
  94. // run main loop
  95. quit := make(chan error, 1)
  96. go func() {
  97. quit <- MsgLoop(rw, baseProtocolMaxMsgSize, bp.handle)
  98. }()
  99. return bp.loop(quit)
  100. }
// pingTimeout is the duration of the ping timer in baseProtocol.loop.
// The timer is restarted on every activity event. When it fires, a ping
// is sent if the last activity is older than pingTimeout, and the loop
// fails with errPingTimeout if it is older than twice pingTimeout.
var pingTimeout = 2 * time.Second
  102. func (bp *baseProtocol) loop(quit <-chan error) error {
  103. ping := time.NewTimer(pingTimeout)
  104. activity := bp.peer.activity.Subscribe(time.Time{})
  105. lastActive := time.Time{}
  106. defer ping.Stop()
  107. defer activity.Unsubscribe()
  108. getPeersTick := time.NewTicker(10 * time.Second)
  109. defer getPeersTick.Stop()
  110. err := bp.rw.EncodeMsg(getPeersMsg)
  111. for err == nil {
  112. select {
  113. case err = <-quit:
  114. return err
  115. case <-getPeersTick.C:
  116. err = bp.rw.EncodeMsg(getPeersMsg)
  117. case event := <-activity.Chan():
  118. ping.Reset(pingTimeout)
  119. lastActive = event.(time.Time)
  120. case t := <-ping.C:
  121. if lastActive.Add(pingTimeout * 2).Before(t) {
  122. err = newPeerError(errPingTimeout, "")
  123. } else if lastActive.Add(pingTimeout).Before(t) {
  124. err = bp.rw.EncodeMsg(pingMsg)
  125. }
  126. }
  127. }
  128. return err
  129. }
  130. func (bp *baseProtocol) handle(code uint64, data *ethutil.Value) error {
  131. switch code {
  132. case handshakeMsg:
  133. return newPeerError(errProtocolBreach, "extra handshake received")
  134. case discMsg:
  135. bp.peer.Disconnect(DiscReason(data.Get(0).Uint()))
  136. return nil
  137. case pingMsg:
  138. return bp.rw.EncodeMsg(pongMsg)
  139. case pongMsg:
  140. case getPeersMsg:
  141. peers := bp.peerList()
  142. // this is dangerous. the spec says that we should _delay_
  143. // sending the response if no new information is available.
  144. // this means that would need to send a response later when
  145. // new peers become available.
  146. //
  147. // TODO: add event mechanism to notify baseProtocol for new peers
  148. if len(peers) > 0 {
  149. return bp.rw.EncodeMsg(peersMsg, peers)
  150. }
  151. case peersMsg:
  152. bp.handlePeers(data)
  153. default:
  154. return newPeerError(errInvalidMsgCode, "unknown message code %v", code)
  155. }
  156. return nil
  157. }
  158. func (bp *baseProtocol) handlePeers(data *ethutil.Value) {
  159. it := data.NewIterator()
  160. for it.Next() {
  161. addr := &peerAddr{
  162. IP: net.IP(it.Value().Get(0).Bytes()),
  163. Port: it.Value().Get(1).Uint(),
  164. Pubkey: it.Value().Get(2).Bytes(),
  165. }
  166. bp.peer.Debugf("received peer suggestion: %v", addr)
  167. bp.peer.newPeerAddr <- addr
  168. }
  169. }
  170. func (bp *baseProtocol) handleHandshake(c *ethutil.Value) error {
  171. hs := handshake{
  172. Version: c.Get(0).Uint(),
  173. ID: c.Get(1).Str(),
  174. Caps: nil, // decoded below
  175. ListenPort: c.Get(3).Uint(),
  176. NodeID: c.Get(4).Bytes(),
  177. }
  178. if hs.Version != baseProtocolVersion {
  179. return newPeerError(errP2PVersionMismatch, "Require protocol %d, received %d\n",
  180. baseProtocolVersion, hs.Version)
  181. }
  182. if len(hs.NodeID) == 0 {
  183. return newPeerError(errPubkeyMissing, "")
  184. }
  185. if len(hs.NodeID) != 64 {
  186. return newPeerError(errPubkeyInvalid, "require 512 bit, got %v", len(hs.NodeID)*8)
  187. }
  188. if da := bp.peer.dialAddr; da != nil {
  189. // verify that the peer we wanted to connect to
  190. // actually holds the target public key.
  191. if da.Pubkey != nil && !bytes.Equal(da.Pubkey, hs.NodeID) {
  192. return newPeerError(errPubkeyForbidden, "dial address pubkey mismatch")
  193. }
  194. }
  195. pa := newPeerAddr(bp.peer.conn.RemoteAddr(), hs.NodeID)
  196. if err := bp.peer.pubkeyHook(pa); err != nil {
  197. return newPeerError(errPubkeyForbidden, "%v", err)
  198. }
  199. capsIt := c.Get(2).NewIterator()
  200. for capsIt.Next() {
  201. cap := capsIt.Value()
  202. name := cap.Get(0).Str()
  203. if name != "" {
  204. hs.Caps = append(hs.Caps, Cap{Name: name, Version: uint(cap.Get(1).Uint())})
  205. }
  206. }
  207. var addr *peerAddr
  208. if hs.ListenPort != 0 {
  209. addr = newPeerAddr(bp.peer.conn.RemoteAddr(), hs.NodeID)
  210. addr.Port = hs.ListenPort
  211. }
  212. bp.peer.setHandshakeInfo(&hs, addr, hs.Caps)
  213. bp.peer.startSubprotocols(hs.Caps)
  214. return nil
  215. }
  216. func (bp *baseProtocol) handshakeMsg() Msg {
  217. var (
  218. port uint64
  219. caps []interface{}
  220. )
  221. if bp.peer.ourListenAddr != nil {
  222. port = bp.peer.ourListenAddr.Port
  223. }
  224. for _, proto := range bp.peer.protocols {
  225. caps = append(caps, proto.cap())
  226. }
  227. return NewMsg(handshakeMsg,
  228. baseProtocolVersion,
  229. bp.peer.ourID.String(),
  230. caps,
  231. port,
  232. bp.peer.ourID.Pubkey()[1:],
  233. )
  234. }
  235. func (bp *baseProtocol) peerList() []ethutil.RlpEncodable {
  236. peers := bp.peer.otherPeers()
  237. ds := make([]ethutil.RlpEncodable, 0, len(peers))
  238. for _, p := range peers {
  239. p.infolock.Lock()
  240. addr := p.listenAddr
  241. p.infolock.Unlock()
  242. // filter out this peer and peers that are not listening or
  243. // have not completed the handshake.
  244. // TODO: track previously sent peers and exclude them as well.
  245. if p == bp.peer || addr == nil {
  246. continue
  247. }
  248. ds = append(ds, addr)
  249. }
  250. ourAddr := bp.peer.ourListenAddr
  251. if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() {
  252. ds = append(ds, ourAddr)
  253. }
  254. return ds
  255. }