From b4b778cd2f822488f8a127cbaa6e8a4167f51fdf Mon Sep 17 00:00:00 2001 From: Brandur Leach Date: Sat, 24 Aug 2024 14:44:53 -0700 Subject: [PATCH] Preinitialize `workCancel` function in case `StopAndCancel` called before start (#557) This one's related to #549. Although the most proximate problem in that repro code is that there was no error check when calling `Start`, it did reveal a legitimate problem in that the River client will panic in case `StopAndCancel` is called before `Start` because `workCancel` was never set. Here, initialize `workCancel` in the client's constructor. During normal operation this will be overwritten almost immediately on `Start` as the client starts up, but in case `Start` was never called or didn't run successfully, it provides a function for `StopAndCancel` to call so that it doesn't panic. Fixes #549. --- CHANGELOG.md | 4 ++++ client.go | 1 + client_test.go | 37 ++++++++++++++++++++++++------------- 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a2fe3dd..c2b8b07b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - A new `river migrate-list` command is available which lists available migrations and which version a target database is migrated to. [PR #534](https://github.com/riverqueue/river/pull/534). - `river version` or `river --version` now prints River version information. [PR #537](https://github.com/riverqueue/river/pull/537). +## Fixed + +- Fixed a panic that'd occur if `StopAndCancel` was invoked before a client was started. [PR #557](https://github.com/riverqueue/river/pull/557). + ## [0.11.4] - 2024-08-20 ### Fixed diff --git a/client.go b/client.go index abbc4772..4a83bb03 100644 --- a/client.go +++ b/client.go @@ -491,6 +491,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client uniqueInserter: baseservice.Init(archetype, &dbunique.UniqueInserter{ AdvisoryLockPrefix: config.AdvisoryLockPrefix, }), + workCancel: func(cause error) {}, // replaced on start, but here in case StopAndCancel is called before start up } client.queues = &QueueBundle{addProducer: client.addProducer} diff --git a/client_test.go b/client_test.go index d2151493..f118e435 100644 --- a/client_test.go +++ b/client_test.go @@ -1050,18 +1050,6 @@ func Test_Client_StopAndCancel(t *testing.T) { client := runNewTestClient(ctx, t, config) - insertRes, err := client.Insert(ctx, &callbackArgs{}, nil) - require.NoError(t, err) - - startedJobID := riversharedtest.WaitOrTimeout(t, jobStartedChan) - require.Equal(t, insertRes.Job.ID, startedJobID) - - select { - case <-client.Stopped(): - t.Fatal("expected client to not be stopped yet") - default: - } - return client, &testBundle{ jobDoneChan: jobDoneChan, jobStartedChan: jobStartedChan, @@ -1071,17 +1059,40 @@ func Test_Client_StopAndCancel(t *testing.T) { t.Run("OnItsOwn", func(t *testing.T) { t.Parallel() - client, _ := setup(t) + client, bundle := setup(t) + + startClient(ctx, t, client) + + _, err := client.Insert(ctx, &callbackArgs{}, nil) + require.NoError(t, err) + + riversharedtest.WaitOrTimeout(t, bundle.jobStartedChan) require.NoError(t, client.StopAndCancel(ctx)) riversharedtest.WaitOrTimeout(t, client.Stopped()) }) + t.Run("BeforeStart", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + require.NoError(t, client.StopAndCancel(ctx)) + riversharedtest.WaitOrTimeout(t, client.Stopped()) // this works because Stopped is nil + }) + t.Run("AfterStop", func(t *testing.T) { t.Parallel() client, bundle := setup(t) + startClient(ctx, t, client) + + _, err := client.Insert(ctx, &callbackArgs{}, nil) + require.NoError(t, err) + + riversharedtest.WaitOrTimeout(t, bundle.jobStartedChan) + go func() { require.NoError(t, client.Stop(ctx)) }()