From 13ecbd4646b749526356e23216c12b76047b06a6 Mon Sep 17 00:00:00 2001 From: Brandur Leach Date: Wed, 6 Dec 2023 21:00:37 -0800 Subject: [PATCH] Consolidate on one set state query + adapter/completer function [boilerplate reduction] (#101) This one's a clean up aimed at doing a massive boilerplate reduction in our sqlc queries, the adapter, and the completer(s). We have a few current problems: * Every "set state" query like `JobCompleteIfRunning`, `JobErrorIfRunning`, etc. are all separate sqlc queries, and worse yet, all very long ones that are all practically identical with very minor but important differences that are hard to spot without examining them character by character. * Every one of these queries get exposed through the adapter as a separate function that's implemented in the interface, `StandardAdapter`, and the mock `TestAdapter`, requiring lots of LOC. * They're all exposed yet again in the `Completer` interface, along with both implementations of `Completer`. Which again, is a lot of LOC. Here, we show that with some fairly minor tweaks, we can consolidate all of this state updating into a single query that's no longer than any one of the originals, and then simplify both the adapter and completer to model access to it as a general `JobSetState` function with a number of params constructors that function to produce the necessary parameters for each type of state of update (complete, error, snooze, etc.). Total LOCs are reduced by *a lot*. `river_job.sql` just by itself has 40% fewer lines, and with many more removed in the adapters and completers. This has the effect of a lot less noise, but also makes refactoring easier because there's less that has to change. Furthermore, I don't believe there's any significant reduction in API ergonomics or safety. It's still not perfect -- there's still way too much scaffolding in the completers in particular for example, but more improvements to come. --- client_test.go | 18 +- internal/dbadapter/db_adapter.go | 112 ++--- internal/dbadapter/db_adapter_test.go | 39 +- internal/dbadaptertest/test_adapter.go | 132 +----- internal/dbsqlc/river_job.sql | 257 ++--------- internal/dbsqlc/river_job.sql.go | 475 ++++---------------- internal/jobcompleter/job_completer.go | 73 +-- internal/jobcompleter/job_completer_test.go | 24 +- internal/util/ptrutil/ptr_util.go | 18 + internal/util/ptrutil/ptr_util_test.go | 16 + job.go | 8 +- job_executor.go | 13 +- 12 files changed, 253 insertions(+), 932 deletions(-) diff --git a/client_test.go b/client_test.go index 27f6f27b..4effd849 100644 --- a/client_test.go +++ b/client_test.go @@ -1952,7 +1952,12 @@ func Test_Client_JobCompletion(t *testing.T) { var dbPool *pgxpool.Pool now := time.Now().UTC() config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error { - _, err := queries.JobSetCompleted(ctx, dbPool, dbsqlc.JobSetCompletedParams{ID: job.ID, FinalizedAt: now}) + _, err := queries.JobSetState(ctx, dbPool, dbsqlc.JobSetStateParams{ + ID: job.ID, + FinalizedAtDoUpdate: true, + FinalizedAt: &now, + State: dbsqlc.JobStateCompleted, + }) require.NoError(err) return nil }) @@ -2031,10 +2036,13 @@ func Test_Client_JobCompletion(t *testing.T) { var dbPool *pgxpool.Pool now := time.Now().UTC() config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error { - _, err := queries.JobSetDiscarded(ctx, dbPool, dbsqlc.JobSetDiscardedParams{ - ID: job.ID, - Error: []byte("{\"error\": \"oops\"}"), - FinalizedAt: now, + _, err := queries.JobSetState(ctx, dbPool, dbsqlc.JobSetStateParams{ + ID: job.ID, + ErrorDoUpdate: true, + Error: []byte("{\"error\": \"oops\"}"), + FinalizedAtDoUpdate: true, + FinalizedAt: &now, + State: dbsqlc.JobStateDiscarded, }) require.NoError(err) return errors.New("oops") diff --git a/internal/dbadapter/db_adapter.go b/internal/dbadapter/db_adapter.go index b68f8751..50838bff 100644 --- a/internal/dbadapter/db_adapter.go +++ b/internal/dbadapter/db_adapter.go @@ -81,9 +81,6 @@ type JobInsertResult struct { // expedience, but this should be converted to a more stable API if Adapter // would be exported. type Adapter interface { - JobCompleteMany(ctx context.Context, jobs ...JobToComplete) error - // TODO: should all dbsqlc need to implement this? Or is it a pro feature? - JobCompleteManyTx(ctx context.Context, tx pgx.Tx, jobs ...JobToComplete) error JobInsert(ctx context.Context, params *JobInsertParams) (*JobInsertResult, error) JobInsertTx(ctx context.Context, tx pgx.Tx, params *JobInsertParams) (*JobInsertResult, error) @@ -94,12 +91,10 @@ type Adapter interface { JobGetAvailable(ctx context.Context, queueName string, limit int32) ([]*dbsqlc.RiverJob, error) JobGetAvailableTx(ctx context.Context, tx pgx.Tx, queueName string, limit int32) ([]*dbsqlc.RiverJob, error) - JobSetCancelledIfRunning(ctx context.Context, id int64, cancelledAt time.Time, err []byte) (*dbsqlc.RiverJob, error) - JobSetCompletedIfRunning(ctx context.Context, job JobToComplete) (*dbsqlc.RiverJob, error) - JobSetCompletedTx(ctx context.Context, tx pgx.Tx, id int64, completedAt time.Time) (*dbsqlc.RiverJob, error) - JobSetDiscardedIfRunning(ctx context.Context, id int64, discardedAt time.Time, err []byte) (*dbsqlc.RiverJob, error) - JobSetErroredIfRunning(ctx context.Context, id int64, scheduledAt time.Time, err []byte) (*dbsqlc.RiverJob, error) - JobSetSnoozedIfRunning(ctx context.Context, id int64, scheduledAt time.Time) (*dbsqlc.RiverJob, error) + // JobSetStateIfRunning sets the state of a currently running job. Jobs which are not + // running (i.e. which have already have had their state set to something + // new through an explicit snooze or cancellation), are ignored. + JobSetStateIfRunning(ctx context.Context, params *JobSetStateIfRunningParams) (*dbsqlc.RiverJob, error) // LeadershipAttemptElect attempts to elect a leader for the given name. The // bool alreadyElected indicates whether this is a potential reelection of @@ -322,26 +317,6 @@ func (a *StandardAdapter) JobInsertManyTx(ctx context.Context, tx pgx.Tx, params return numInserted, nil } -func (a *StandardAdapter) JobCompleteMany(ctx context.Context, jobs ...JobToComplete) error { - ctx, cancel := context.WithTimeout(ctx, a.deadlineTimeout) - defer cancel() - - return a.queries.JobCompleteMany(ctx, a.executor, dbsqlc.JobCompleteManyParams{ - ID: sliceutil.Map(jobs, func(j JobToComplete) int64 { return j.ID }), - FinalizedAt: sliceutil.Map(jobs, func(j JobToComplete) time.Time { return j.FinalizedAt.UTC() }), - }) -} - -func (a *StandardAdapter) JobCompleteManyTx(ctx context.Context, tx pgx.Tx, jobs ...JobToComplete) error { - ctx, cancel := context.WithTimeout(ctx, a.deadlineTimeout) - defer cancel() - - return a.queries.JobCompleteMany(ctx, tx, dbsqlc.JobCompleteManyParams{ - ID: sliceutil.Map(jobs, func(j JobToComplete) int64 { return j.ID }), - FinalizedAt: sliceutil.Map(jobs, func(j JobToComplete) time.Time { return j.FinalizedAt.UTC() }), - }) -} - func (a *StandardAdapter) JobGetAvailable(ctx context.Context, queueName string, limit int32) ([]*dbsqlc.RiverJob, error) { ctx, cancel := context.WithTimeout(ctx, a.deadlineTimeout) defer cancel() @@ -377,66 +352,53 @@ func (a *StandardAdapter) JobGetAvailableTx(ctx context.Context, tx pgx.Tx, queu return jobs, nil } -func (a *StandardAdapter) JobSetCancelledIfRunning(ctx context.Context, id int64, finalizedAt time.Time, err []byte) (*dbsqlc.RiverJob, error) { - ctx, cancel := context.WithTimeout(ctx, a.deadlineTimeout) - defer cancel() - - return a.queries.JobSetCancelledIfRunning(ctx, a.executor, dbsqlc.JobSetCancelledIfRunningParams{ - ID: id, - Error: err, - FinalizedAt: finalizedAt, - }) +// JobSetStateIfRunningParams are parameters to update the state of a currently running +// job. Use one of the constructors below to ensure a correct combination of +// parameters. +type JobSetStateIfRunningParams struct { + ID int64 + errData []byte + finalizedAt *time.Time + maxAttempts *int + scheduledAt *time.Time + state dbsqlc.JobState } -func (a *StandardAdapter) JobSetCompletedIfRunning(ctx context.Context, job JobToComplete) (*dbsqlc.RiverJob, error) { - ctx, cancel := context.WithTimeout(ctx, a.deadlineTimeout) - defer cancel() - - return a.queries.JobSetCompletedIfRunning(ctx, a.executor, dbsqlc.JobSetCompletedIfRunningParams{ - ID: job.ID, - FinalizedAt: job.FinalizedAt.UTC(), - }) +func JobSetStateCancelled(id int64, finalizedAt time.Time, errData []byte) *JobSetStateIfRunningParams { + return &JobSetStateIfRunningParams{ID: id, errData: errData, finalizedAt: &finalizedAt, state: dbsqlc.JobStateCancelled} } -func (a *StandardAdapter) JobSetCompletedTx(ctx context.Context, tx pgx.Tx, id int64, completedAt time.Time) (*dbsqlc.RiverJob, error) { - ctx, cancel := context.WithTimeout(ctx, a.deadlineTimeout) - defer cancel() - - return a.queries.JobSetCompleted(ctx, tx, dbsqlc.JobSetCompletedParams{ - ID: id, - FinalizedAt: completedAt.UTC(), - }) +func JobSetStateCompleted(id int64, finalizedAt time.Time) *JobSetStateIfRunningParams { + return &JobSetStateIfRunningParams{ID: id, finalizedAt: &finalizedAt, state: dbsqlc.JobStateCompleted} } -func (a *StandardAdapter) JobSetDiscardedIfRunning(ctx context.Context, id int64, finalizedAt time.Time, err []byte) (*dbsqlc.RiverJob, error) { - ctx, cancel := context.WithTimeout(ctx, a.deadlineTimeout) - defer cancel() - - return a.queries.JobSetDiscardedIfRunning(ctx, a.executor, dbsqlc.JobSetDiscardedIfRunningParams{ - ID: id, - Error: err, - FinalizedAt: finalizedAt, - }) +func JobSetStateDiscarded(id int64, finalizedAt time.Time, errData []byte) *JobSetStateIfRunningParams { + return &JobSetStateIfRunningParams{ID: id, errData: errData, finalizedAt: &finalizedAt, state: dbsqlc.JobStateDiscarded} } -func (a *StandardAdapter) JobSetErroredIfRunning(ctx context.Context, id int64, scheduledAt time.Time, err []byte) (*dbsqlc.RiverJob, error) { - ctx, cancel := context.WithTimeout(ctx, a.deadlineTimeout) - defer cancel() +func JobSetStateErrored(id int64, scheduledAt time.Time, errData []byte) *JobSetStateIfRunningParams { + return &JobSetStateIfRunningParams{ID: id, errData: errData, scheduledAt: &scheduledAt, state: dbsqlc.JobStateRetryable} +} - return a.queries.JobSetErroredIfRunning(ctx, a.executor, dbsqlc.JobSetErroredIfRunningParams{ - ID: id, - Error: err, - ScheduledAt: scheduledAt, - }) +func JobSetStateSnoozed(id int64, scheduledAt time.Time, maxAttempts int) *JobSetStateIfRunningParams { + return &JobSetStateIfRunningParams{ID: id, maxAttempts: &maxAttempts, scheduledAt: &scheduledAt, state: dbsqlc.JobStateScheduled} } -func (a *StandardAdapter) JobSetSnoozedIfRunning(ctx context.Context, id int64, scheduledAt time.Time) (*dbsqlc.RiverJob, error) { +func (a *StandardAdapter) JobSetStateIfRunning(ctx context.Context, params *JobSetStateIfRunningParams) (*dbsqlc.RiverJob, error) { ctx, cancel := context.WithTimeout(ctx, a.deadlineTimeout) defer cancel() - return a.queries.JobSetSnoozedIfRunning(ctx, a.executor, dbsqlc.JobSetSnoozedIfRunningParams{ - ID: id, - ScheduledAt: scheduledAt, + return a.queries.JobSetStateIfRunning(ctx, a.executor, dbsqlc.JobSetStateIfRunningParams{ + ID: params.ID, + ErrorDoUpdate: params.errData != nil, + Error: params.errData, + FinalizedAtDoUpdate: params.finalizedAt != nil, + FinalizedAt: params.finalizedAt, + MaxAttemptsUpdate: params.maxAttempts != nil, + MaxAttempts: int16(ptrutil.ValOrDefault(params.maxAttempts, 0)), // default never used + ScheduledAtDoUpdate: params.scheduledAt != nil, + ScheduledAt: ptrutil.ValOrDefault(params.scheduledAt, time.Time{}), // default never used + State: params.state, }) } diff --git a/internal/dbadapter/db_adapter_test.go b/internal/dbadapter/db_adapter_test.go index 24fb8f39..145a7f6e 100644 --- a/internal/dbadapter/db_adapter_test.go +++ b/internal/dbadapter/db_adapter_test.go @@ -495,7 +495,7 @@ func Test_StandardAdapter_FetchIsPrioritized(t *testing.T) { require.Equal(t, int16(3), jobs[0].Priority, "expected final job to have priority 2") } -func Test_StandardAdapter_JobSetCompletedIfRunning(t *testing.T) { +func Test_StandardAdapter_JobSetStateCompleted(t *testing.T) { t.Parallel() ctx := context.Background() @@ -535,7 +535,7 @@ func Test_StandardAdapter_JobSetCompletedIfRunning(t *testing.T) { require.NoError(t, err) require.Equal(t, dbsqlc.JobStateRunning, res.Job.State) - jAfter, err := adapter.JobSetCompletedIfRunning(ctx, JobToComplete{ID: res.Job.ID, FinalizedAt: bundle.baselineTime}) + jAfter, err := adapter.JobSetStateIfRunning(ctx, JobSetStateCompleted(res.Job.ID, bundle.baselineTime)) require.NoError(t, err) require.Equal(t, dbsqlc.JobStateCompleted, jAfter.State) require.WithinDuration(t, bundle.baselineTime, *jAfter.FinalizedAt, time.Microsecond) @@ -556,7 +556,7 @@ func Test_StandardAdapter_JobSetCompletedIfRunning(t *testing.T) { require.NoError(t, err) require.Equal(t, dbsqlc.JobStateRetryable, res.Job.State) - jAfter, err := adapter.JobSetCompletedIfRunning(ctx, JobToComplete{ID: res.Job.ID, FinalizedAt: bundle.baselineTime}) + jAfter, err := adapter.JobSetStateIfRunning(ctx, JobSetStateCompleted(res.Job.ID, bundle.baselineTime)) require.NoError(t, err) require.Equal(t, dbsqlc.JobStateRetryable, jAfter.State) require.Nil(t, jAfter.FinalizedAt) @@ -567,34 +567,7 @@ func Test_StandardAdapter_JobSetCompletedIfRunning(t *testing.T) { }) } -func Test_StandardAdapter_JobCompleteMany(t *testing.T) { - t.Parallel() - - ctx := context.Background() - tx := riverinternaltest.TestTx(ctx, t) - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - adapter := NewStandardAdapter(riverinternaltest.BaseServiceArchetype(t), testAdapterConfig(tx)) - now := time.Now() - jobsToComplete := make([]JobToComplete, 3) - for i := 0; i < len(jobsToComplete); i++ { - res, err := adapter.JobInsert(ctx, makeFakeJobInsertParams(i)) - require.NoError(t, err) - jobsToComplete[i] = JobToComplete{ID: res.Job.ID, FinalizedAt: now.Add(time.Duration(i) * time.Second)} - } - - require.NoError(t, adapter.JobCompleteMany(ctx, jobsToComplete...)) - for i := 0; i < len(jobsToComplete); i++ { - job, err := adapter.queries.JobGetByID(ctx, tx, jobsToComplete[i].ID) - require.NoError(t, err) - require.Equal(t, dbsqlc.JobStateCompleted, job.State) - require.WithinDuration(t, jobsToComplete[i].FinalizedAt, *job.FinalizedAt, time.Microsecond) - } -} - -func Test_StandardAdapter_JobSetErroredIfRunning(t *testing.T) { +func Test_StandardAdapter_JobSetStateErrored(t *testing.T) { t.Parallel() ctx := context.Background() @@ -643,7 +616,7 @@ func Test_StandardAdapter_JobSetErroredIfRunning(t *testing.T) { require.NoError(t, err) require.Equal(t, dbsqlc.JobStateRunning, res.Job.State) - jAfter, err := adapter.JobSetErroredIfRunning(ctx, res.Job.ID, bundle.baselineTime, bundle.errPayload) + jAfter, err := adapter.JobSetStateIfRunning(ctx, JobSetStateErrored(res.Job.ID, bundle.baselineTime, bundle.errPayload)) require.NoError(t, err) require.Equal(t, dbsqlc.JobStateRetryable, jAfter.State) require.WithinDuration(t, bundle.baselineTime, jAfter.ScheduledAt, time.Microsecond) @@ -672,7 +645,7 @@ func Test_StandardAdapter_JobSetErroredIfRunning(t *testing.T) { require.NoError(t, err) require.Equal(t, dbsqlc.JobStateRetryable, res.Job.State) - jAfter, err := adapter.JobSetErroredIfRunning(ctx, res.Job.ID, bundle.baselineTime, bundle.errPayload) + jAfter, err := adapter.JobSetStateIfRunning(ctx, JobSetStateErrored(res.Job.ID, bundle.baselineTime, bundle.errPayload)) require.NoError(t, err) require.Equal(t, dbsqlc.JobStateRetryable, jAfter.State) require.WithinDuration(t, params.ScheduledAt, jAfter.ScheduledAt, time.Microsecond) diff --git a/internal/dbadaptertest/test_adapter.go b/internal/dbadaptertest/test_adapter.go index e5449764..1a7e40e4 100644 --- a/internal/dbadaptertest/test_adapter.go +++ b/internal/dbadaptertest/test_adapter.go @@ -18,59 +18,25 @@ type TestAdapter struct { fallthroughAdapter dbadapter.Adapter mu sync.Mutex - JobCompleteManyCalled bool - JobCompleteManyTxCalled bool - JobInsertCalled bool - JobInsertTxCalled bool - JobInsertManyCalled bool - JobInsertManyTxCalled bool - JobGetAvailableCalled bool - JobGetAvailableTxCalled bool - JobSetCancelledIfRunningCalled bool - JobSetCompletedIfRunningCalled bool - JobSetCompletedTxCalled bool - JobSetDiscardedIfRunningCalled bool - JobSetErroredIfRunningCalled bool - JobSetSnoozedIfRunningCalled bool - LeadershipAttemptElectCalled bool - LeadershipResignedCalled bool - - JobCompleteManyFunc func(ctx context.Context, jobs ...dbadapter.JobToComplete) error - JobCompleteManyTxFunc func(ctx context.Context, tx pgx.Tx, jobs ...dbadapter.JobToComplete) error - JobInsertFunc func(ctx context.Context, params *dbadapter.JobInsertParams) (*dbadapter.JobInsertResult, error) - JobInsertTxFunc func(ctx context.Context, tx pgx.Tx, params *dbadapter.JobInsertParams) (*dbadapter.JobInsertResult, error) - JobInsertManyFunc func(ctx context.Context, params []*dbadapter.JobInsertParams) (int64, error) - JobInsertManyTxFunc func(ctx context.Context, tx pgx.Tx, params []*dbadapter.JobInsertParams) (int64, error) - JobGetAvailableFunc func(ctx context.Context, queueName string, limit int32) ([]*dbsqlc.RiverJob, error) - JobGetAvailableTxFunc func(ctx context.Context, tx pgx.Tx, queueName string, limit int32) ([]*dbsqlc.RiverJob, error) - JobSetCancelledIfRunningFunc func(ctx context.Context, id int64, finalizedAt time.Time, err []byte) (*dbsqlc.RiverJob, error) - JobSetCompletedIfRunningFunc func(ctx context.Context, job dbadapter.JobToComplete) (*dbsqlc.RiverJob, error) - JobSetCompletedTxFunc func(ctx context.Context, tx pgx.Tx, id int64, completedAt time.Time) (*dbsqlc.RiverJob, error) - JobSetDiscardedIfRunningFunc func(ctx context.Context, id int64, finalizedAt time.Time, err []byte) (*dbsqlc.RiverJob, error) - JobSetErroredIfRunningFunc func(ctx context.Context, id int64, scheduledAt time.Time, err []byte) (*dbsqlc.RiverJob, error) - JobSetSnoozedIfRunningFunc func(ctx context.Context, id int64, scheduledAt time.Time) (*dbsqlc.RiverJob, error) - LeadershipAttemptElectFunc func(ctx context.Context) (bool, error) - LeadershipResignFunc func(ctx context.Context, name string, leaderID string) error -} - -func (ta *TestAdapter) JobCompleteMany(ctx context.Context, jobs ...dbadapter.JobToComplete) error { - ta.atomicSetBoolTrue(&ta.JobCompleteManyCalled) - - if ta.JobCompleteManyFunc != nil { - return ta.JobCompleteManyFunc(ctx, jobs...) - } - - return ta.fallthroughAdapter.JobCompleteMany(ctx, jobs...) -} - -func (ta *TestAdapter) JobCompleteManyTx(ctx context.Context, tx pgx.Tx, jobs ...dbadapter.JobToComplete) error { - ta.atomicSetBoolTrue(&ta.JobCompleteManyTxCalled) - - if ta.JobCompleteManyTxFunc != nil { - return ta.JobCompleteManyTxFunc(ctx, tx, jobs...) - } - - return ta.fallthroughAdapter.JobCompleteManyTx(ctx, tx, jobs...) + JobInsertCalled bool + JobInsertTxCalled bool + JobInsertManyCalled bool + JobInsertManyTxCalled bool + JobGetAvailableCalled bool + JobGetAvailableTxCalled bool + JobSetStateIfRunningCalled bool + LeadershipAttemptElectCalled bool + LeadershipResignedCalled bool + + JobInsertFunc func(ctx context.Context, params *dbadapter.JobInsertParams) (*dbadapter.JobInsertResult, error) + JobInsertTxFunc func(ctx context.Context, tx pgx.Tx, params *dbadapter.JobInsertParams) (*dbadapter.JobInsertResult, error) + JobInsertManyFunc func(ctx context.Context, params []*dbadapter.JobInsertParams) (int64, error) + JobInsertManyTxFunc func(ctx context.Context, tx pgx.Tx, params []*dbadapter.JobInsertParams) (int64, error) + JobGetAvailableFunc func(ctx context.Context, queueName string, limit int32) ([]*dbsqlc.RiverJob, error) + JobGetAvailableTxFunc func(ctx context.Context, tx pgx.Tx, queueName string, limit int32) ([]*dbsqlc.RiverJob, error) + JobSetStateIfRunningFunc func(ctx context.Context, params *dbadapter.JobSetStateIfRunningParams) (*dbsqlc.RiverJob, error) + LeadershipAttemptElectFunc func(ctx context.Context) (bool, error) + LeadershipResignFunc func(ctx context.Context, name string, leaderID string) error } func (ta *TestAdapter) JobInsert(ctx context.Context, params *dbadapter.JobInsertParams) (*dbadapter.JobInsertResult, error) { @@ -133,64 +99,14 @@ func (ta *TestAdapter) JobGetAvailableTx(ctx context.Context, tx pgx.Tx, queueNa return ta.fallthroughAdapter.JobGetAvailableTx(ctx, tx, queueName, limit) } -func (ta *TestAdapter) JobSetCancelledIfRunning(ctx context.Context, id int64, finalizedAt time.Time, err []byte) (*dbsqlc.RiverJob, error) { - ta.atomicSetBoolTrue(&ta.JobSetCancelledIfRunningCalled) - - if ta.JobSetCancelledIfRunningFunc != nil { - return ta.JobSetCancelledIfRunningFunc(ctx, id, finalizedAt, err) - } - - return ta.fallthroughAdapter.JobSetCancelledIfRunning(ctx, id, finalizedAt, err) -} - -func (ta *TestAdapter) JobSetCompletedIfRunning(ctx context.Context, job dbadapter.JobToComplete) (*dbsqlc.RiverJob, error) { - ta.atomicSetBoolTrue(&ta.JobSetCompletedIfRunningCalled) - - if ta.JobSetCompletedIfRunningFunc != nil { - return ta.JobSetCompletedIfRunningFunc(ctx, job) - } - - return ta.fallthroughAdapter.JobSetCompletedIfRunning(ctx, job) -} - -func (ta *TestAdapter) JobSetCompletedTx(ctx context.Context, tx pgx.Tx, id int64, completedAt time.Time) (*dbsqlc.RiverJob, error) { - ta.atomicSetBoolTrue(&ta.JobSetCompletedTxCalled) - - if ta.JobSetCompletedTxFunc != nil { - return ta.JobSetCompletedTxFunc(ctx, tx, id, completedAt) - } - - return ta.fallthroughAdapter.JobSetCompletedTx(ctx, tx, id, completedAt) -} - -func (ta *TestAdapter) JobSetDiscardedIfRunning(ctx context.Context, id int64, finalizedAt time.Time, err []byte) (*dbsqlc.RiverJob, error) { - ta.atomicSetBoolTrue(&ta.JobSetDiscardedIfRunningCalled) - - if ta.JobSetDiscardedIfRunningFunc != nil { - return ta.JobSetDiscardedIfRunningFunc(ctx, id, finalizedAt, err) - } - - return ta.fallthroughAdapter.JobSetDiscardedIfRunning(ctx, id, finalizedAt, err) -} - -func (ta *TestAdapter) JobSetErroredIfRunning(ctx context.Context, id int64, scheduledAt time.Time, err []byte) (*dbsqlc.RiverJob, error) { - ta.atomicSetBoolTrue(&ta.JobSetErroredIfRunningCalled) - - if ta.JobSetErroredIfRunningFunc != nil { - return ta.JobSetErroredIfRunningFunc(ctx, id, scheduledAt, err) - } - - return ta.fallthroughAdapter.JobSetErroredIfRunning(ctx, id, scheduledAt, err) -} - -func (ta *TestAdapter) JobSetSnoozedIfRunning(ctx context.Context, id int64, scheduledAt time.Time) (*dbsqlc.RiverJob, error) { - ta.atomicSetBoolTrue(&ta.JobSetSnoozedIfRunningCalled) +func (ta *TestAdapter) JobSetStateIfRunning(ctx context.Context, params *dbadapter.JobSetStateIfRunningParams) (*dbsqlc.RiverJob, error) { + ta.atomicSetBoolTrue(&ta.JobSetStateIfRunningCalled) - if ta.JobSetErroredIfRunningFunc != nil { - return ta.JobSetSnoozedIfRunningFunc(ctx, id, scheduledAt) + if ta.JobSetStateIfRunningFunc != nil { + return ta.JobSetStateIfRunningFunc(ctx, params) } - return ta.fallthroughAdapter.JobSetSnoozedIfRunning(ctx, id, scheduledAt) + return ta.fallthroughAdapter.JobSetStateIfRunning(ctx, params) } func (ta *TestAdapter) LeadershipAttemptElect(ctx context.Context, alreadyElected bool, name, leaderID string, ttl time.Duration) (bool, error) { diff --git a/internal/dbsqlc/river_job.sql b/internal/dbsqlc/river_job.sql index 8f9d18ca..814cadfb 100644 --- a/internal/dbsqlc/river_job.sql +++ b/internal/dbsqlc/river_job.sql @@ -31,19 +31,6 @@ CREATE TABLE river_job( CONSTRAINT kind_length CHECK (char_length(kind) > 0 AND char_length(kind) < 128) ); --- name: JobCompleteMany :exec -UPDATE river_job -SET - finalized_at = updated.finalized_at, - state = updated.state -FROM ( - SELECT - unnest(@id::bigint[]) AS id, - unnest(@finalized_at::timestamptz[]) AS finalized_at, - 'completed'::river_job_state AS state -) AS updated -WHERE river_job.id = updated.id; - -- name: JobCountRunning :one SELECT count(*) @@ -238,226 +225,42 @@ FROM ( SELECT pg_notify('river_insert', json_build_object('queue', queue)::text) FROM river_job_scheduled) AS notifications_sent; --- name: JobSetCancelledIfRunning :one -WITH job_to_update AS ( - SELECT - id, - finalized_at, - state - FROM - river_job - WHERE - id = @id::bigint - FOR UPDATE -), -updated_job AS ( - UPDATE - river_job - SET - finalized_at = @finalized_at::timestamptz, - errors = array_append(errors, @error::jsonb), - state = 'cancelled'::river_job_state - FROM - job_to_update - WHERE - river_job.id = job_to_update.id - AND river_job.state = 'running'::river_job_state - RETURNING river_job.* -) -( - SELECT - river_job.* - FROM - river_job - WHERE - river_job.id = @id::bigint - UNION - SELECT - updated_job.* - FROM - updated_job -) ORDER BY finalized_at DESC NULLS LAST LIMIT 1; - --- name: JobSetCompleted :one -UPDATE - river_job -SET - finalized_at = @finalized_at::timestamptz, - state = 'completed'::river_job_state -WHERE - id = @id::bigint -RETURNING *; - --- name: JobSetCompletedIfRunning :one -WITH job_to_update AS ( - SELECT - id, - finalized_at, - state - FROM - river_job - WHERE - id = @id::bigint - FOR UPDATE -), -updated_job AS ( - UPDATE - river_job - SET - finalized_at = @finalized_at::timestamptz, - state = 'completed'::river_job_state - FROM - job_to_update - WHERE - river_job.id = job_to_update.id - AND river_job.state = 'running'::river_job_state - RETURNING river_job.* -) -( - SELECT - river_job.* - FROM - river_job - WHERE - river_job.id = @id::bigint - UNION - SELECT - updated_job.* - FROM - updated_job -) ORDER BY finalized_at DESC NULLS LAST LIMIT 1; - --- name: JobSetDiscarded :one -UPDATE - river_job -SET - finalized_at = @finalized_at::timestamptz, - errors = array_append(errors, @error::jsonb), - state = 'discarded'::river_job_state -WHERE - id = @id::bigint +-- name: JobSetState :one +UPDATE river_job +SET errors = CASE WHEN @error_do_update::boolean THEN array_append(errors, @error::jsonb) ELSE errors END, + finalized_at = CASE WHEN @finalized_at_do_update::boolean THEN @finalized_at ELSE finalized_at END, + max_attempts = CASE WHEN @max_attempts_update::boolean THEN @max_attempts ELSE max_attempts END, + scheduled_at = CASE WHEN @scheduled_at_do_update::boolean THEN @scheduled_at ELSE scheduled_at END, + state = @state +WHERE id = @id RETURNING *; --- name: JobSetDiscardedIfRunning :one +-- name: JobSetStateIfRunning :one WITH job_to_update AS ( - SELECT - id, - finalized_at, - state - FROM - river_job - WHERE - id = @id::bigint - FOR UPDATE -), -updated_job AS ( - UPDATE - river_job - SET - finalized_at = @finalized_at::timestamptz, - errors = array_append(errors, @error::jsonb), - state = 'discarded'::river_job_state - FROM - job_to_update - WHERE - river_job.id = job_to_update.id - AND river_job.state = 'running'::river_job_state - RETURNING river_job.* -) -( - SELECT - river_job.* - FROM - river_job - WHERE - river_job.id = @id::bigint - UNION - SELECT - updated_job.* - FROM - updated_job -) ORDER BY finalized_at DESC NULLS LAST LIMIT 1; - --- name: JobSetErroredIfRunning :one -WITH job_to_update AS ( - SELECT - id, - finalized_at, - state - FROM - river_job - WHERE - id = @id::bigint - FOR UPDATE -), -updated_job AS ( - UPDATE - river_job - SET - scheduled_at = @scheduled_at::timestamptz, - errors = array_append(errors, @error::jsonb), - state = 'retryable'::river_job_state - FROM - job_to_update - WHERE - river_job.id = job_to_update.id - AND river_job.state = 'running'::river_job_state - RETURNING river_job.* -) -( - SELECT - river_job.* - FROM - river_job - WHERE - river_job.id = @id::bigint - UNION - SELECT - updated_job.* - FROM - updated_job -) ORDER BY scheduled_at DESC NULLS LAST LIMIT 1; - --- name: JobSetSnoozedIfRunning :one -WITH job_to_update AS ( - SELECT - id, - finalized_at, - state - FROM - river_job - WHERE - id = @id::bigint - FOR UPDATE + SELECT id + FROM river_job + WHERE id = @id::bigint + FOR UPDATE ), updated_job AS ( - UPDATE - river_job - SET - scheduled_at = @scheduled_at::timestamptz, - state = 'scheduled'::river_job_state, - max_attempts = max_attempts + 1 - FROM - job_to_update - WHERE - river_job.id = job_to_update.id - AND river_job.state = 'running'::river_job_state - RETURNING river_job.* + UPDATE river_job + SET errors = CASE WHEN @error_do_update::boolean THEN array_append(errors, @error::jsonb) ELSE errors END, + finalized_at = CASE WHEN @finalized_at_do_update::boolean THEN @finalized_at ELSE finalized_at END, + max_attempts = CASE WHEN @max_attempts_update::boolean THEN @max_attempts ELSE max_attempts END, + scheduled_at = CASE WHEN @scheduled_at_do_update::boolean THEN @scheduled_at ELSE scheduled_at END, + state = @state + FROM job_to_update + WHERE river_job.id = job_to_update.id + AND river_job.state = 'running'::river_job_state + RETURNING river_job.* ) -( - SELECT - river_job.* - FROM - river_job - WHERE - river_job.id = @id::bigint - UNION - SELECT - updated_job.* - FROM - updated_job -) ORDER BY scheduled_at DESC NULLS LAST LIMIT 1; - +SELECT * +FROM river_job +WHERE id = @id::bigint + AND id NOT IN (SELECT id FROM updated_job) +UNION +SELECT * +FROM updated_job; -- Run by the rescuer to queue for retry or discard depending on job state. -- name: JobRescueMany :exec diff --git a/internal/dbsqlc/river_job.sql.go b/internal/dbsqlc/river_job.sql.go index 1c758f76..83edf3c3 100644 --- a/internal/dbsqlc/river_job.sql.go +++ b/internal/dbsqlc/river_job.sql.go @@ -10,30 +10,6 @@ import ( "time" ) -const jobCompleteMany = `-- name: JobCompleteMany :exec -UPDATE river_job -SET - finalized_at = updated.finalized_at, - state = updated.state -FROM ( - SELECT - unnest($1::bigint[]) AS id, - unnest($2::timestamptz[]) AS finalized_at, - 'completed'::river_job_state AS state -) AS updated -WHERE river_job.id = updated.id -` - -type JobCompleteManyParams struct { - ID []int64 - FinalizedAt []time.Time -} - -func (q *Queries) JobCompleteMany(ctx context.Context, db DBTX, arg JobCompleteManyParams) error { - _, err := db.Exec(ctx, jobCompleteMany, arg.ID, arg.FinalizedAt) - return err -} - const jobCountRunning = `-- name: JobCountRunning :one SELECT count(*) @@ -628,277 +604,43 @@ func (q *Queries) JobSchedule(ctx context.Context, db DBTX, arg JobScheduleParam return count, err } -const jobSetCancelledIfRunning = `-- name: JobSetCancelledIfRunning :one -WITH job_to_update AS ( - SELECT - id, - finalized_at, - state - FROM - river_job - WHERE - id = $1::bigint - FOR UPDATE -), -updated_job AS ( - UPDATE - river_job - SET - finalized_at = $2::timestamptz, - errors = array_append(errors, $3::jsonb), - state = 'cancelled'::river_job_state - FROM - job_to_update - WHERE - river_job.id = job_to_update.id - AND river_job.state = 'running'::river_job_state - RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags -) -( - SELECT - river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags - FROM - river_job - WHERE - river_job.id = $1::bigint - UNION - SELECT - updated_job.id, updated_job.args, updated_job.attempt, updated_job.attempted_at, updated_job.attempted_by, updated_job.created_at, updated_job.errors, updated_job.finalized_at, updated_job.kind, updated_job.max_attempts, updated_job.metadata, updated_job.priority, updated_job.queue, updated_job.state, updated_job.scheduled_at, updated_job.tags - FROM - updated_job -) ORDER BY finalized_at DESC NULLS LAST LIMIT 1 -` - -type JobSetCancelledIfRunningParams struct { - ID int64 - FinalizedAt time.Time - Error []byte -} - -func (q *Queries) JobSetCancelledIfRunning(ctx context.Context, db DBTX, arg JobSetCancelledIfRunningParams) (*RiverJob, error) { - row := db.QueryRow(ctx, jobSetCancelledIfRunning, arg.ID, arg.FinalizedAt, arg.Error) - var i RiverJob - err := row.Scan( - &i.ID, - &i.Args, - &i.Attempt, - &i.AttemptedAt, - &i.AttemptedBy, - &i.CreatedAt, - &i.Errors, - &i.FinalizedAt, - &i.Kind, - &i.MaxAttempts, - &i.Metadata, - &i.Priority, - &i.Queue, - &i.State, - &i.ScheduledAt, - &i.Tags, - ) - return &i, err -} - -const jobSetCompleted = `-- name: JobSetCompleted :one -UPDATE - river_job -SET - finalized_at = $1::timestamptz, - state = 'completed'::river_job_state -WHERE - id = $2::bigint -RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags -` - -type JobSetCompletedParams struct { - FinalizedAt time.Time - ID int64 -} - -func (q *Queries) JobSetCompleted(ctx context.Context, db DBTX, arg JobSetCompletedParams) (*RiverJob, error) { - row := db.QueryRow(ctx, jobSetCompleted, arg.FinalizedAt, arg.ID) - var i RiverJob - err := row.Scan( - &i.ID, - &i.Args, - &i.Attempt, - &i.AttemptedAt, - &i.AttemptedBy, - &i.CreatedAt, - &i.Errors, - &i.FinalizedAt, - &i.Kind, - &i.MaxAttempts, - &i.Metadata, - &i.Priority, - &i.Queue, - &i.State, - &i.ScheduledAt, - &i.Tags, - ) - return &i, err -} - -const jobSetCompletedIfRunning = `-- name: JobSetCompletedIfRunning :one -WITH job_to_update AS ( - SELECT - id, - finalized_at, - state - FROM - river_job - WHERE - id = $1::bigint - FOR UPDATE -), -updated_job AS ( - UPDATE - river_job - SET - finalized_at = $2::timestamptz, - state = 'completed'::river_job_state - FROM - job_to_update - WHERE - river_job.id = job_to_update.id - AND river_job.state = 'running'::river_job_state - RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags -) -( - SELECT - river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags - FROM - river_job - WHERE - river_job.id = $1::bigint - UNION - SELECT - updated_job.id, updated_job.args, updated_job.attempt, updated_job.attempted_at, updated_job.attempted_by, updated_job.created_at, updated_job.errors, updated_job.finalized_at, updated_job.kind, updated_job.max_attempts, updated_job.metadata, updated_job.priority, updated_job.queue, updated_job.state, updated_job.scheduled_at, updated_job.tags - FROM - updated_job -) ORDER BY finalized_at DESC NULLS LAST LIMIT 1 -` - -type JobSetCompletedIfRunningParams struct { - ID int64 - FinalizedAt time.Time -} - -func (q *Queries) JobSetCompletedIfRunning(ctx context.Context, db DBTX, arg JobSetCompletedIfRunningParams) (*RiverJob, error) { - row := db.QueryRow(ctx, jobSetCompletedIfRunning, arg.ID, arg.FinalizedAt) - var i RiverJob - err := row.Scan( - &i.ID, - &i.Args, - &i.Attempt, - &i.AttemptedAt, - &i.AttemptedBy, - &i.CreatedAt, - &i.Errors, - &i.FinalizedAt, - &i.Kind, - &i.MaxAttempts, - &i.Metadata, - &i.Priority, - &i.Queue, - &i.State, - &i.ScheduledAt, - &i.Tags, - ) - return &i, err -} - -const jobSetDiscarded = `-- name: JobSetDiscarded :one -UPDATE - river_job -SET - finalized_at = $1::timestamptz, - errors = array_append(errors, $2::jsonb), - state = 'discarded'::river_job_state -WHERE - id = $3::bigint +const jobSetState = `-- name: JobSetState :one +UPDATE river_job +SET errors = CASE WHEN $1::boolean THEN array_append(errors, $2::jsonb) ELSE errors END, + finalized_at = CASE WHEN $3::boolean THEN $4 ELSE finalized_at END, + max_attempts = CASE WHEN $5::boolean THEN $6 ELSE max_attempts END, + scheduled_at = CASE WHEN $7::boolean THEN $8 ELSE scheduled_at END, + state = $9 +WHERE id = $10 RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags ` -type JobSetDiscardedParams struct { - FinalizedAt time.Time - Error []byte - ID int64 +type JobSetStateParams struct { + ErrorDoUpdate bool + Error []byte + FinalizedAtDoUpdate bool + FinalizedAt *time.Time + MaxAttemptsUpdate bool + MaxAttempts int16 + ScheduledAtDoUpdate bool + ScheduledAt time.Time + State JobState + ID int64 } -func (q *Queries) JobSetDiscarded(ctx context.Context, db DBTX, arg JobSetDiscardedParams) (*RiverJob, error) { - row := db.QueryRow(ctx, jobSetDiscarded, arg.FinalizedAt, arg.Error, arg.ID) - var i RiverJob - err := row.Scan( - &i.ID, - &i.Args, - &i.Attempt, - &i.AttemptedAt, - &i.AttemptedBy, - &i.CreatedAt, - &i.Errors, - &i.FinalizedAt, - &i.Kind, - &i.MaxAttempts, - &i.Metadata, - &i.Priority, - &i.Queue, - &i.State, - &i.ScheduledAt, - &i.Tags, +func (q *Queries) JobSetState(ctx context.Context, db DBTX, arg JobSetStateParams) (*RiverJob, error) { + row := db.QueryRow(ctx, jobSetState, + arg.ErrorDoUpdate, + arg.Error, + arg.FinalizedAtDoUpdate, + arg.FinalizedAt, + arg.MaxAttemptsUpdate, + arg.MaxAttempts, + arg.ScheduledAtDoUpdate, + arg.ScheduledAt, + arg.State, + arg.ID, ) - return &i, err -} - -const jobSetDiscardedIfRunning = `-- name: JobSetDiscardedIfRunning :one -WITH job_to_update AS ( - SELECT - id, - finalized_at, - state - FROM - river_job - WHERE - id = $1::bigint - FOR UPDATE -), -updated_job AS ( - UPDATE - river_job - SET - finalized_at = $2::timestamptz, - errors = array_append(errors, $3::jsonb), - state = 'discarded'::river_job_state - FROM - job_to_update - WHERE - river_job.id = job_to_update.id - AND river_job.state = 'running'::river_job_state - RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags -) -( - SELECT - river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags - FROM - river_job - WHERE - river_job.id = $1::bigint - UNION - SELECT - updated_job.id, updated_job.args, updated_job.attempt, updated_job.attempted_at, updated_job.attempted_by, updated_job.created_at, updated_job.errors, updated_job.finalized_at, updated_job.kind, updated_job.max_attempts, updated_job.metadata, updated_job.priority, updated_job.queue, updated_job.state, updated_job.scheduled_at, updated_job.tags - FROM - updated_job -) ORDER BY finalized_at DESC NULLS LAST LIMIT 1 -` - -type JobSetDiscardedIfRunningParams struct { - ID int64 - FinalizedAt time.Time - Error []byte -} - -func (q *Queries) JobSetDiscardedIfRunning(ctx context.Context, db DBTX, arg JobSetDiscardedIfRunningParams) (*RiverJob, error) { - row := db.QueryRow(ctx, jobSetDiscardedIfRunning, arg.ID, arg.FinalizedAt, arg.Error) var i RiverJob err := row.Scan( &i.ID, @@ -921,125 +663,60 @@ func (q *Queries) JobSetDiscardedIfRunning(ctx context.Context, db DBTX, arg Job return &i, err } -const jobSetErroredIfRunning = `-- name: JobSetErroredIfRunning :one +const jobSetStateIfRunning = `-- name: JobSetStateIfRunning :one WITH job_to_update AS ( - SELECT - id, - finalized_at, - state - FROM - river_job - WHERE - id = $1::bigint - FOR UPDATE + SELECT id + FROM river_job + WHERE id = $1::bigint + FOR UPDATE ), updated_job AS ( - UPDATE - river_job - SET - scheduled_at = $2::timestamptz, - errors = array_append(errors, $3::jsonb), - state = 'retryable'::river_job_state - FROM - job_to_update - WHERE - river_job.id = job_to_update.id - AND river_job.state = 'running'::river_job_state - RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags + UPDATE river_job + SET errors = CASE WHEN $2::boolean THEN array_append(errors, $3::jsonb) ELSE errors END, + finalized_at = CASE WHEN $4::boolean THEN $5 ELSE finalized_at END, + max_attempts = CASE WHEN $6::boolean THEN $7 ELSE max_attempts END, + scheduled_at = CASE WHEN $8::boolean THEN $9 ELSE scheduled_at END, + state = $10 + FROM job_to_update + WHERE river_job.id = job_to_update.id + AND river_job.state = 'running'::river_job_state + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags ) -( - SELECT - river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags - FROM - river_job - WHERE - river_job.id = $1::bigint - UNION - SELECT - updated_job.id, updated_job.args, updated_job.attempt, updated_job.attempted_at, updated_job.attempted_by, updated_job.created_at, updated_job.errors, updated_job.finalized_at, updated_job.kind, updated_job.max_attempts, updated_job.metadata, updated_job.priority, updated_job.queue, updated_job.state, updated_job.scheduled_at, updated_job.tags - FROM - updated_job -) ORDER BY scheduled_at DESC NULLS LAST LIMIT 1 +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +FROM river_job +WHERE id = $1::bigint + AND id NOT IN (SELECT id FROM updated_job) +UNION +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +FROM updated_job ` -type JobSetErroredIfRunningParams struct { - ID int64 - ScheduledAt time.Time - Error []byte +type JobSetStateIfRunningParams struct { + ID int64 + ErrorDoUpdate bool + Error []byte + FinalizedAtDoUpdate bool + FinalizedAt *time.Time + MaxAttemptsUpdate bool + MaxAttempts int16 + ScheduledAtDoUpdate bool + ScheduledAt time.Time + State JobState } -func (q *Queries) JobSetErroredIfRunning(ctx context.Context, db DBTX, arg JobSetErroredIfRunningParams) (*RiverJob, error) { - row := db.QueryRow(ctx, jobSetErroredIfRunning, arg.ID, arg.ScheduledAt, arg.Error) - var i RiverJob - err := row.Scan( - &i.ID, - &i.Args, - &i.Attempt, - &i.AttemptedAt, - &i.AttemptedBy, - &i.CreatedAt, - &i.Errors, - &i.FinalizedAt, - &i.Kind, - &i.MaxAttempts, - &i.Metadata, - &i.Priority, - &i.Queue, - &i.State, - &i.ScheduledAt, - &i.Tags, +func (q *Queries) JobSetStateIfRunning(ctx context.Context, db DBTX, arg JobSetStateIfRunningParams) (*RiverJob, error) { + row := db.QueryRow(ctx, jobSetStateIfRunning, + arg.ID, + arg.ErrorDoUpdate, + arg.Error, + arg.FinalizedAtDoUpdate, + arg.FinalizedAt, + arg.MaxAttemptsUpdate, + arg.MaxAttempts, + arg.ScheduledAtDoUpdate, + arg.ScheduledAt, + arg.State, ) - return &i, err -} - -const jobSetSnoozedIfRunning = `-- name: JobSetSnoozedIfRunning :one -WITH job_to_update AS ( - SELECT - id, - finalized_at, - state - FROM - river_job - WHERE - id = $1::bigint - FOR UPDATE -), -updated_job AS ( - UPDATE - river_job - SET - scheduled_at = $2::timestamptz, - state = 'scheduled'::river_job_state, - max_attempts = max_attempts + 1 - FROM - job_to_update - WHERE - river_job.id = job_to_update.id - AND river_job.state = 'running'::river_job_state - RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags -) -( - SELECT - river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags - FROM - river_job - WHERE - river_job.id = $1::bigint - UNION - SELECT - updated_job.id, updated_job.args, updated_job.attempt, updated_job.attempted_at, updated_job.attempted_by, updated_job.created_at, updated_job.errors, updated_job.finalized_at, updated_job.kind, updated_job.max_attempts, updated_job.metadata, updated_job.priority, updated_job.queue, updated_job.state, updated_job.scheduled_at, updated_job.tags - FROM - updated_job -) ORDER BY scheduled_at DESC NULLS LAST LIMIT 1 -` - -type JobSetSnoozedIfRunningParams struct { - ID int64 - ScheduledAt time.Time -} - -func (q *Queries) JobSetSnoozedIfRunning(ctx context.Context, db DBTX, arg JobSetSnoozedIfRunningParams) (*RiverJob, error) { - row := db.QueryRow(ctx, jobSetSnoozedIfRunning, arg.ID, arg.ScheduledAt) var i RiverJob err := row.Scan( &i.ID, diff --git a/internal/jobcompleter/job_completer.go b/internal/jobcompleter/job_completer.go index d8210b95..4eb35455 100644 --- a/internal/jobcompleter/job_completer.go +++ b/internal/jobcompleter/job_completer.go @@ -16,20 +16,9 @@ import ( ) type JobCompleter interface { - // JobSetCancelled marks a job as cancelled. - JobSetCancelled(id int64, stats *jobstats.JobStatistics, finalizedAt time.Time, errData []byte) error - - // JobSetCompleted marks a job as completed. - JobSetCompleted(id int64, stats *jobstats.JobStatistics, finalizedAt time.Time) error - - // JobSetDiscarded marks a job as discarded. - JobSetDiscarded(id int64, stats *jobstats.JobStatistics, finalizedAt time.Time, errData []byte) error - - // JobSetErrored marks a job as errored (but retryable). - JobSetErrored(id int64, stats *jobstats.JobStatistics, scheduledAt time.Time, errData []byte) error - - // JobSetSnoozed reschedules a job for the future and increments its max attempts. - JobSetSnoozed(id int64, stats *jobstats.JobStatistics, scheduledAt time.Time) error + // JobSetState sets a new state for the given job, as long as it's + // still running (i.e. its state has not changed to something else already). + JobSetStateIfRunning(stats *jobstats.JobStatistics, params *dbadapter.JobSetStateIfRunningParams) error // Subscribe injects a callback which will be invoked whenever a job is // updated. @@ -66,33 +55,9 @@ func NewInlineCompleter(archetype *baseservice.Archetype, adapter dbadapter.Adap }) } -func (c *InlineJobCompleter) JobSetCancelled(id int64, stats *jobstats.JobStatistics, finalizedAt time.Time, errData []byte) error { - return c.doOperation(stats, func(ctx context.Context) (*dbsqlc.RiverJob, error) { - return c.adapter.JobSetCancelledIfRunning(ctx, id, finalizedAt, errData) - }) -} - -func (c *InlineJobCompleter) JobSetCompleted(id int64, stats *jobstats.JobStatistics, finalizedAt time.Time) error { - return c.doOperation(stats, func(ctx context.Context) (*dbsqlc.RiverJob, error) { - return c.adapter.JobSetCompletedIfRunning(ctx, dbadapter.JobToComplete{ID: id, FinalizedAt: finalizedAt}) - }) -} - -func (c *InlineJobCompleter) JobSetDiscarded(id int64, stats *jobstats.JobStatistics, finalizedAt time.Time, errData []byte) error { - return c.doOperation(stats, func(ctx context.Context) (*dbsqlc.RiverJob, error) { - return c.adapter.JobSetDiscardedIfRunning(ctx, id, finalizedAt, errData) - }) -} - -func (c *InlineJobCompleter) JobSetErrored(id int64, stats *jobstats.JobStatistics, scheduledAt time.Time, errData []byte) error { - return c.doOperation(stats, func(ctx context.Context) (*dbsqlc.RiverJob, error) { - return c.adapter.JobSetErroredIfRunning(ctx, id, scheduledAt, errData) - }) -} - -func (c *InlineJobCompleter) JobSetSnoozed(id int64, stats *jobstats.JobStatistics, scheduledAt time.Time) error { +func (c *InlineJobCompleter) JobSetStateIfRunning(stats *jobstats.JobStatistics, params *dbadapter.JobSetStateIfRunningParams) error { return c.doOperation(stats, func(ctx context.Context) (*dbsqlc.RiverJob, error) { - return c.adapter.JobSetSnoozedIfRunning(ctx, id, scheduledAt) + return c.adapter.JobSetStateIfRunning(ctx, params) }) } @@ -154,33 +119,9 @@ func NewAsyncCompleter(archetype *baseservice.Archetype, adapter dbadapter.Adapt }) } -func (c *AsyncJobCompleter) JobSetCancelled(id int64, stats *jobstats.JobStatistics, finalizedAt time.Time, errData []byte) error { - return c.doOperation(stats, func(ctx context.Context) (*dbsqlc.RiverJob, error) { - return c.adapter.JobSetCancelledIfRunning(ctx, id, finalizedAt, errData) - }) -} - -func (c *AsyncJobCompleter) JobSetCompleted(id int64, stats *jobstats.JobStatistics, finalizedAt time.Time) error { - return c.doOperation(stats, func(ctx context.Context) (*dbsqlc.RiverJob, error) { - return c.adapter.JobSetCompletedIfRunning(ctx, dbadapter.JobToComplete{ID: id, FinalizedAt: finalizedAt}) - }) -} - -func (c *AsyncJobCompleter) JobSetDiscarded(id int64, stats *jobstats.JobStatistics, finalizedAt time.Time, errData []byte) error { - return c.doOperation(stats, func(ctx context.Context) (*dbsqlc.RiverJob, error) { - return c.adapter.JobSetDiscardedIfRunning(ctx, id, finalizedAt, errData) - }) -} - -func (c *AsyncJobCompleter) JobSetErrored(id int64, stats *jobstats.JobStatistics, scheduledAt time.Time, errData []byte) error { - return c.doOperation(stats, func(ctx context.Context) (*dbsqlc.RiverJob, error) { - return c.adapter.JobSetErroredIfRunning(ctx, id, scheduledAt, errData) - }) -} - -func (c *AsyncJobCompleter) JobSetSnoozed(id int64, stats *jobstats.JobStatistics, scheduledAt time.Time) error { +func (c *AsyncJobCompleter) JobSetStateIfRunning(stats *jobstats.JobStatistics, params *dbadapter.JobSetStateIfRunningParams) error { return c.doOperation(stats, func(ctx context.Context) (*dbsqlc.RiverJob, error) { - return c.adapter.JobSetSnoozedIfRunning(ctx, id, scheduledAt) + return c.adapter.JobSetStateIfRunning(ctx, params) }) } diff --git a/internal/jobcompleter/job_completer_test.go b/internal/jobcompleter/job_completer_test.go index c6085fdb..abd3b390 100644 --- a/internal/jobcompleter/job_completer_test.go +++ b/internal/jobcompleter/job_completer_test.go @@ -21,8 +21,8 @@ func TestInlineJobCompleter_Complete(t *testing.T) { var attempt int expectedErr := errors.New("an error from the completer") adapter := &dbadaptertest.TestAdapter{ - JobSetCompletedIfRunningFunc: func(ctx context.Context, job dbadapter.JobToComplete) (*dbsqlc.RiverJob, error) { - require.Equal(t, int64(1), job.ID) + JobSetStateIfRunningFunc: func(ctx context.Context, params *dbadapter.JobSetStateIfRunningParams) (*dbsqlc.RiverJob, error) { + require.Equal(t, int64(1), params.ID) attempt++ return nil, expectedErr }, @@ -31,12 +31,12 @@ func TestInlineJobCompleter_Complete(t *testing.T) { completer := NewInlineCompleter(riverinternaltest.BaseServiceArchetype(t), adapter) t.Cleanup(completer.Wait) - err := completer.JobSetCompleted(1, &jobstats.JobStatistics{}, time.Now()) + err := completer.JobSetStateIfRunning(&jobstats.JobStatistics{}, dbadapter.JobSetStateCompleted(1, time.Now())) if !errors.Is(err, expectedErr) { t.Errorf("expected %v, got %v", expectedErr, err) } - require.True(t, adapter.JobSetCompletedIfRunningCalled) + require.True(t, adapter.JobSetStateIfRunningCalled) require.Equal(t, numRetries, attempt) } @@ -76,8 +76,8 @@ func TestAsyncJobCompleter_Complete(t *testing.T) { }() adapter := &dbadaptertest.TestAdapter{ - JobSetCompletedIfRunningFunc: func(ctx context.Context, job dbadapter.JobToComplete) (*dbsqlc.RiverJob, error) { - inputCh <- jobInput{ctx: ctx, jobID: job.ID} + JobSetStateIfRunningFunc: func(ctx context.Context, params *dbadapter.JobSetStateIfRunningParams) (*dbsqlc.RiverJob, error) { + inputCh <- jobInput{ctx: ctx, jobID: params.ID} err := <-resultCh return nil, err }, @@ -87,14 +87,14 @@ func TestAsyncJobCompleter_Complete(t *testing.T) { // launch 4 completions, only 2 can be inline due to the concurrency limit: for i := int64(0); i < 2; i++ { - if err := completer.JobSetCompleted(i, &jobstats.JobStatistics{}, time.Now()); err != nil { + if err := completer.JobSetStateIfRunning(&jobstats.JobStatistics{}, dbadapter.JobSetStateCompleted(i, time.Now())); err != nil { t.Errorf("expected nil err, got %v", err) } } bgCompletionsStarted := make(chan struct{}) go func() { for i := int64(2); i < 4; i++ { - if err := completer.JobSetCompleted(i, &jobstats.JobStatistics{}, time.Now()); err != nil { + if err := completer.JobSetStateIfRunning(&jobstats.JobStatistics{}, dbadapter.JobSetStateCompleted(i, time.Now())); err != nil { t.Errorf("expected nil err, got %v", err) } } @@ -161,7 +161,7 @@ func testCompleterSubscribe(t *testing.T, constructor func(dbadapter.Adapter) Jo t.Helper() adapter := &dbadaptertest.TestAdapter{ - JobSetCompletedIfRunningFunc: func(ctx context.Context, job dbadapter.JobToComplete) (*dbsqlc.RiverJob, error) { + JobSetStateIfRunningFunc: func(ctx context.Context, params *dbadapter.JobSetStateIfRunningParams) (*dbsqlc.RiverJob, error) { return &dbsqlc.RiverJob{ State: dbsqlc.JobStateCompleted, }, nil @@ -176,7 +176,7 @@ func testCompleterSubscribe(t *testing.T, constructor func(dbadapter.Adapter) Jo }) for i := 0; i < 4; i++ { - require.NoError(t, completer.JobSetCompleted(int64(i), &jobstats.JobStatistics{}, time.Now())) + require.NoError(t, completer.JobSetStateIfRunning(&jobstats.JobStatistics{}, dbadapter.JobSetStateCompleted(int64(i), time.Now()))) } completer.Wait() @@ -193,7 +193,7 @@ func testCompleterWait(t *testing.T, constructor func(dbadapter.Adapter) JobComp resultCh := make(chan error) completeStartedCh := make(chan struct{}) adapter := &dbadaptertest.TestAdapter{ - JobSetCompletedIfRunningFunc: func(ctx context.Context, job dbadapter.JobToComplete) (*dbsqlc.RiverJob, error) { + JobSetStateIfRunningFunc: func(ctx context.Context, params *dbadapter.JobSetStateIfRunningParams) (*dbsqlc.RiverJob, error) { completeStartedCh <- struct{}{} err := <-resultCh return nil, err @@ -206,7 +206,7 @@ func testCompleterWait(t *testing.T, constructor func(dbadapter.Adapter) JobComp for i := 0; i < 4; i++ { i := i go func() { - require.NoError(t, completer.JobSetCompleted(int64(i), &jobstats.JobStatistics{}, time.Now())) + require.NoError(t, completer.JobSetStateIfRunning(&jobstats.JobStatistics{}, dbadapter.JobSetStateCompleted(int64(i), time.Now()))) }() <-completeStartedCh // wait for func to actually start } diff --git a/internal/util/ptrutil/ptr_util.go b/internal/util/ptrutil/ptr_util.go index cb11b991..109e8c13 100644 --- a/internal/util/ptrutil/ptr_util.go +++ b/internal/util/ptrutil/ptr_util.go @@ -4,3 +4,21 @@ package ptrutil func Ptr[T any](v T) *T { return &v } + +// ValOrDefault returns the value of the given pointer as long as it's non-nil, +// and the specified default value otherwise. +func ValOrDefault[T any](ptr *T, defaultVal T) T { + if ptr != nil { + return *ptr + } + return defaultVal +} + +// ValOrDefaultFunc returns the value of the given pointer as long as it's +// non-nil, or invokes the given function to produce a default value otherwise. +func ValOrDefaultFunc[T any](ptr *T, defaultFunc func() T) T { + if ptr != nil { + return *ptr + } + return defaultFunc() +} diff --git a/internal/util/ptrutil/ptr_util_test.go b/internal/util/ptrutil/ptr_util_test.go index 460c4450..6557b307 100644 --- a/internal/util/ptrutil/ptr_util_test.go +++ b/internal/util/ptrutil/ptr_util_test.go @@ -19,3 +19,19 @@ func TestPtr(t *testing.T) { require.Equal(t, &v, Ptr(7)) } } + +func TestValOrDefault(t *testing.T) { + t.Parallel() + + val := "val" + require.Equal(t, val, ValOrDefault(&val, "default")) + require.Equal(t, "default", ValOrDefault((*string)(nil), "default")) +} + +func TestValOrDefaultFunc(t *testing.T) { + t.Parallel() + + val := "val" + require.Equal(t, val, ValOrDefaultFunc(&val, func() string { return "default" })) + require.Equal(t, "default", ValOrDefaultFunc((*string)(nil), func() string { return "default" })) +} diff --git a/job.go b/job.go index 8eeeb208..9f9952b6 100644 --- a/job.go +++ b/job.go @@ -7,6 +7,7 @@ import ( "time" "github.com/riverqueue/river/internal/dbsqlc" + "github.com/riverqueue/river/internal/util/ptrutil" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivertype" ) @@ -59,7 +60,12 @@ func JobCompleteTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs JobArgs](ctx queries = &dbsqlc.Queries{} ) - internal, err := queries.JobSetCompleted(ctx, driver.UnwrapTx(tx), dbsqlc.JobSetCompletedParams{ID: job.ID, FinalizedAt: time.Now()}) + internal, err := queries.JobSetState(ctx, driver.UnwrapTx(tx), dbsqlc.JobSetStateParams{ + ID: job.ID, + FinalizedAtDoUpdate: true, + FinalizedAt: ptrutil.Ptr(time.Now()), + State: dbsqlc.JobStateCompleted, + }) if err != nil { return nil, err } diff --git a/job_executor.go b/job_executor.go index b2505e3c..dc426b22 100644 --- a/job_executor.go +++ b/job_executor.go @@ -232,7 +232,7 @@ func (e *jobExecutor) reportResult(ctx context.Context, res *jobExecutorResult) slog.Duration("duration", snoozeErr.duration), ) nextAttemptScheduledAt := time.Now().Add(snoozeErr.duration) - if err := e.Completer.JobSetSnoozed(e.JobRow.ID, e.stats, nextAttemptScheduledAt); err != nil { + if err := e.Completer.JobSetStateIfRunning(e.stats, dbadapter.JobSetStateSnoozed(e.JobRow.ID, nextAttemptScheduledAt, e.JobRow.MaxAttempts+1)); err != nil { e.Logger.ErrorContext(ctx, e.Name+": Error snoozing job", slog.Int64("job_id", e.JobRow.ID), ) @@ -245,7 +245,7 @@ func (e *jobExecutor) reportResult(ctx context.Context, res *jobExecutorResult) return } - if err := e.Completer.JobSetCompleted(e.JobRow.ID, e.stats, e.TimeNowUTC()); err != nil { + if err := e.Completer.JobSetStateIfRunning(e.stats, dbadapter.JobSetStateCompleted(e.JobRow.ID, e.TimeNowUTC())); err != nil { e.Logger.ErrorContext(ctx, e.Name+": Error completing job", slog.String("err", err.Error()), slog.Int64("job_id", e.JobRow.ID), @@ -293,15 +293,17 @@ func (e *jobExecutor) reportError(ctx context.Context, res *jobExecutorResult) { return } + now := time.Now() + if cancelJob { - if err := e.Completer.JobSetCancelled(e.JobRow.ID, e.stats, time.Now(), errData); err != nil { + if err := e.Completer.JobSetStateIfRunning(e.stats, dbadapter.JobSetStateCancelled(e.JobRow.ID, now, errData)); err != nil { e.Logger.ErrorContext(ctx, e.Name+": Failed to cancel job and report error", logAttrs...) } return } if e.JobRow.Attempt >= e.JobRow.MaxAttempts { - if err := e.Completer.JobSetDiscarded(e.JobRow.ID, e.stats, time.Now(), errData); err != nil { + if err := e.Completer.JobSetStateIfRunning(e.stats, dbadapter.JobSetStateDiscarded(e.JobRow.ID, now, errData)); err != nil { e.Logger.ErrorContext(ctx, e.Name+": Failed to discard job and report error", logAttrs...) } return @@ -314,7 +316,6 @@ func (e *jobExecutor) reportError(ctx context.Context, res *jobExecutorResult) { if nextRetryScheduledAt.IsZero() { nextRetryScheduledAt = e.ClientRetryPolicy.NextRetry(e.JobRow) } - now := time.Now() if nextRetryScheduledAt.Before(now) { e.Logger.WarnContext(ctx, e.Name+": Retry policy returned invalid next retry before current time; using default retry policy instead", @@ -324,7 +325,7 @@ func (e *jobExecutor) reportError(ctx context.Context, res *jobExecutorResult) { nextRetryScheduledAt = (&DefaultClientRetryPolicy{}).NextRetry(e.JobRow) } - if err := e.Completer.JobSetErrored(e.JobRow.ID, e.stats, nextRetryScheduledAt, errData); err != nil { + if err := e.Completer.JobSetStateIfRunning(e.stats, dbadapter.JobSetStateErrored(e.JobRow.ID, nextRetryScheduledAt, errData)); err != nil { e.Logger.ErrorContext(ctx, e.Name+": Failed to report error for job", logAttrs...) } }