protocol.go 7.8 KB


  1. package p2p
  2. import (
  3. "bytes"
  4. "net"
  5. "sort"
  6. "time"
  7. "github.com/ethereum/go-ethereum/ethutil"
  8. )
  9. // Protocol is implemented by P2P subprotocols.
  10. type Protocol interface {
  11. // Start is called when the protocol becomes active.
  12. // It should read and write messages from rw.
  13. // Messages must be fully consumed.
  14. //
  15. // The connection is closed when Start returns. It should return
  16. // any protocol-level error (such as an I/O error) that is
  17. // encountered.
  18. Start(peer *Peer, rw MsgReadWriter) error
  19. // Offset should return the number of message codes
  20. // used by the protocol.
  21. Offset() MsgCode
  22. }
  23. type MsgReader interface {
  24. ReadMsg() (Msg, error)
  25. }
  26. type MsgWriter interface {
  27. WriteMsg(Msg) error
  28. }
  29. // MsgReadWriter is passed to protocols. Protocol implementations can
  30. // use it to write messages back to a connected peer.
  31. type MsgReadWriter interface {
  32. MsgReader
  33. MsgWriter
  34. }
  35. type MsgHandler func(code MsgCode, data *ethutil.Value) error
  36. // MsgLoop reads messages off the given reader and
  37. // calls the handler function for each decoded message until
  38. // it returns an error or the peer connection is closed.
  39. //
  40. // If a message is larger than the given maximum size, RunProtocol
  41. // returns an appropriate error.n
  42. func MsgLoop(r MsgReader, maxsize uint32, handler MsgHandler) error {
  43. for {
  44. msg, err := r.ReadMsg()
  45. if err != nil {
  46. return err
  47. }
  48. if msg.Size > maxsize {
  49. return NewPeerError(InvalidMsg, "size %d exceeds maximum size of %d", msg.Size, maxsize)
  50. }
  51. value, err := msg.Data()
  52. if err != nil {
  53. return err
  54. }
  55. if err := handler(msg.Code, value); err != nil {
  56. return err
  57. }
  58. }
  59. }
  60. // the ÐΞVp2p base protocol
  61. type baseProtocol struct {
  62. rw MsgReadWriter
  63. peer *Peer
  64. }
  65. type bpMsg struct {
  66. code MsgCode
  67. data *ethutil.Value
  68. }
  69. const (
  70. p2pVersion = 0
  71. pingTimeout = 2 * time.Second
  72. pingGracePeriod = 2 * time.Second
  73. )
  74. const (
  75. // message codes
  76. handshakeMsg = iota
  77. discMsg
  78. pingMsg
  79. pongMsg
  80. getPeersMsg
  81. peersMsg
  82. )
  83. const (
  84. baseProtocolOffset MsgCode = 16
  85. baseProtocolMaxMsgSize = 500 * 1024
  86. )
  87. type DiscReason byte
  88. const (
  89. // Values are given explicitly instead of by iota because these values are
  90. // defined by the wire protocol spec; it is easier for humans to ensure
  91. // correctness when values are explicit.
  92. DiscRequested = 0x00
  93. DiscNetworkError = 0x01
  94. DiscProtocolError = 0x02
  95. DiscUselessPeer = 0x03
  96. DiscTooManyPeers = 0x04
  97. DiscAlreadyConnected = 0x05
  98. DiscIncompatibleVersion = 0x06
  99. DiscInvalidIdentity = 0x07
  100. DiscQuitting = 0x08
  101. DiscUnexpectedIdentity = 0x09
  102. DiscSelf = 0x0a
  103. DiscReadTimeout = 0x0b
  104. DiscSubprotocolError = 0x10
  105. )
  106. var discReasonToString = [DiscSubprotocolError + 1]string{
  107. DiscRequested: "Disconnect requested",
  108. DiscNetworkError: "Network error",
  109. DiscProtocolError: "Breach of protocol",
  110. DiscUselessPeer: "Useless peer",
  111. DiscTooManyPeers: "Too many peers",
  112. DiscAlreadyConnected: "Already connected",
  113. DiscIncompatibleVersion: "Incompatible P2P protocol version",
  114. DiscInvalidIdentity: "Invalid node identity",
  115. DiscQuitting: "Client quitting",
  116. DiscUnexpectedIdentity: "Unexpected identity",
  117. DiscSelf: "Connected to self",
  118. DiscReadTimeout: "Read timeout",
  119. DiscSubprotocolError: "Subprotocol error",
  120. }
  121. func (d DiscReason) String() string {
  122. if len(discReasonToString) < int(d) {
  123. return "Unknown"
  124. }
  125. return discReasonToString[d]
  126. }
  127. func (bp *baseProtocol) Ping() {
  128. }
  129. func (bp *baseProtocol) Offset() MsgCode {
  130. return baseProtocolOffset
  131. }
  132. func (bp *baseProtocol) Start(peer *Peer, rw MsgReadWriter) error {
  133. bp.peer, bp.rw = peer, rw
  134. // Do the handshake.
  135. // TODO: disconnect is valid before handshake, too.
  136. rw.WriteMsg(bp.peer.server.handshakeMsg())
  137. msg, err := rw.ReadMsg()
  138. if err != nil {
  139. return err
  140. }
  141. if msg.Code != handshakeMsg {
  142. return NewPeerError(ProtocolBreach, " first message must be handshake")
  143. }
  144. data, err := msg.Data()
  145. if err != nil {
  146. return NewPeerError(InvalidMsg, "%v", err)
  147. }
  148. if err := bp.handleHandshake(data); err != nil {
  149. return err
  150. }
  151. msgin := make(chan bpMsg)
  152. done := make(chan error, 1)
  153. go func() {
  154. done <- MsgLoop(rw, baseProtocolMaxMsgSize,
  155. func(code MsgCode, data *ethutil.Value) error {
  156. msgin <- bpMsg{code, data}
  157. return nil
  158. })
  159. }()
  160. return bp.loop(msgin, done)
  161. }
  162. func (bp *baseProtocol) loop(msgin <-chan bpMsg, quit <-chan error) error {
  163. logger.Debugf("pingpong keepalive started at %v\n", time.Now())
  164. messenger := bp.rw.(*proto).messenger
  165. pingTimer := time.NewTimer(pingTimeout)
  166. pinged := true
  167. for {
  168. select {
  169. case msg := <-msgin:
  170. if err := bp.handle(msg.code, msg.data); err != nil {
  171. return err
  172. }
  173. case err := <-quit:
  174. return err
  175. case <-messenger.pulse:
  176. pingTimer.Reset(pingTimeout)
  177. pinged = false
  178. case <-pingTimer.C:
  179. if pinged {
  180. return NewPeerError(PingTimeout, "")
  181. }
  182. logger.Debugf("pinging at %v\n", time.Now())
  183. if err := bp.rw.WriteMsg(NewMsg(pingMsg)); err != nil {
  184. return NewPeerError(WriteError, "%v", err)
  185. }
  186. pinged = true
  187. pingTimer.Reset(pingTimeout)
  188. }
  189. }
  190. }
  191. func (bp *baseProtocol) handle(code MsgCode, data *ethutil.Value) error {
  192. switch code {
  193. case handshakeMsg:
  194. return NewPeerError(ProtocolBreach, " extra handshake received")
  195. case discMsg:
  196. logger.Infof("Disconnect requested from peer %v, reason", DiscReason(data.Get(0).Uint()))
  197. bp.peer.server.PeerDisconnect() <- DisconnectRequest{
  198. addr: bp.peer.Address,
  199. reason: DiscRequested,
  200. }
  201. case pingMsg:
  202. return bp.rw.WriteMsg(NewMsg(pongMsg))
  203. case pongMsg:
  204. // reply for ping
  205. case getPeersMsg:
  206. // Peer asked for list of connected peers.
  207. peersRLP := bp.peer.server.encodedPeerList()
  208. if peersRLP != nil {
  209. msg := Msg{
  210. Code: peersMsg,
  211. Size: uint32(len(peersRLP)),
  212. Payload: bytes.NewReader(peersRLP),
  213. }
  214. return bp.rw.WriteMsg(msg)
  215. }
  216. case peersMsg:
  217. bp.handlePeers(data)
  218. default:
  219. return NewPeerError(InvalidMsgCode, "unknown message code %v", code)
  220. }
  221. return nil
  222. }
  223. func (bp *baseProtocol) handlePeers(data *ethutil.Value) {
  224. it := data.NewIterator()
  225. for it.Next() {
  226. ip := net.IP(it.Value().Get(0).Bytes())
  227. port := it.Value().Get(1).Uint()
  228. address := &net.TCPAddr{IP: ip, Port: int(port)}
  229. go bp.peer.server.PeerConnect(address)
  230. }
  231. }
  232. func (bp *baseProtocol) handleHandshake(c *ethutil.Value) error {
  233. var (
  234. remoteVersion = c.Get(0).Uint()
  235. id = c.Get(1).Str()
  236. caps = c.Get(2)
  237. port = c.Get(3).Uint()
  238. pubkey = c.Get(4).Bytes()
  239. )
  240. // Check correctness of p2p protocol version
  241. if remoteVersion != p2pVersion {
  242. return NewPeerError(P2PVersionMismatch, "Require protocol %d, received %d\n", p2pVersion, remoteVersion)
  243. }
  244. // Handle the pub key (validation, uniqueness)
  245. if len(pubkey) == 0 {
  246. return NewPeerError(PubkeyMissing, "not supplied in handshake.")
  247. }
  248. if len(pubkey) != 64 {
  249. return NewPeerError(PubkeyInvalid, "require 512 bit, got %v", len(pubkey)*8)
  250. }
  251. // self connect detection
  252. if bytes.Compare(bp.peer.server.ClientIdentity().Pubkey()[1:], pubkey) == 0 {
  253. return NewPeerError(PubkeyForbidden, "not allowed to connect to bp")
  254. }
  255. // register pubkey on server. this also sets the pubkey on the peer (need lock)
  256. if err := bp.peer.server.RegisterPubkey(bp.peer, pubkey); err != nil {
  257. return NewPeerError(PubkeyForbidden, err.Error())
  258. }
  259. // check port
  260. if bp.peer.Inbound {
  261. uint16port := uint16(port)
  262. if bp.peer.Port > 0 && bp.peer.Port != uint16port {
  263. return NewPeerError(PortMismatch, "port mismatch: %v != %v", bp.peer.Port, port)
  264. } else {
  265. bp.peer.Port = uint16port
  266. }
  267. }
  268. capsIt := caps.NewIterator()
  269. for capsIt.Next() {
  270. cap := capsIt.Value().Str()
  271. bp.peer.Caps = append(bp.peer.Caps, cap)
  272. }
  273. sort.Strings(bp.peer.Caps)
  274. bp.rw.(*proto).messenger.setRemoteProtocols(bp.peer.Caps)
  275. bp.peer.Id = id
  276. return nil
  277. }