protocol.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. package p2p
import (
	"bytes"
	"errors"
	"net"
	"sort"
	"time"

	"github.com/ethereum/go-ethereum/ethutil"
)
// Protocol is implemented by P2P subprotocols.
type Protocol interface {
	// Start is called when the protocol becomes active.
	// It should read and write messages from rw.
	// Messages must be fully consumed.
	//
	// The connection is closed when Start returns. It should return
	// any protocol-level error (such as an I/O error) that is
	// encountered.
	Start(peer *Peer, rw MsgReadWriter) error

	// Offset should return the number of message codes
	// used by the protocol.
	Offset() MsgCode
}
// MsgReader is the reading half of a message stream. ReadMsg returns
// a Msg, or a non-nil error when no message could be obtained.
type MsgReader interface {
	ReadMsg() (Msg, error)
}
// MsgWriter is the writing half of a message stream. WriteMsg takes
// a Msg and reports a non-nil error when it could not be written.
type MsgWriter interface {
	WriteMsg(Msg) error
}
// MsgReadWriter is passed to protocols. Protocol implementations can
// use it to read messages from, and write messages back to, a
// connected peer. It is the union of MsgReader and MsgWriter.
type MsgReadWriter interface {
	MsgReader
	MsgWriter
}
// MsgHandler is the callback type accepted by MsgLoop. It receives the
// code of a message together with its decoded payload. Returning a
// non-nil error makes MsgLoop stop and return that error.
type MsgHandler func(code MsgCode, data *ethutil.Value) error
  36. // MsgLoop reads messages off the given reader and
  37. // calls the handler function for each decoded message until
  38. // it returns an error or the peer connection is closed.
  39. //
  40. // If a message is larger than the given maximum size, RunProtocol
  41. // returns an appropriate error.n
  42. func MsgLoop(r MsgReader, maxsize uint32, handler MsgHandler) error {
  43. for {
  44. msg, err := r.ReadMsg()
  45. if err != nil {
  46. return err
  47. }
  48. if msg.Size > maxsize {
  49. return NewPeerError(InvalidMsg, "size %d exceeds maximum size of %d", msg.Size, maxsize)
  50. }
  51. value, err := msg.Data()
  52. if err != nil {
  53. return err
  54. }
  55. if err := handler(msg.Code, value); err != nil {
  56. return err
  57. }
  58. }
  59. }
