Skip to content

Commit

Permalink
Remove Connection.reconnectionTime internal field
Browse files Browse the repository at this point in the history
  • Loading branch information
tmaxmax committed Nov 19, 2023
1 parent 04f3693 commit 7d81c85
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 20 deletions.
16 changes: 11 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,21 @@ func (c *Client) NewConnection(r *http.Request) *Connection {
return conn
}

func (c *Client) newBackoff(ctx context.Context) (backoff.BackOff, *time.Duration) {
func (c *Client) newBackoff(ctx context.Context) (b backoff.BackOff, setRetry func(time.Duration)) {
base := backoff.NewExponentialBackOff()
base.InitialInterval = c.DefaultReconnectionTime
initialReconnectionTime := &base.InitialInterval
b := backoff.WithContext(base, ctx)
b = backoff.WithContext(base, ctx)
if c.MaxRetries >= 0 {
return backoff.WithMaxRetries(b, uint64(c.MaxRetries)), initialReconnectionTime
rb := backoff.WithMaxRetries(b, uint64(c.MaxRetries))
return rb, func(d time.Duration) {
base.InitialInterval = d
rb.Reset()
}
}
return b, func(d time.Duration) {
base.InitialInterval = d
b.Reset()
}
return b, initialReconnectionTime
}

func contentType(header string) string {
Expand Down
27 changes: 12 additions & 15 deletions client_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,14 @@ type EventCallbackRemover func()
//
// Connections must not be copied after they are created.
type Connection struct { //nolint:govet // The current order aids readability.
mu sync.RWMutex
request *http.Request
callbacks map[string]map[int]EventCallback
callbacksAll map[int]EventCallback
reconnectionTime *time.Duration
lastEventID string
client Client
callbackID int
isRetry bool
mu sync.RWMutex
request *http.Request
callbacks map[string]map[int]EventCallback
callbacksAll map[int]EventCallback
lastEventID string
client Client
callbackID int
isRetry bool
}

// SubscribeMessages subscribes the given callback to all events without type (without or with empty `event` field).
Expand Down Expand Up @@ -166,7 +165,7 @@ func (c *Connection) dispatch(ev Event) {
}
}

func (c *Connection) read(r io.Reader, reset func()) error {
func (c *Connection) read(r io.Reader, setRetry func(time.Duration)) error {
p := parser.New(r)
ev, dirty := Event{}, false

Expand All @@ -193,8 +192,7 @@ func (c *Connection) read(r io.Reader, reset func()) error {
break
}
if n > 0 {
*c.reconnectionTime = time.Duration(n) * time.Millisecond
reset()
setRetry(time.Duration(n) * time.Millisecond)
}
dirty = true
default:
Expand Down Expand Up @@ -225,9 +223,8 @@ func (c *Connection) read(r io.Reader, reset func()) error {
// inside a *ConnectionError.
func (c *Connection) Connect() error {
ctx := c.request.Context()
b, interval := c.client.newBackoff(ctx)
b, setRetry := c.client.newBackoff(ctx)

c.reconnectionTime = interval
c.request.Header.Set("Accept", "text/event-stream")
c.request.Header.Set("Connection", "keep-alive")
c.request.Header.Set("Cache", "no-cache")
Expand All @@ -254,7 +251,7 @@ func (c *Connection) Connect() error {

b.Reset()

err = c.read(res.Body, b.Reset)
err = c.read(res.Body, setRetry)
if errors.Is(err, ctx.Err()) {
return backoff.Permanent(err)
}
Expand Down

0 comments on commit 7d81c85

Please sign in to comment.