Skip to content

Commit

Permalink
fix panic:concurrent write to websocket connection
Browse files Browse the repository at this point in the history
  • Loading branch information
millken committed Jul 27, 2023
1 parent b666fd5 commit 6c95863
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 6 deletions.
33 changes: 28 additions & 5 deletions api/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"context"
"net/http"
"sync"
"time"

"github.com/gorilla/websocket"
Expand Down Expand Up @@ -36,6 +37,27 @@ var upgrader = websocket.Upgrader{
WriteBufferSize: 1024,
}

// type safeWebsocketConn wraps websocket.Conn with a mutex
// to avoid concurrent write to the connection
type safeWebsocketConn struct {
ws *websocket.Conn
mu sync.Mutex
}

// WiteJSON writes a JSON message to the connection in a thread-safe way
func (c *safeWebsocketConn) WriteJSON(message interface{}) error {
c.mu.Lock()
defer c.mu.Unlock()
return c.ws.WriteJSON(message)
}

// WriteMessage writes a message to the connection in a thread-safe way
func (c *safeWebsocketConn) WriteMessage(messageType int, data []byte) error {
c.mu.Lock()
defer c.mu.Unlock()
return c.ws.WriteMessage(messageType, data)
}

// NewWebsocketHandler creates a new websocket handler
func NewWebsocketHandler(web3Handler Web3Handler) *WebsocketHandler {
return &WebsocketHandler{
Expand Down Expand Up @@ -70,7 +92,8 @@ func (wsSvr *WebsocketHandler) handleConnection(ctx context.Context, ws *websock
})

ctx, cancel := context.WithCancel(ctx)
go ping(ctx, ws, cancel)
safeWs := &safeWebsocketConn{ws: ws}
go ping(ctx, safeWs, cancel)

for {
select {
Expand All @@ -90,7 +113,7 @@ func (wsSvr *WebsocketHandler) handleConnection(ctx context.Context, ws *websock
if err = ws.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
log.Logger("api").Warn("failed to set write deadline timeout.", zap.Error(err))
}
return 0, ws.WriteJSON(resp)
return 0, safeWs.WriteJSON(resp)
}),
)
if err != nil {
Expand All @@ -102,11 +125,11 @@ func (wsSvr *WebsocketHandler) handleConnection(ctx context.Context, ws *websock
}
}

func ping(ctx context.Context, ws *websocket.Conn, cancel context.CancelFunc) {
func ping(ctx context.Context, ws *safeWebsocketConn, cancel context.CancelFunc) {
pingTicker := time.NewTicker(pingPeriod)
defer func() {
pingTicker.Stop()
if err := ws.Close(); err != nil {
if err := ws.ws.Close(); err != nil {
log.Logger("api").Warn("fail to close websocket connection.", zap.Error(err))
}
}()
Expand All @@ -116,7 +139,7 @@ func ping(ctx context.Context, ws *websocket.Conn, cancel context.CancelFunc) {
case <-ctx.Done():
return
case <-pingTicker.C:
if err := ws.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
if err := ws.ws.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
log.Logger("api").Warn("failed to set write deadline timeout.", zap.Error(err))
}
if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/recovery/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (
"runtime/pprof"
"time"

"github.com/shirou/gopsutil/v3/load"
"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/disk"
"github.com/shirou/gopsutil/v3/load"
"github.com/shirou/gopsutil/v3/mem"

"github.com/iotexproject/iotex-core/pkg/log"
Expand Down

0 comments on commit 6c95863

Please sign in to comment.