// baseProtocol is the ÐΞVp2p base protocol. Start stores the peer and
// the message stream it is given in this struct so that the other
// methods (loop, handle, handlePeers, handleHandshake) can use them.
type baseProtocol struct {
	rw   MsgReadWriter // message stream handed to Start
	peer *Peer         // peer handed to Start
}
// bpMsg is one decoded base protocol message. Start's reader goroutine
// sends values of this type over a channel to the loop method.
type bpMsg struct {
	code MsgCode        // message code as read from the wire
	data *ethutil.Value // decoded message payload
}
const (
	// p2pVersion is the base protocol version. handleHandshake rejects
	// peers whose handshake announces a different version.
	p2pVersion = 0
	// pingTimeout is the interval of the keepalive timer in loop.
	pingTimeout = 2 * time.Second
	// pingGracePeriod is not referenced by the code visible in this
	// file. NOTE(review): confirm whether it is used elsewhere.
	pingGracePeriod = 2 * time.Second
)
// Message codes of the base protocol. The values are assigned by iota
// in declaration order, starting at 0 for handshakeMsg.
const (
	// message codes
	handshakeMsg = iota // must be the first message; a second one is a protocol breach
	discMsg             // disconnect notification; item 0 of the payload is read as a DiscReason
	pingMsg             // keepalive request, answered with pongMsg
	pongMsg             // reply to pingMsg
	getPeersMsg         // request for the list of connected peers
	peersMsg            // list of peers; each entry is read as (IP, port)
)
const (
	// baseProtocolOffset is the value returned by (*baseProtocol).Offset,
	// i.e. the number of message codes reserved for the base protocol.
	baseProtocolOffset MsgCode = 16
	// baseProtocolMaxMsgSize is the size limit Start passes to MsgLoop
	// for base protocol messages (500 * 1024; presumably bytes — confirm
	// against the definition of Msg.Size).
	baseProtocolMaxMsgSize = 500 * 1024
)
  87. type DiscReason byte
  88. const (
  89. // Values are given explicitly instead of by iota because these values are
  90. // defined by the wire protocol spec; it is easier for humans to ensure
  91. // correctness when values are explicit.
  92. DiscRequested = 0x00
  93. DiscNetworkError = 0x01
  94. DiscProtocolError = 0x02
  95. DiscUselessPeer = 0x03
  96. DiscTooManyPeers = 0x04
  97. DiscAlreadyConnected = 0x05
  98. DiscIncompatibleVersion = 0x06
  99. DiscInvalidIdentity = 0x07
  100. DiscQuitting = 0x08
  101. DiscUnexpectedIdentity = 0x09
  102. DiscSelf = 0x0a
  103. DiscReadTimeout = 0x0b
  104. DiscSubprotocolError = 0x10
  105. )
  106. var discReasonToString = [DiscSubprotocolError + 1]string{
  107. DiscRequested: "Disconnect requested",
  108. DiscNetworkError: "Network error",
  109. DiscProtocolError: "Breach of protocol",
  110. DiscUselessPeer: "Useless peer",
  111. DiscTooManyPeers: "Too many peers",
  112. DiscAlreadyConnected: "Already connected",
  113. DiscIncompatibleVersion: "Incompatible P2P protocol version",
  114. DiscInvalidIdentity: "Invalid node identity",
  115. DiscQuitting: "Client quitting",
  116. DiscUnexpectedIdentity: "Unexpected identity",
  117. DiscSelf: "Connected to self",
  118. DiscReadTimeout: "Read timeout",
  119. DiscSubprotocolError: "Subprotocol error",
  120. }
  121. func (d DiscReason) String() string {
  122. if len(discReasonToString) < int(d) {
  123. return "Unknown"
  124. }
  125. return discReasonToString[d]
  126. }
