protocol.go 6.4 KB


  1. package p2p
  2. import (
  3. "bytes"
  4. "fmt"
  5. "net"
  6. "sort"
  7. "sync"
  8. "time"
  9. )
  10. type Protocol interface {
  11. Start()
  12. Stop()
  13. HandleIn(*Msg, chan *Msg)
  14. HandleOut(*Msg) bool
  15. Offset() MsgCode
  16. Name() string
  17. }
  18. const (
  19. P2PVersion = 0
  20. pingTimeout = 2
  21. pingGracePeriod = 2
  22. )
  23. const (
  24. HandshakeMsg = iota
  25. DiscMsg
  26. PingMsg
  27. PongMsg
  28. GetPeersMsg
  29. PeersMsg
  30. offset = 16
  31. )
  32. type ProtocolState uint8
  33. const (
  34. nullState = iota
  35. handshakeReceived
  36. )
  37. type DiscReason byte
  38. const (
  39. // Values are given explicitly instead of by iota because these values are
  40. // defined by the wire protocol spec; it is easier for humans to ensure
  41. // correctness when values are explicit.
  42. DiscRequested = 0x00
  43. DiscNetworkError = 0x01
  44. DiscProtocolError = 0x02
  45. DiscUselessPeer = 0x03
  46. DiscTooManyPeers = 0x04
  47. DiscAlreadyConnected = 0x05
  48. DiscIncompatibleVersion = 0x06
  49. DiscInvalidIdentity = 0x07
  50. DiscQuitting = 0x08
  51. DiscUnexpectedIdentity = 0x09
  52. DiscSelf = 0x0a
  53. DiscReadTimeout = 0x0b
  54. DiscSubprotocolError = 0x10
  55. )
  56. var discReasonToString = map[DiscReason]string{
  57. DiscRequested: "Disconnect requested",
  58. DiscNetworkError: "Network error",
  59. DiscProtocolError: "Breach of protocol",
  60. DiscUselessPeer: "Useless peer",
  61. DiscTooManyPeers: "Too many peers",
  62. DiscAlreadyConnected: "Already connected",
  63. DiscIncompatibleVersion: "Incompatible P2P protocol version",
  64. DiscInvalidIdentity: "Invalid node identity",
  65. DiscQuitting: "Client quitting",
  66. DiscUnexpectedIdentity: "Unexpected identity",
  67. DiscSelf: "Connected to self",
  68. DiscReadTimeout: "Read timeout",
  69. DiscSubprotocolError: "Subprotocol error",
  70. }
  71. func (d DiscReason) String() string {
  72. if len(discReasonToString) < int(d) {
  73. return "Unknown"
  74. }
  75. return discReasonToString[d]
  76. }
  77. type BaseProtocol struct {
  78. peer *Peer
  79. state ProtocolState
  80. stateLock sync.RWMutex
  81. }
  82. func NewBaseProtocol(peer *Peer) *BaseProtocol {
  83. self := &BaseProtocol{
  84. peer: peer,
  85. }
  86. return self
  87. }
  88. func (self *BaseProtocol) Start() {
  89. if self.peer != nil {
  90. self.peer.Write("", self.peer.Server().Handshake())
  91. go self.peer.Messenger().PingPong(
  92. pingTimeout*time.Second,
  93. pingGracePeriod*time.Second,
  94. self.Ping,
  95. self.Timeout,
  96. )
  97. }
  98. }
  99. func (self *BaseProtocol) Stop() {
  100. }
  101. func (self *BaseProtocol) Ping() {
  102. msg, _ := NewMsg(PingMsg)
  103. self.peer.Write("", msg)
  104. }
  105. func (self *BaseProtocol) Timeout() {
  106. self.peerError(PingTimeout, "")
  107. }
  108. func (self *BaseProtocol) Name() string {
  109. return ""
  110. }
  111. func (self *BaseProtocol) Offset() MsgCode {
  112. return offset
  113. }
  114. func (self *BaseProtocol) CheckState(state ProtocolState) bool {
  115. self.stateLock.RLock()
  116. self.stateLock.RUnlock()
  117. if self.state != state {
  118. return false
  119. } else {
  120. return true
  121. }
  122. }
  123. func (self *BaseProtocol) HandleIn(msg *Msg, response chan *Msg) {
  124. if msg.Code() == HandshakeMsg {
  125. self.handleHandshake(msg)
  126. } else {
  127. if !self.CheckState(handshakeReceived) {
  128. self.peerError(ProtocolBreach, "message code %v not allowed", msg.Code())
  129. close(response)
  130. return
  131. }
  132. switch msg.Code() {
  133. case DiscMsg:
  134. logger.Infof("Disconnect requested from peer %v, reason", DiscReason(msg.Data().Get(0).Uint()))
  135. self.peer.Server().PeerDisconnect() <- DisconnectRequest{
  136. addr: self.peer.Address,
  137. reason: DiscRequested,
  138. }
  139. case PingMsg:
  140. out, _ := NewMsg(PongMsg)
  141. response <- out
  142. case PongMsg:
  143. case GetPeersMsg:
  144. // Peer asked for list of connected peers
  145. if out, err := self.peer.Server().PeersMessage(); err != nil {
  146. response <- out
  147. }
  148. case PeersMsg:
  149. self.handlePeers(msg)
  150. default:
  151. self.peerError(InvalidMsgCode, "unknown message code %v", msg.Code())
  152. }
  153. }
  154. close(response)
  155. }
  156. func (self *BaseProtocol) HandleOut(msg *Msg) (allowed bool) {
  157. // somewhat overly paranoid
  158. allowed = msg.Code() == HandshakeMsg || msg.Code() == DiscMsg || msg.Code() < self.Offset() && self.CheckState(handshakeReceived)
  159. return
  160. }
  161. func (self *BaseProtocol) peerError(errorCode ErrorCode, format string, v ...interface{}) {
  162. err := NewPeerError(errorCode, format, v...)
  163. logger.Warnln(err)
  164. fmt.Println(self.peer, err)
  165. if self.peer != nil {
  166. self.peer.PeerErrorChan() <- err
  167. }
  168. }
  169. func (self *BaseProtocol) handlePeers(msg *Msg) {
  170. it := msg.Data().NewIterator()
  171. for it.Next() {
  172. ip := net.IP(it.Value().Get(0).Bytes())
  173. port := it.Value().Get(1).Uint()
  174. address := &net.TCPAddr{IP: ip, Port: int(port)}
  175. go self.peer.Server().PeerConnect(address)
  176. }
  177. }
  178. func (self *BaseProtocol) handleHandshake(msg *Msg) {
  179. self.stateLock.Lock()
  180. defer self.stateLock.Unlock()
  181. if self.state != nullState {
  182. self.peerError(ProtocolBreach, "extra handshake")
  183. return
  184. }
  185. c := msg.Data()
  186. var (
  187. p2pVersion = c.Get(0).Uint()
  188. id = c.Get(1).Str()
  189. caps = c.Get(2)
  190. port = c.Get(3).Uint()
  191. pubkey = c.Get(4).Bytes()
  192. )
  193. fmt.Printf("handshake received %v, %v, %v, %v, %v ", p2pVersion, id, caps, port, pubkey)
  194. // Check correctness of p2p protocol version
  195. if p2pVersion != P2PVersion {
  196. self.peerError(P2PVersionMismatch, "Require protocol %d, received %d\n", P2PVersion, p2pVersion)
  197. return
  198. }
  199. // Handle the pub key (validation, uniqueness)
  200. if len(pubkey) == 0 {
  201. self.peerError(PubkeyMissing, "not supplied in handshake.")
  202. return
  203. }
  204. if len(pubkey) != 64 {
  205. self.peerError(PubkeyInvalid, "require 512 bit, got %v", len(pubkey)*8)
  206. return
  207. }
  208. // Self connect detection
  209. if bytes.Compare(self.peer.Server().ClientIdentity().Pubkey()[1:], pubkey) == 0 {
  210. self.peerError(PubkeyForbidden, "not allowed to connect to self")
  211. return
  212. }
  213. // register pubkey on server. this also sets the pubkey on the peer (need lock)
  214. if err := self.peer.Server().RegisterPubkey(self.peer, pubkey); err != nil {
  215. self.peerError(PubkeyForbidden, err.Error())
  216. return
  217. }
  218. // check port
  219. if self.peer.Inbound {
  220. uint16port := uint16(port)
  221. if self.peer.Port > 0 && self.peer.Port != uint16port {
  222. self.peerError(PortMismatch, "port mismatch: %v != %v", self.peer.Port, port)
  223. return
  224. } else {
  225. self.peer.Port = uint16port
  226. }
  227. }
  228. capsIt := caps.NewIterator()
  229. for capsIt.Next() {
  230. cap := capsIt.Value().Str()
  231. self.peer.Caps = append(self.peer.Caps, cap)
  232. }
  233. sort.Strings(self.peer.Caps)
  234. self.peer.Messenger().AddProtocols(self.peer.Caps)
  235. self.peer.Id = id
  236. self.state = handshakeReceived
  237. //p.ethereum.PushPeer(p)
  238. // p.ethereum.reactor.Post("peerList", p.ethereum.Peers())
  239. return
  240. }