Skip to content

Commit

Permalink
Preinitialize workCancel function in case StopAndCancel called be…
Browse files Browse the repository at this point in the history
…fore start (#557)

This one's related to #549. Although the most proximate problem in that
repro code is that there was no error check when calling `Start`, it did
reveal a legitimate problem in that the River client will panic in case
`StopAndCancel` is called before `Start` because `workCancel` was never
set.

Here, initialize `workCancel` in the client's constructor. During normal
operation this will be overwritten almost immediately on `Start` as the
client starts up, but in case `Start` was never called or didn't run
successfully, it provides a function for `StopAndCancel` to call so that
it doesn't panic.

Fixes #549.
  • Loading branch information
brandur authored Aug 24, 2024
1 parent 394aa68 commit b4b778c
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 13 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- A new `river migrate-list` command is available which lists available migrations and which version a target database is migrated to. [PR #534](https://github.com/riverqueue/river/pull/534).
- `river version` or `river --version` now prints River version information. [PR #537](https://github.com/riverqueue/river/pull/537).

## Fixed

- Fixed a panic that'd occur if `StopAndCancel` was invoked before a client was started. [PR #557](https://github.com/riverqueue/river/pull/557).

## [0.11.4] - 2024-08-20

### Fixed
Expand Down
1 change: 1 addition & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
uniqueInserter: baseservice.Init(archetype, &dbunique.UniqueInserter{
AdvisoryLockPrefix: config.AdvisoryLockPrefix,
}),
workCancel: func(cause error) {}, // replaced on start, but here in case StopAndCancel is called before start up
}
client.queues = &QueueBundle{addProducer: client.addProducer}

Expand Down
37 changes: 24 additions & 13 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,18 +1050,6 @@ func Test_Client_StopAndCancel(t *testing.T) {

client := runNewTestClient(ctx, t, config)

insertRes, err := client.Insert(ctx, &callbackArgs{}, nil)
require.NoError(t, err)

startedJobID := riversharedtest.WaitOrTimeout(t, jobStartedChan)
require.Equal(t, insertRes.Job.ID, startedJobID)

select {
case <-client.Stopped():
t.Fatal("expected client to not be stopped yet")
default:
}

return client, &testBundle{
jobDoneChan: jobDoneChan,
jobStartedChan: jobStartedChan,
Expand All @@ -1071,17 +1059,40 @@ func Test_Client_StopAndCancel(t *testing.T) {
t.Run("OnItsOwn", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)
client, bundle := setup(t)

startClient(ctx, t, client)

_, err := client.Insert(ctx, &callbackArgs{}, nil)
require.NoError(t, err)

riversharedtest.WaitOrTimeout(t, bundle.jobStartedChan)

require.NoError(t, client.StopAndCancel(ctx))
riversharedtest.WaitOrTimeout(t, client.Stopped())
})

t.Run("BeforeStart", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

require.NoError(t, client.StopAndCancel(ctx))
riversharedtest.WaitOrTimeout(t, client.Stopped()) // this works because Stopped is nil
})

t.Run("AfterStop", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)

startClient(ctx, t, client)

_, err := client.Insert(ctx, &callbackArgs{}, nil)
require.NoError(t, err)

riversharedtest.WaitOrTimeout(t, bundle.jobStartedChan)

go func() {
require.NoError(t, client.Stop(ctx))
}()
Expand Down

0 comments on commit b4b778c

Please sign in to comment.