// Offset returns baseProtocolOffset, the number of message codes used
// by the base protocol.
func (bp *baseProtocol) Offset() MsgCode {
	return baseProtocolOffset
}
  130. func (bp *baseProtocol) Start(peer *Peer, rw MsgReadWriter) error {
  131. bp.peer, bp.rw = peer, rw
  132. // Do the handshake.
  133. // TODO: disconnect is valid before handshake, too.
  134. rw.WriteMsg(bp.peer.server.handshakeMsg())
  135. msg, err := rw.ReadMsg()
  136. if err != nil {
  137. return err
  138. }
  139. if msg.Code != handshakeMsg {
  140. return NewPeerError(ProtocolBreach, " first message must be handshake")
  141. }
  142. data, err := msg.Data()
  143. if err != nil {
  144. return NewPeerError(InvalidMsg, "%v", err)
  145. }
  146. if err := bp.handleHandshake(data); err != nil {
  147. return err
  148. }
  149. msgin := make(chan bpMsg)
  150. done := make(chan error, 1)
  151. go func() {
  152. done <- MsgLoop(rw, baseProtocolMaxMsgSize,
  153. func(code MsgCode, data *ethutil.Value) error {
  154. msgin <- bpMsg{code, data}
  155. return nil
  156. })
  157. }()
  158. return bp.loop(msgin, done)
  159. }
  160. func (bp *baseProtocol) loop(msgin <-chan bpMsg, quit <-chan error) error {
  161. logger.Debugf("pingpong keepalive started at %v\n", time.Now())
  162. messenger := bp.rw.(*proto).messenger
  163. pingTimer := time.NewTimer(pingTimeout)
  164. pinged := true
  165. for {
  166. select {
  167. case msg := <-msgin:
  168. if err := bp.handle(msg.code, msg.data); err != nil {
  169. return err
  170. }
  171. case err := <-quit:
  172. return err
  173. case <-messenger.pulse:
  174. pingTimer.Reset(pingTimeout)
  175. pinged = false
  176. case <-pingTimer.C:
  177. if pinged {
  178. return NewPeerError(PingTimeout, "")
  179. }
  180. logger.Debugf("pinging at %v\n", time.Now())
  181. if err := bp.rw.WriteMsg(NewMsg(pingMsg)); err != nil {
  182. return NewPeerError(WriteError, "%v", err)
  183. }
  184. pinged = true
  185. pingTimer.Reset(pingTimeout)
  186. }
  187. }
  188. }
  189. func (bp *baseProtocol) handle(code MsgCode, data *ethutil.Value) error {
  190. switch code {
  191. case handshakeMsg:
  192. return NewPeerError(ProtocolBreach, " extra handshake received")
  193. case discMsg:
  194. logger.Infof("Disconnect requested from peer %v, reason", DiscReason(data.Get(0).Uint()))
  195. bp.peer.server.PeerDisconnect() <- DisconnectRequest{
  196. addr: bp.peer.Address,
  197. reason: DiscRequested,
  198. }
  199. case pingMsg:
  200. return bp.rw.WriteMsg(NewMsg(pongMsg))
  201. case pongMsg:
  202. // reply for ping
  203. case getPeersMsg:
  204. // Peer asked for list of connected peers.
  205. peersRLP := bp.peer.server.encodedPeerList()
  206. if peersRLP != nil {
  207. msg := Msg{
  208. Code: peersMsg,
  209. Size: uint32(len(peersRLP)),
  210. Payload: bytes.NewReader(peersRLP),
  211. }
  212. return bp.rw.WriteMsg(msg)
  213. }
  214. case peersMsg:
  215. bp.handlePeers(data)
  216. default:
  217. return NewPeerError(InvalidMsgCode, "unknown message code %v", code)
  218. }
  219. return nil
  220. }
  221. func (bp *baseProtocol) handlePeers(data *ethutil.Value) {
  222. it := data.NewIterator()
  223. for it.Next() {
  224. ip := net.IP(it.Value().Get(0).Bytes())
  225. port := it.Value().Get(1).Uint()
  226. address := &net.TCPAddr{IP: ip, Port: int(port)}
  227. go bp.peer.server.PeerConnect(address)
  228. }
  229. }
  230. func (bp *baseProtocol) handleHandshake(c *ethutil.Value) error {
  231. var (
  232. remoteVersion = c.Get(0).Uint()
  233. id = c.Get(1).Str()
  234. caps = c.Get(2)
  235. port = c.Get(3).Uint()
  236. pubkey = c.Get(4).Bytes()
  237. )
  238. // Check correctness of p2p protocol version
  239. if remoteVersion != p2pVersion {
  240. return NewPeerError(P2PVersionMismatch, "Require protocol %d, received %d\n", p2pVersion, remoteVersion)
  241. }
  242. // Handle the pub key (validation, uniqueness)
  243. if len(pubkey) == 0 {
  244. return NewPeerError(PubkeyMissing, "not supplied in handshake.")
  245. }
  246. if len(pubkey) != 64 {
  247. return NewPeerError(PubkeyInvalid, "require 512 bit, got %v", len(pubkey)*8)
  248. }
  249. // self connect detection
  250. if bytes.Compare(bp.peer.server.ClientIdentity().Pubkey()[1:], pubkey) == 0 {
  251. return NewPeerError(PubkeyForbidden, "not allowed to connect to self")
  252. }
  253. // register pubkey on server. this also sets the pubkey on the peer (need lock)
  254. if err := bp.peer.server.RegisterPubkey(bp.peer, pubkey); err != nil {
  255. return NewPeerError(PubkeyForbidden, err.Error())
  256. }
  257. // check port
  258. if bp.peer.Inbound {
  259. uint16port := uint16(port)
  260. if bp.peer.Port > 0 && bp.peer.Port != uint16port {
  261. return NewPeerError(PortMismatch, "port mismatch: %v != %v", bp.peer.Port, port)
  262. } else {
  263. bp.peer.Port = uint16port
  264. }
  265. }
  266. capsIt := caps.NewIterator()
  267. for capsIt.Next() {
  268. cap := capsIt.Value().Str()
  269. bp.peer.Caps = append(bp.peer.Caps, cap)
  270. }
  271. sort.Strings(bp.peer.Caps)
  272. bp.rw.(*proto).messenger.setRemoteProtocols(bp.peer.Caps)
  273. bp.peer.Id = id
  274. return nil
  275. }