From 7d81c85b0a318b391ea5ee5bb949c77d668ed702 Mon Sep 17 00:00:00 2001 From: Teodor Maxim <57960185+tmaxmax@users.noreply.github.com> Date: Sun, 19 Nov 2023 18:36:38 +0200 Subject: [PATCH] Remove Connection.reconnectionTime internal field --- client.go | 16 +++++++++++----- client_connection.go | 27 ++++++++++++--------------- 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/client.go b/client.go index 7c48c66..2fb8b58 100644 --- a/client.go +++ b/client.go @@ -76,15 +76,21 @@ func (c *Client) NewConnection(r *http.Request) *Connection { return conn } -func (c *Client) newBackoff(ctx context.Context) (backoff.BackOff, *time.Duration) { +func (c *Client) newBackoff(ctx context.Context) (b backoff.BackOff, setRetry func(time.Duration)) { base := backoff.NewExponentialBackOff() base.InitialInterval = c.DefaultReconnectionTime - initialReconnectionTime := &base.InitialInterval - b := backoff.WithContext(base, ctx) + b = backoff.WithContext(base, ctx) if c.MaxRetries >= 0 { - return backoff.WithMaxRetries(b, uint64(c.MaxRetries)), initialReconnectionTime + rb := backoff.WithMaxRetries(b, uint64(c.MaxRetries)) + return rb, func(d time.Duration) { + base.InitialInterval = d + rb.Reset() + } + } + return b, func(d time.Duration) { + base.InitialInterval = d + b.Reset() } - return b, initialReconnectionTime } func contentType(header string) string { diff --git a/client_connection.go b/client_connection.go index f1d282f..3b1c9e1 100644 --- a/client_connection.go +++ b/client_connection.go @@ -40,15 +40,14 @@ type EventCallbackRemover func() // // Connections must not be copied after they are created. type Connection struct { //nolint:govet // The current order aids readability. - mu sync.RWMutex - request *http.Request - callbacks map[string]map[int]EventCallback - callbacksAll map[int]EventCallback - reconnectionTime *time.Duration - lastEventID string - client Client - callbackID int - isRetry bool + mu sync.RWMutex + request *http.Request + callbacks map[string]map[int]EventCallback + callbacksAll map[int]EventCallback + lastEventID string + client Client + callbackID int + isRetry bool } // SubscribeMessages subscribes the given callback to all events without type (without or with empty `event` field). @@ -166,7 +165,7 @@ func (c *Connection) dispatch(ev Event) { } } -func (c *Connection) read(r io.Reader, reset func()) error { +func (c *Connection) read(r io.Reader, setRetry func(time.Duration)) error { p := parser.New(r) ev, dirty := Event{}, false @@ -193,8 +192,7 @@ func (c *Connection) read(r io.Reader, reset func()) error { break } if n > 0 { - *c.reconnectionTime = time.Duration(n) * time.Millisecond - reset() + setRetry(time.Duration(n) * time.Millisecond) } dirty = true default: @@ -225,9 +223,8 @@ func (c *Connection) read(r io.Reader, reset func()) error { // inside a *ConnectionError. func (c *Connection) Connect() error { ctx := c.request.Context() - b, interval := c.client.newBackoff(ctx) + b, setRetry := c.client.newBackoff(ctx) - c.reconnectionTime = interval c.request.Header.Set("Accept", "text/event-stream") c.request.Header.Set("Connection", "keep-alive") c.request.Header.Set("Cache", "no-cache") @@ -254,7 +251,7 @@ func (c *Connection) Connect() error { b.Reset() - err = c.read(res.Body, b.Reset) + err = c.read(res.Body, setRetry) if errors.Is(err, ctx.Err()) { return backoff.Permanent(err) }