peer.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454
  1. package p2p
  2. import (
  3. "bufio"
  4. "bytes"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "net"
  9. "sort"
  10. "sync"
  11. "time"
  12. "github.com/ethereum/go-ethereum/event"
  13. "github.com/ethereum/go-ethereum/logger"
  14. )
  15. // peerAddr is the structure of a peer list element.
  16. // It is also a valid net.Addr.
  17. type peerAddr struct {
  18. IP net.IP
  19. Port uint64
  20. Pubkey []byte // optional
  21. }
  22. func newPeerAddr(addr net.Addr, pubkey []byte) *peerAddr {
  23. n := addr.Network()
  24. if n != "tcp" && n != "tcp4" && n != "tcp6" {
  25. // for testing with non-TCP
  26. return &peerAddr{net.ParseIP("127.0.0.1"), 30303, pubkey}
  27. }
  28. ta := addr.(*net.TCPAddr)
  29. return &peerAddr{ta.IP, uint64(ta.Port), pubkey}
  30. }
  31. func (d peerAddr) Network() string {
  32. if d.IP.To4() != nil {
  33. return "tcp4"
  34. } else {
  35. return "tcp6"
  36. }
  37. }
  38. func (d peerAddr) String() string {
  39. return fmt.Sprintf("%v:%d", d.IP, d.Port)
  40. }
  41. func (d peerAddr) RlpData() interface{} {
  42. return []interface{}{d.IP, d.Port, d.Pubkey}
  43. }
  44. // Peer represents a remote peer.
  45. type Peer struct {
  46. // Peers have all the log methods.
  47. // Use them to display messages related to the peer.
  48. *logger.Logger
  49. infolock sync.Mutex
  50. identity ClientIdentity
  51. caps []Cap
  52. listenAddr *peerAddr // what remote peer is listening on
  53. dialAddr *peerAddr // non-nil if dialing
  54. // The mutex protects the connection
  55. // so only one protocol can write at a time.
  56. writeMu sync.Mutex
  57. conn net.Conn
  58. bufconn *bufio.ReadWriter
  59. // These fields maintain the running protocols.
  60. protocols []Protocol
  61. runBaseProtocol bool // for testing
  62. runlock sync.RWMutex // protects running
  63. running map[string]*proto
  64. protoWG sync.WaitGroup
  65. protoErr chan error
  66. closed chan struct{}
  67. disc chan DiscReason
  68. activity event.TypeMux // for activity events
  69. slot int // index into Server peer list
  70. // These fields are kept so base protocol can access them.
  71. // TODO: this should be one or more interfaces
  72. ourID ClientIdentity // client id of the Server
  73. ourListenAddr *peerAddr // listen addr of Server, nil if not listening
  74. newPeerAddr chan<- *peerAddr // tell server about received peers
  75. otherPeers func() []*Peer // should return the list of all peers
  76. pubkeyHook func(*peerAddr) error // called at end of handshake to validate pubkey
  77. }
  78. // NewPeer returns a peer for testing purposes.
  79. func NewPeer(id ClientIdentity, caps []Cap) *Peer {
  80. conn, _ := net.Pipe()
  81. peer := newPeer(conn, nil, nil)
  82. peer.setHandshakeInfo(id, nil, caps)
  83. return peer
  84. }
  85. func newServerPeer(server *Server, conn net.Conn, dialAddr *peerAddr) *Peer {
  86. p := newPeer(conn, server.Protocols, dialAddr)
  87. p.ourID = server.Identity
  88. p.newPeerAddr = server.peerConnect
  89. p.otherPeers = server.Peers
  90. p.pubkeyHook = server.verifyPeer
  91. p.runBaseProtocol = true
  92. // laddr can be updated concurrently by NAT traversal.
  93. // newServerPeer must be called with the server lock held.
  94. if server.laddr != nil {
  95. p.ourListenAddr = newPeerAddr(server.laddr, server.Identity.Pubkey())
  96. }
  97. return p
  98. }
  99. func newPeer(conn net.Conn, protocols []Protocol, dialAddr *peerAddr) *Peer {
  100. p := &Peer{
  101. Logger: logger.NewLogger("P2P " + conn.RemoteAddr().String()),
  102. conn: conn,
  103. dialAddr: dialAddr,
  104. bufconn: bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)),
  105. protocols: protocols,
  106. running: make(map[string]*proto),
  107. disc: make(chan DiscReason),
  108. protoErr: make(chan error),
  109. closed: make(chan struct{}),
  110. }
  111. return p
  112. }
  113. // Identity returns the client identity of the remote peer. The
  114. // identity can be nil if the peer has not yet completed the
  115. // handshake.
  116. func (p *Peer) Identity() ClientIdentity {
  117. p.infolock.Lock()
  118. defer p.infolock.Unlock()
  119. return p.identity
  120. }
  121. // Caps returns the capabilities (supported subprotocols) of the remote peer.
  122. func (p *Peer) Caps() []Cap {
  123. p.infolock.Lock()
  124. defer p.infolock.Unlock()
  125. return p.caps
  126. }
  127. func (p *Peer) setHandshakeInfo(id ClientIdentity, laddr *peerAddr, caps []Cap) {
  128. p.infolock.Lock()
  129. p.identity = id
  130. p.listenAddr = laddr
  131. p.caps = caps
  132. p.infolock.Unlock()
  133. }
  134. // RemoteAddr returns the remote address of the network connection.
  135. func (p *Peer) RemoteAddr() net.Addr {
  136. return p.conn.RemoteAddr()
  137. }
  138. // LocalAddr returns the local address of the network connection.
  139. func (p *Peer) LocalAddr() net.Addr {
  140. return p.conn.LocalAddr()
  141. }
  142. // Disconnect terminates the peer connection with the given reason.
  143. // It returns immediately and does not wait until the connection is closed.
  144. func (p *Peer) Disconnect(reason DiscReason) {
  145. select {
  146. case p.disc <- reason:
  147. case <-p.closed:
  148. }
  149. }
  150. // String implements fmt.Stringer.
  151. func (p *Peer) String() string {
  152. kind := "inbound"
  153. p.infolock.Lock()
  154. if p.dialAddr != nil {
  155. kind = "outbound"
  156. }
  157. p.infolock.Unlock()
  158. return fmt.Sprintf("Peer(%p %v %s)", p, p.conn.RemoteAddr(), kind)
  159. }
  160. const (
  161. // maximum amount of time allowed for reading a message
  162. msgReadTimeout = 5 * time.Second
  163. // maximum amount of time allowed for writing a message
  164. msgWriteTimeout = 5 * time.Second
  165. // messages smaller than this many bytes will be read at
  166. // once before passing them to a protocol.
  167. wholePayloadSize = 64 * 1024
  168. )
  169. var (
  170. inactivityTimeout = 2 * time.Second
  171. disconnectGracePeriod = 2 * time.Second
  172. )
  173. func (p *Peer) loop() (reason DiscReason, err error) {
  174. defer p.activity.Stop()
  175. defer p.closeProtocols()
  176. defer close(p.closed)
  177. defer p.conn.Close()
  178. // read loop
  179. readMsg := make(chan Msg)
  180. readErr := make(chan error)
  181. readNext := make(chan bool, 1)
  182. protoDone := make(chan struct{}, 1)
  183. go p.readLoop(readMsg, readErr, readNext)
  184. readNext <- true
  185. if p.runBaseProtocol {
  186. p.startBaseProtocol()
  187. }
  188. loop:
  189. for {
  190. select {
  191. case msg := <-readMsg:
  192. // a new message has arrived.
  193. var wait bool
  194. if wait, err = p.dispatch(msg, protoDone); err != nil {
  195. p.Errorf("msg dispatch error: %v\n", err)
  196. reason = discReasonForError(err)
  197. break loop
  198. }
  199. if !wait {
  200. // Msg has already been read completely, continue with next message.
  201. readNext <- true
  202. }
  203. p.activity.Post(time.Now())
  204. case <-protoDone:
  205. // protocol has consumed the message payload,
  206. // we can continue reading from the socket.
  207. readNext <- true
  208. case err := <-readErr:
  209. // read failed. there is no need to run the
  210. // polite disconnect sequence because the connection
  211. // is probably dead anyway.
  212. // TODO: handle write errors as well
  213. return DiscNetworkError, err
  214. case err = <-p.protoErr:
  215. reason = discReasonForError(err)
  216. break loop
  217. case reason = <-p.disc:
  218. break loop
  219. }
  220. }
  221. // wait for read loop to return.
  222. close(readNext)
  223. <-readErr
  224. // tell the remote end to disconnect
  225. done := make(chan struct{})
  226. go func() {
  227. p.conn.SetDeadline(time.Now().Add(disconnectGracePeriod))
  228. p.writeMsg(NewMsg(discMsg, reason), disconnectGracePeriod)
  229. io.Copy(ioutil.Discard, p.conn)
  230. close(done)
  231. }()
  232. select {
  233. case <-done:
  234. case <-time.After(disconnectGracePeriod):
  235. }
  236. return reason, err
  237. }
  238. func (p *Peer) readLoop(msgc chan<- Msg, errc chan<- error, unblock <-chan bool) {
  239. for _ = range unblock {
  240. p.conn.SetReadDeadline(time.Now().Add(msgReadTimeout))
  241. if msg, err := readMsg(p.bufconn); err != nil {
  242. errc <- err
  243. } else {
  244. msgc <- msg
  245. }
  246. }
  247. close(errc)
  248. }
  249. func (p *Peer) dispatch(msg Msg, protoDone chan struct{}) (wait bool, err error) {
  250. proto, err := p.getProto(msg.Code)
  251. if err != nil {
  252. return false, err
  253. }
  254. if msg.Size <= wholePayloadSize {
  255. // optimization: msg is small enough, read all
  256. // of it and move on to the next message
  257. buf, err := ioutil.ReadAll(msg.Payload)
  258. if err != nil {
  259. return false, err
  260. }
  261. msg.Payload = bytes.NewReader(buf)
  262. proto.in <- msg
  263. } else {
  264. wait = true
  265. pr := &eofSignal{msg.Payload, protoDone}
  266. msg.Payload = pr
  267. proto.in <- msg
  268. }
  269. return wait, nil
  270. }
  271. func (p *Peer) startBaseProtocol() {
  272. p.runlock.Lock()
  273. defer p.runlock.Unlock()
  274. p.running[""] = p.startProto(0, Protocol{
  275. Length: baseProtocolLength,
  276. Run: runBaseProtocol,
  277. })
  278. }
  279. // startProtocols starts matching named subprotocols.
  280. func (p *Peer) startSubprotocols(caps []Cap) {
  281. sort.Sort(capsByName(caps))
  282. p.runlock.Lock()
  283. defer p.runlock.Unlock()
  284. offset := baseProtocolLength
  285. outer:
  286. for _, cap := range caps {
  287. for _, proto := range p.protocols {
  288. if proto.Name == cap.Name &&
  289. proto.Version == cap.Version &&
  290. p.running[cap.Name] == nil {
  291. p.running[cap.Name] = p.startProto(offset, proto)
  292. offset += proto.Length
  293. continue outer
  294. }
  295. }
  296. }
  297. }
  298. func (p *Peer) startProto(offset uint64, impl Protocol) *proto {
  299. rw := &proto{
  300. in: make(chan Msg),
  301. offset: offset,
  302. maxcode: impl.Length,
  303. peer: p,
  304. }
  305. p.protoWG.Add(1)
  306. go func() {
  307. err := impl.Run(p, rw)
  308. if err == nil {
  309. p.Infof("protocol %q returned", impl.Name)
  310. err = newPeerError(errMisc, "protocol returned")
  311. } else {
  312. p.Errorf("protocol %q error: %v\n", impl.Name, err)
  313. }
  314. select {
  315. case p.protoErr <- err:
  316. case <-p.closed:
  317. }
  318. p.protoWG.Done()
  319. }()
  320. return rw
  321. }
  322. // getProto finds the protocol responsible for handling
  323. // the given message code.
  324. func (p *Peer) getProto(code uint64) (*proto, error) {
  325. p.runlock.RLock()
  326. defer p.runlock.RUnlock()
  327. for _, proto := range p.running {
  328. if code >= proto.offset && code < proto.offset+proto.maxcode {
  329. return proto, nil
  330. }
  331. }
  332. return nil, newPeerError(errInvalidMsgCode, "%d", code)
  333. }
  334. func (p *Peer) closeProtocols() {
  335. p.runlock.RLock()
  336. for _, p := range p.running {
  337. close(p.in)
  338. }
  339. p.runlock.RUnlock()
  340. p.protoWG.Wait()
  341. }
  342. // writeProtoMsg sends the given message on behalf of the given named protocol.
  343. func (p *Peer) writeProtoMsg(protoName string, msg Msg) error {
  344. p.runlock.RLock()
  345. proto, ok := p.running[protoName]
  346. p.runlock.RUnlock()
  347. if !ok {
  348. return fmt.Errorf("protocol %s not handled by peer", protoName)
  349. }
  350. if msg.Code >= proto.maxcode {
  351. return newPeerError(errInvalidMsgCode, "code %x is out of range for protocol %q", msg.Code, protoName)
  352. }
  353. msg.Code += proto.offset
  354. return p.writeMsg(msg, msgWriteTimeout)
  355. }
  356. // writeMsg writes a message to the connection.
  357. func (p *Peer) writeMsg(msg Msg, timeout time.Duration) error {
  358. p.writeMu.Lock()
  359. defer p.writeMu.Unlock()
  360. p.conn.SetWriteDeadline(time.Now().Add(timeout))
  361. if err := writeMsg(p.bufconn, msg); err != nil {
  362. return newPeerError(errWrite, "%v", err)
  363. }
  364. return p.bufconn.Flush()
  365. }
  366. type proto struct {
  367. name string
  368. in chan Msg
  369. maxcode, offset uint64
  370. peer *Peer
  371. }
  372. func (rw *proto) WriteMsg(msg Msg) error {
  373. if msg.Code >= rw.maxcode {
  374. return newPeerError(errInvalidMsgCode, "not handled")
  375. }
  376. msg.Code += rw.offset
  377. return rw.peer.writeMsg(msg, msgWriteTimeout)
  378. }
  379. func (rw *proto) EncodeMsg(code uint64, data ...interface{}) error {
  380. return rw.WriteMsg(NewMsg(code, data))
  381. }
  382. func (rw *proto) ReadMsg() (Msg, error) {
  383. msg, ok := <-rw.in
  384. if !ok {
  385. return msg, io.EOF
  386. }
  387. msg.Code -= rw.offset
  388. return msg, nil
  389. }
  390. // eofSignal wraps a reader with eof signaling.
  391. // the eof channel is closed when the wrapped reader
  392. // reaches EOF.
  393. type eofSignal struct {
  394. wrapped io.Reader
  395. eof chan<- struct{}
  396. }
  397. func (r *eofSignal) Read(buf []byte) (int, error) {
  398. n, err := r.wrapped.Read(buf)
  399. if err != nil {
  400. r.eof <- struct{}{} // tell Peer that msg has been consumed
  401. }
  402. return n, err
  403. }