-
Notifications
You must be signed in to change notification settings - Fork 325
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix panic: concurrent write to websocket connection #3908
Changes from 2 commits
6c95863
a171a02
7ae33e8
3d607fe
0613b0c
b8eb929
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ package api | |
import ( | ||
"context" | ||
"net/http" | ||
"sync" | ||
"time" | ||
|
||
"github.com/gorilla/websocket" | ||
|
@@ -36,6 +37,41 @@ var upgrader = websocket.Upgrader{ | |
WriteBufferSize: 1024, | ||
} | ||
|
||
// type safeWebsocketConn wraps websocket.Conn with a mutex | ||
// to avoid concurrent write to the connection | ||
type safeWebsocketConn struct { | ||
ws *websocket.Conn | ||
mu sync.Mutex | ||
} | ||
|
||
// WiteJSON writes a JSON message to the connection in a thread-safe way | ||
func (c *safeWebsocketConn) WriteJSON(message interface{}) error { | ||
c.mu.Lock() | ||
defer c.mu.Unlock() | ||
return c.ws.WriteJSON(message) | ||
} | ||
|
||
// WriteMessage writes a message to the connection in a thread-safe way | ||
func (c *safeWebsocketConn) WriteMessage(messageType int, data []byte) error { | ||
c.mu.Lock() | ||
defer c.mu.Unlock() | ||
return c.ws.WriteMessage(messageType, data) | ||
} | ||
|
||
millken marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// Close closes the underlying network connection without sending or waiting for a close frame | ||
func (c *safeWebsocketConn) Close() error { | ||
dustinxie marked this conversation as resolved.
Show resolved
Hide resolved
|
||
c.mu.Lock() | ||
defer c.mu.Unlock() | ||
return c.ws.Close() | ||
} | ||
|
||
// SetWriteDeadline sets the write deadline on the underlying network connection | ||
func (c *safeWebsocketConn) SetWriteDeadline(t time.Time) error { | ||
c.mu.Lock() | ||
defer c.mu.Unlock() | ||
return c.ws.SetWriteDeadline(t) | ||
} | ||
|
||
// NewWebsocketHandler creates a new websocket handler | ||
func NewWebsocketHandler(web3Handler Web3Handler) *WebsocketHandler { | ||
return &WebsocketHandler{ | ||
|
@@ -70,7 +106,8 @@ func (wsSvr *WebsocketHandler) handleConnection(ctx context.Context, ws *websock | |
}) | ||
|
||
ctx, cancel := context.WithCancel(ctx) | ||
go ping(ctx, ws, cancel) | ||
safeWs := &safeWebsocketConn{ws: ws} | ||
go ping(ctx, safeWs, cancel) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
if c.isWriting {
panic("concurrent write to websocket connection")
}
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. but pkg.go.dev clearly stated: "Applications are responsible for ensuring that no more than one goroutine calls the write methods (NextWriter, SetWriteDeadline, WriteMessage, WriteJSON, EnableWriteCompression, SetCompressionLevel) concurrently and that no more than one goroutine calls the read methods (NextReader, SetReadDeadline, ReadMessage, ReadJSON, SetPongHandler, SetPingHandler) concurrently." |
||
|
||
for { | ||
select { | ||
|
@@ -87,10 +124,10 @@ func (wsSvr *WebsocketHandler) handleConnection(ctx context.Context, ws *websock | |
err = wsSvr.msgHandler.HandlePOSTReq(ctx, reader, | ||
apitypes.NewResponseWriter( | ||
func(resp interface{}) (int, error) { | ||
if err = ws.SetWriteDeadline(time.Now().Add(writeWait)); err != nil { | ||
if err = safeWs.SetWriteDeadline(time.Now().Add(writeWait)); err != nil { | ||
log.Logger("api").Warn("failed to set write deadline timeout.", zap.Error(err)) | ||
} | ||
return 0, ws.WriteJSON(resp) | ||
return 0, safeWs.WriteJSON(resp) | ||
}), | ||
) | ||
if err != nil { | ||
|
@@ -102,7 +139,7 @@ func (wsSvr *WebsocketHandler) handleConnection(ctx context.Context, ws *websock | |
} | ||
} | ||
|
||
func ping(ctx context.Context, ws *websocket.Conn, cancel context.CancelFunc) { | ||
func ping(ctx context.Context, ws *safeWebsocketConn, cancel context.CancelFunc) { | ||
pingTicker := time.NewTicker(pingPeriod) | ||
defer func() { | ||
pingTicker.Stop() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the link could be added here
https://pkg.go.dev/github.com/gorilla/websocket#hdr-Concurrency