Jelajahi Sumber

rpc: send websocket ping when connection is idle (#21142)

* rpc: send websocket ping when connection is idle

* rpc: use non-blocking send for websocket pingReset
Felix Lange 5 tahun lalu
induk
melakukan
d98c42c0e3
1 mengubah file dengan 63 tambahan dan 3 penghapusan
  1. 63 3
      rpc/websocket.go

+ 63 - 3
rpc/websocket.go

@@ -25,6 +25,7 @@ import (
 	"os"
 	"strings"
 	"sync"
+	"time"
 
 	mapset "github.com/deckarep/golang-set"
 	"github.com/ethereum/go-ethereum/log"
@@ -32,8 +33,10 @@ import (
 )
 
 const (
-	wsReadBuffer  = 1024
-	wsWriteBuffer = 1024
+	wsReadBuffer       = 1024
+	wsWriteBuffer      = 1024
+	wsPingInterval     = 60 * time.Second
+	wsPingWriteTimeout = 5 * time.Second
 )
 
 var wsBufferPool = new(sync.Pool)
@@ -168,7 +171,64 @@ func wsClientHeaders(endpoint, origin string) (string, http.Header, error) {
 	return endpointURL.String(), header, nil
 }
 
+type websocketCodec struct {
+	*jsonCodec
+	conn *websocket.Conn
+
+	wg        sync.WaitGroup
+	pingReset chan struct{}
+}
+
 func newWebsocketCodec(conn *websocket.Conn) ServerCodec {
 	conn.SetReadLimit(maxRequestContentLength)
-	return NewFuncCodec(conn, conn.WriteJSON, conn.ReadJSON)
+	wc := &websocketCodec{
+		jsonCodec: NewFuncCodec(conn, conn.WriteJSON, conn.ReadJSON).(*jsonCodec),
+		conn:      conn,
+		pingReset: make(chan struct{}, 1),
+	}
+	wc.wg.Add(1)
+	go wc.pingLoop()
+	return wc
+}
+
+func (wc *websocketCodec) close() {
+	wc.jsonCodec.close()
+	wc.wg.Wait()
+}
+
+func (wc *websocketCodec) writeJSON(ctx context.Context, v interface{}) error {
+	err := wc.jsonCodec.writeJSON(ctx, v)
+	if err == nil {
+		// Notify pingLoop to delay the next idle ping.
+		select {
+		case wc.pingReset <- struct{}{}:
+		default:
+		}
+	}
+	return err
+}
+
+// pingLoop sends periodic ping frames when the connection is idle.
+func (wc *websocketCodec) pingLoop() {
+	var timer = time.NewTimer(wsPingInterval)
+	defer wc.wg.Done()
+	defer timer.Stop()
+
+	for {
+		select {
+		case <-wc.closed():
+			return
+		case <-wc.pingReset:
+			if !timer.Stop() {
+				<-timer.C
+			}
+			timer.Reset(wsPingInterval)
+		case <-timer.C:
+			wc.jsonCodec.encMu.Lock()
+			wc.conn.SetWriteDeadline(time.Now().Add(wsPingWriteTimeout))
+			wc.conn.WriteMessage(websocket.PingMessage, nil)
+			wc.jsonCodec.encMu.Unlock()
+			timer.Reset(wsPingInterval)
+		}
+	}
 }