Skip to content

Commit

Permalink
review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
bartoszWojciechO committed Jan 21, 2025
1 parent 5c5556c commit b6e5cf9
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 5 deletions.
9 changes: 5 additions & 4 deletions nc/nc.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (MqttClientBuilder) Build(opts *mqtt.ClientOptions) mqtt.Client {
return mqtt.NewClient(opts)
}

type TimeFunc func(int) time.Duration
type CalculateRetryDelayForAttempt func(attempt int) time.Duration

// Client is a client for Notification center
type Client struct {
Expand All @@ -128,7 +128,7 @@ type Client struct {
subjectErr events.Publisher[error]
subjectPeerUpdate events.Publisher[[]string]
credsFetcher CredentialsGetter
timeFunc TimeFunc
retryDelayFunc CalculateRetryDelayForAttempt

startMu sync.Mutex
started bool
Expand All @@ -150,7 +150,7 @@ func NewClient(
subjectErr: subjectErr,
subjectPeerUpdate: subjectPeerUpdate,
credsFetcher: credsFetcher,
timeFunc: network.ExponentialBackoff,
retryDelayFunc: network.ExponentialBackoff,
}
}

Expand Down Expand Up @@ -246,6 +246,7 @@ func (c *Client) tryConnect(
select {
case managementChan <- credentials.ExpirationDate:
case <-ctx.Done():
return client, connectionState
}

opts := c.createClientOptions(credentials, managementChan, ctx)
Expand Down Expand Up @@ -309,7 +310,7 @@ func (c *Client) connectWithBackoff(client mqtt.Client,
client.Disconnect(0)
}
return client
case <-time.After(c.timeFunc(tries)):
case <-time.After(c.retryDelayFunc(tries)):
}
}

Expand Down
8 changes: 7 additions & 1 deletion nc/nc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func TestConnectionCancellation(t *testing.T) {
connectionErr error
fetchCredentialsErr error
tokenTimeout time.Duration // how long client will wait for connection to be established
delayBeforeCancel time.Duration
}{
{
name: "connection success",
Expand All @@ -224,6 +225,10 @@ func TestConnectionCancellation(t *testing.T) {
name: "cancel while waiting for connection",
tokenTimeout: 10 * time.Second,
},
{
name: "delay before cancel",
delayBeforeCancel: 10 * time.Millisecond,
},
}

for _, test := range tests {
Expand All @@ -248,7 +253,7 @@ func TestConnectionCancellation(t *testing.T) {
subjectErr: &subs.Subject[error]{},
subjectPeerUpdate: &subs.Subject[[]string]{},
credsFetcher: credsFetcher,
timeFunc: func(i int) time.Duration { return test.tokenTimeout },
retryDelayFunc: func(i int) time.Duration { return test.tokenTimeout },
}

t.Run(test.name, func(t *testing.T) {
Expand All @@ -259,6 +264,7 @@ func TestConnectionCancellation(t *testing.T) {
connectedChan <- true
}()

time.Sleep(test.delayBeforeCancel)
cancelFunc()

select {
Expand Down

0 comments on commit b6e5cf9

Please sign in to comment.