Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix reconnect logic #3

Merged
merged 3 commits into from
Nov 20, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 58 additions & 50 deletions recws.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,30 +47,38 @@ type RecConn struct {
// NonVerbose suppress connecting/reconnecting messages.
NonVerbose bool

isConnected bool
isConnecting bool

mu sync.RWMutex
url string
reqHeader http.Header
httpResp *http.Response
dialErr error
dialer *websocket.Dialer
close chan (bool)

*websocket.Conn
}

const (
TextMessage = websocket.TextMessage
BinaryMessage = websocket.BinaryMessage
PingMessage = websocket.PingMessage
)

// CloseAndReconnect will try to reconnect.
func (rc *RecConn) CloseAndReconnect() {
if rc.IsConnecting() {
return
}
rc.Close()
go rc.connect()
}

// setIsConnected sets state for isConnected
func (rc *RecConn) setIsConnected(state bool) {
func (rc *RecConn) setIsConnecting(state bool) {
rc.mu.Lock()
defer rc.mu.Unlock()

rc.isConnected = state
rc.isConnecting = state
}

func (rc *RecConn) getConn() *websocket.Conn {
Expand All @@ -83,13 +91,12 @@ func (rc *RecConn) getConn() *websocket.Conn {
// Close closes the underlying network connection without
// sending or waiting for a close frame.
func (rc *RecConn) Close() {
if rc.getConn() != nil {
if rc.IsConnected() {
rc.mu.Lock()
rc.Conn.Close()
rc.Conn = nil
rc.mu.Unlock()
}
rc.close <- true
rc.setIsConnected(false)
}

// ReadMessage is a helper method for getting a reader
Expand Down Expand Up @@ -291,9 +298,6 @@ func (rc *RecConn) Dial(urlStr string, reqHeader http.Header) {
log.Fatalf("Dial: %v", err)
}

// Close channel
rc.close = make(chan bool, 1)

// Config
rc.setURL(urlStr)
rc.setReqHeader(reqHeader)
Expand All @@ -306,9 +310,6 @@ func (rc *RecConn) Dial(urlStr string, reqHeader http.Header) {

// Connect
go rc.connect()

// wait on first attempt
time.Sleep(rc.getHandshakeTimeout())
}

// GetURL returns current connection url
Expand Down Expand Up @@ -356,7 +357,7 @@ func (rc *RecConn) writeControlPingMessage() error {
rc.mu.Lock()
defer rc.mu.Unlock()

return rc.Conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second))
return rc.Conn.WriteControl(PingMessage, []byte{}, time.Now().Add(10*time.Second))
}

func (rc *RecConn) keepAlive() {
Expand Down Expand Up @@ -394,52 +395,52 @@ func (rc *RecConn) keepAlive() {
}

func (rc *RecConn) connect() {
if rc.IsConnecting() {
return
}

rc.setIsConnecting(true)
defer rc.setIsConnecting(false)

b := rc.getBackoff()
rand.Seed(time.Now().UTC().UnixNano())

for {
select {
case <-rc.close:
return
default:
nextItvl := b.Duration()
wsConn, httpResp, err := rc.dialer.Dial(rc.url, rc.reqHeader)

rc.mu.Lock()
rc.Conn = wsConn
rc.dialErr = err
rc.isConnected = err == nil
rc.httpResp = httpResp
rc.mu.Unlock()

if err == nil {
if !rc.getNonVerbose() {
log.Printf("Dial: connection was successfully established with %s\n", rc.url)
}
nextItvl := b.Duration()
rc.mu.Lock()
wsConn, httpResp, err := rc.dialer.Dial(rc.url, rc.reqHeader)
rc.Conn = wsConn
rc.dialErr = err
rc.httpResp = httpResp
rc.mu.Unlock()

if rc.hasSubscribeHandler() {
if err := rc.SubscribeHandler(); err != nil {
log.Fatalf("Dial: connect handler failed with %s", err.Error())
}
if !rc.getNonVerbose() {
log.Printf("Dial: connect handler was successfully established with %s\n", rc.url)
}
}
if err == nil {
if !rc.getNonVerbose() {
log.Printf("Dial: connection was successfully established with %s\n", rc.url)
}

if rc.getKeepAliveTimeout() != 0 {
rc.keepAlive()
if rc.hasSubscribeHandler() {
if err := rc.SubscribeHandler(); err != nil {
log.Fatalf("Dial: connect handler failed with %s", err.Error())
}
if !rc.getNonVerbose() {
log.Printf("Dial: connect handler was successfully established with %s\n", rc.url)
}

return
}

if !rc.getNonVerbose() {
log.Println(err)
log.Println("Dial: will try again in", nextItvl, "seconds.")
if rc.getKeepAliveTimeout() != 0 {
rc.keepAlive()
}

time.Sleep(nextItvl)
return
}

if !rc.getNonVerbose() {
log.Println(err)
log.Println("Dial: will try again in", nextItvl, "seconds.")
}

time.Sleep(nextItvl)
}
}

Expand Down Expand Up @@ -467,5 +468,12 @@ func (rc *RecConn) IsConnected() bool {
rc.mu.RLock()
defer rc.mu.RUnlock()

return rc.isConnected
return rc.Conn != nil
}

func (rc *RecConn) IsConnecting() bool {
rc.mu.RLock()
defer rc.mu.RUnlock()

return rc.isConnecting
}