浏览代码

p2p: restore read/write timeouts

They got lost in the transition to rlpxFrameRW.
Felix Lange 10 年之前
父节点
当前提交
22659a7fea
共有 5 个文件被更改,包括 37 次插入37 次删除
  1. 3 5
      p2p/handshake.go
  2. 12 28
      p2p/message.go
  3. 2 1
      p2p/peer.go
  4. 5 0
      p2p/rlpx.go
  5. 15 3
      p2p/server.go

+ 3 - 5
p2p/handshake.go

@@ -37,7 +37,7 @@ const (
 //
 // The MsgReadWriter is usually layered as follows:
 //
-//     lockedRW         (thread-safety for ReadMsg, WriteMsg)
+//     netWrapper       (I/O timeouts, thread-safe ReadMsg, WriteMsg)
 //     rlpxFrameRW      (message encoding, encryption, authentication)
 //     bufio.ReadWriter (buffering)
 //     net.Conn         (network I/O)
@@ -83,7 +83,6 @@ func setupInboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake) (
 	}
 
 	// Run the protocol handshake using authenticated messages.
-	// TODO: move buffering setup here (out of newFrameRW)
 	rw := newRlpxFrameRW(fd, secrets)
 	rhs, err := readProtocolHandshake(rw, our)
 	if err != nil {
@@ -96,7 +95,7 @@ func setupInboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake) (
 	if err := writeProtocolHandshake(rw, our); err != nil {
 		return nil, fmt.Errorf("protocol write error: %v", err)
 	}
-	return &conn{&lockedRW{wrapped: rw}, rhs}, nil
+	return &conn{rw, rhs}, nil
 }
 
 func setupOutboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node) (*conn, error) {
@@ -106,7 +105,6 @@ func setupOutboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake,
 	}
 
 	// Run the protocol handshake using authenticated messages.
-	// TODO: move buffering setup here (out of newFrameRW)
 	rw := newRlpxFrameRW(fd, secrets)
 	if err := writeProtocolHandshake(rw, our); err != nil {
 		return nil, fmt.Errorf("protocol write error: %v", err)
@@ -118,7 +116,7 @@ func setupOutboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake,
 	if rhs.ID != dial.ID {
 		return nil, errors.New("dialed node id mismatch")
 	}
-	return &conn{&lockedRW{wrapped: rw}, rhs}, nil
+	return &conn{rw, rhs}, nil
 }
 
 // encHandshake contains the state of the encryption handshake.

+ 12 - 28
p2p/message.go

@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
+	"net"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -14,28 +15,6 @@ import (
 	"github.com/ethereum/go-ethereum/rlp"
 )
 
-// parameters for frameRW
-const (
-	// maximum time allowed for reading a message header.
-	// this is effectively the amount of time a connection can be idle.
-	frameReadTimeout = 1 * time.Minute
-
-	// maximum time allowed for reading the payload data of a message.
-	// this is shorter than (and distinct from) frameReadTimeout because
-	// the connection is not considered idle while a message is transferred.
-	// this also limits the payload size of messages to how much the connection
-	// can transfer within the timeout.
-	payloadReadTimeout = 5 * time.Second
-
-	// maximum amount of time allowed for writing a complete message.
-	msgWriteTimeout = 5 * time.Second
-
-	// messages smaller than this many bytes will be read at
-	// once before passing them to a protocol. this increases
-	// concurrency in the processing.
-	wholePayloadSize = 64 * 1024
-)
-
 // Msg defines the structure of a p2p message.
 //
 // Note that a Msg can only be sent once since the Payload reader is
@@ -103,22 +82,27 @@ func EncodeMsg(w MsgWriter, code uint64, data ...interface{}) error {
 	return w.WriteMsg(NewMsg(code, data...))
 }
 
-// lockedRW wraps a MsgReadWriter with locks around
-// ReadMsg and WriteMsg.
-type lockedRW struct {
+// netWrapper wrapsa MsgReadWriter with locks around
+// ReadMsg/WriteMsg and applies read/write deadlines.
+type netWrapper struct {
 	rmu, wmu sync.Mutex
-	wrapped  MsgReadWriter
+
+	rtimeout, wtimeout time.Duration
+	conn               net.Conn
+	wrapped            MsgReadWriter
 }
 
-func (rw *lockedRW) ReadMsg() (Msg, error) {
+func (rw *netWrapper) ReadMsg() (Msg, error) {
 	rw.rmu.Lock()
 	defer rw.rmu.Unlock()
+	rw.conn.SetReadDeadline(time.Now().Add(rw.rtimeout))
 	return rw.wrapped.ReadMsg()
 }
 
-func (rw *lockedRW) WriteMsg(msg Msg) error {
+func (rw *netWrapper) WriteMsg(msg Msg) error {
 	rw.wmu.Lock()
 	defer rw.wmu.Unlock()
+	rw.conn.SetWriteDeadline(time.Now().Add(rw.wtimeout))
 	return rw.wrapped.WriteMsg(msg)
 }
 

+ 2 - 1
p2p/peer.go

@@ -20,8 +20,8 @@ const (
 	baseProtocolLength     = uint64(16)
 	baseProtocolMaxMsgSize = 10 * 1024 * 1024
 
-	disconnectGracePeriod = 2 * time.Second
 	pingInterval          = 15 * time.Second
+	disconnectGracePeriod = 2 * time.Second
 )
 
 const (
@@ -176,6 +176,7 @@ func (p *Peer) politeDisconnect(reason DiscReason) {
 
 func (p *Peer) readLoop() error {
 	for {
+		p.conn.SetDeadline(time.Now().Add(frameReadTimeout))
 		msg, err := p.rw.ReadMsg()
 		if err != nil {
 			return err

+ 5 - 0
p2p/rlpx.go

@@ -21,6 +21,11 @@ var (
 	zero16 = make([]byte, 16)
 )
 
+// rlpxFrameRW implements a simplified version of RLPx framing.
+// chunked messages are not supported and all headers are equal to
+// zeroHeader.
+//
+// rlpxFrameRW is not safe for concurrent use from multiple goroutines.
 type rlpxFrameRW struct {
 	conn io.ReadWriter
 	enc  cipher.Stream

+ 15 - 3
p2p/server.go

@@ -17,9 +17,17 @@ import (
 )
 
 const (
-	handshakeTimeout     = 5 * time.Second
 	defaultDialTimeout   = 10 * time.Second
 	refreshPeersInterval = 30 * time.Second
+
+	// total timeout for encryption handshake and protocol
+	// handshake in both directions.
+	handshakeTimeout = 5 * time.Second
+	// maximum time allowed for reading a complete message.
+	// this is effectively the amount of time a connection can be idle.
+	frameReadTimeout = 1 * time.Minute
+	// maximum amount of time allowed for writing a complete message.
+	frameWriteTimeout = 5 * time.Second
 )
 
 var srvlog = logger.NewLogger("P2P Server")
@@ -359,14 +367,18 @@ func (srv *Server) findPeers() {
 
 func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
 	// TODO: handle/store session token
-	// TODO: reenable deadlines
-	// fd.SetDeadline(time.Now().Add(handshakeTimeout))
+	fd.SetDeadline(time.Now().Add(handshakeTimeout))
 	conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest)
 	if err != nil {
 		fd.Close()
 		srvlog.Debugf("Handshake with %v failed: %v", fd.RemoteAddr(), err)
 		return
 	}
+
+	conn.MsgReadWriter = &netWrapper{
+		wrapped: conn.MsgReadWriter,
+		conn:    fd, rtimeout: frameReadTimeout, wtimeout: frameWriteTimeout,
+	}
 	p := newPeer(fd, conn, srv.Protocols)
 	if ok, reason := srv.addPeer(conn.ID, p); !ok {
 		srvlog.DebugDetailf("Not adding %v (%v)\n", p, reason)