Skip to content

Commit

Permalink
Consolidate on one set state query + adapter/completer function [boil…
Browse files Browse the repository at this point in the history
…erplate reduction] (#101)

This one's a clean up aimed at doing a massive boilerplate reduction in
our sqlc queries, the adapter, and the completer(s). We have a few
current problems:

* Every "set state" query like `JobCompleteIfRunning`, `JobErrorIfRunning`,
  etc. are all separate sqlc queries, and worse yet, all very long ones
  that are all practically identical with very minor but important
  differences that are hard to spot without examining them character by
  character.

* Every one of these queries get exposed through the adapter as a
  separate function that's implemented in the interface, `StandardAdapter`,
  and the mock `TestAdapter`, requiring lots of LOC.

* They're all exposed yet again in the `Completer` interface, along with
  both implementations of `Completer`. Which again, is a lot of LOC.

Here, we show that with some fairly minor tweaks, we can consolidate all
of this state updating into a single query that's no longer than any one
of the originals, and then simplify both the adapter and completer to
model access to it as a general `JobSetState` function with a number of
params constructors that function to produce the necessary parameters
for each type of state of update (complete, error, snooze, etc.).

Total LOCs are reduced by *a lot*. `river_job.sql` just by itself has
40% fewer lines, and with many more removed in the adapters and
completers. This has the effect of a lot less noise, but also makes
refactoring easier because there's less that has to change. Furthermore,
I don't believe there's any significant reduction in API ergonomics or
safety.

It's still not perfect -- there's still way too much scaffolding in the
completers in particular for example, but more improvements to come.
  • Loading branch information
brandur authored Dec 7, 2023
1 parent f736d1d commit 13ecbd4
Show file tree
Hide file tree
Showing 12 changed files with 253 additions and 932 deletions.
18 changes: 13 additions & 5 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1952,7 +1952,12 @@ func Test_Client_JobCompletion(t *testing.T) {
var dbPool *pgxpool.Pool
now := time.Now().UTC()
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
_, err := queries.JobSetCompleted(ctx, dbPool, dbsqlc.JobSetCompletedParams{ID: job.ID, FinalizedAt: now})
_, err := queries.JobSetState(ctx, dbPool, dbsqlc.JobSetStateParams{
ID: job.ID,
FinalizedAtDoUpdate: true,
FinalizedAt: &now,
State: dbsqlc.JobStateCompleted,
})
require.NoError(err)
return nil
})
Expand Down Expand Up @@ -2031,10 +2036,13 @@ func Test_Client_JobCompletion(t *testing.T) {
var dbPool *pgxpool.Pool
now := time.Now().UTC()
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
_, err := queries.JobSetDiscarded(ctx, dbPool, dbsqlc.JobSetDiscardedParams{
ID: job.ID,
Error: []byte("{\"error\": \"oops\"}"),
FinalizedAt: now,
_, err := queries.JobSetState(ctx, dbPool, dbsqlc.JobSetStateParams{
ID: job.ID,
ErrorDoUpdate: true,
Error: []byte("{\"error\": \"oops\"}"),
FinalizedAtDoUpdate: true,
FinalizedAt: &now,
State: dbsqlc.JobStateDiscarded,
})
require.NoError(err)
return errors.New("oops")
Expand Down
112 changes: 37 additions & 75 deletions internal/dbadapter/db_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,6 @@ type JobInsertResult struct {
// expedience, but this should be converted to a more stable API if Adapter
// would be exported.
type Adapter interface {
JobCompleteMany(ctx context.Context, jobs ...JobToComplete) error
// TODO: should all dbsqlc need to implement this? Or is it a pro feature?
JobCompleteManyTx(ctx context.Context, tx pgx.Tx, jobs ...JobToComplete) error
JobInsert(ctx context.Context, params *JobInsertParams) (*JobInsertResult, error)
JobInsertTx(ctx context.Context, tx pgx.Tx, params *JobInsertParams) (*JobInsertResult, error)

Expand All @@ -94,12 +91,10 @@ type Adapter interface {
JobGetAvailable(ctx context.Context, queueName string, limit int32) ([]*dbsqlc.RiverJob, error)
JobGetAvailableTx(ctx context.Context, tx pgx.Tx, queueName string, limit int32) ([]*dbsqlc.RiverJob, error)

JobSetCancelledIfRunning(ctx context.Context, id int64, cancelledAt time.Time, err []byte) (*dbsqlc.RiverJob, error)
JobSetCompletedIfRunning(ctx context.Context, job JobToComplete) (*dbsqlc.RiverJob, error)
JobSetCompletedTx(ctx context.Context, tx pgx.Tx, id int64, completedAt time.Time) (*dbsqlc.RiverJob, error)
JobSetDiscardedIfRunning(ctx context.Context, id int64, discardedAt time.Time, err []byte) (*dbsqlc.RiverJob, error)
JobSetErroredIfRunning(ctx context.Context, id int64, scheduledAt time.Time, err []byte) (*dbsqlc.RiverJob, error)
JobSetSnoozedIfRunning(ctx context.Context, id int64, scheduledAt time.Time) (*dbsqlc.RiverJob, error)
// JobSetStateIfRunning sets the state of a currently running job. Jobs which are not
// running (i.e. which have already have had their state set to something
// new through an explicit snooze or cancellation), are ignored.
JobSetStateIfRunning(ctx context.Context, params *JobSetStateIfRunningParams) (*dbsqlc.RiverJob, error)

// LeadershipAttemptElect attempts to elect a leader for the given name. The
// bool alreadyElected indicates whether this is a potential reelection of
Expand Down Expand Up @@ -322,26 +317,6 @@ func (a *StandardAdapter) JobInsertManyTx(ctx context.Context, tx pgx.Tx, params
return numInserted, nil
}

func (a *StandardAdapter) JobCompleteMany(ctx context.Context, jobs ...JobToComplete) error {
ctx, cancel := context.WithTimeout(ctx, a.deadlineTimeout)
defer cancel()

return a.queries.JobCompleteMany(ctx, a.executor, dbsqlc.JobCompleteManyParams{
ID: sliceutil.Map(jobs, func(j JobToComplete) int64 { return j.ID }),
FinalizedAt: sliceutil.Map(jobs, func(j JobToComplete) time.Time { return j.FinalizedAt.UTC() }),
})
}

func (a *StandardAdapter) JobCompleteManyTx(ctx context.Context, tx pgx.Tx, jobs ...JobToComplete) error {
ctx, cancel := context.WithTimeout(ctx, a.deadlineTimeout)
defer cancel()

return a.queries.JobCompleteMany(ctx, tx, dbsqlc.JobCompleteManyParams{
ID: sliceutil.Map(jobs, func(j JobToComplete) int64 { return j.ID }),
FinalizedAt: sliceutil.Map(jobs, func(j JobToComplete) time.Time { return j.FinalizedAt.UTC() }),
})
}

func (a *StandardAdapter) JobGetAvailable(ctx context.Context, queueName string, limit int32) ([]*dbsqlc.RiverJob, error) {
ctx, cancel := context.WithTimeout(ctx, a.deadlineTimeout)
defer cancel()
Expand Down Expand Up @@ -377,66 +352,53 @@ func (a *StandardAdapter) JobGetAvailableTx(ctx context.Context, tx pgx.Tx, queu
return jobs, nil
}

func (a *StandardAdapter) JobSetCancelledIfRunning(ctx context.Context, id int64, finalizedAt time.Time, err []byte) (*dbsqlc.RiverJob, error) {
ctx, cancel := context.WithTimeout(ctx, a.deadlineTimeout)
defer cancel()

return a.queries.JobSetCancelledIfRunning(ctx, a.executor, dbsqlc.JobSetCancelledIfRunningParams{
ID: id,
Error: err,
FinalizedAt: finalizedAt,
})
// JobSetStateIfRunningParams are parameters to update the state of a currently running
// job. Use one of the constructors below to ensure a correct combination of
// parameters.
type JobSetStateIfRunningParams struct {
ID int64
errData []byte
finalizedAt *time.Time
maxAttempts *int
scheduledAt *time.Time
state dbsqlc.JobState
}

func (a *StandardAdapter) JobSetCompletedIfRunning(ctx context.Context, job JobToComplete) (*dbsqlc.RiverJob, error) {
ctx, cancel := context.WithTimeout(ctx, a.deadlineTimeout)
defer cancel()

return a.queries.JobSetCompletedIfRunning(ctx, a.executor, dbsqlc.JobSetCompletedIfRunningParams{
ID: job.ID,
FinalizedAt: job.FinalizedAt.UTC(),
})
func JobSetStateCancelled(id int64, finalizedAt time.Time, errData []byte) *JobSetStateIfRunningParams {
return &JobSetStateIfRunningParams{ID: id, errData: errData, finalizedAt: &finalizedAt, state: dbsqlc.JobStateCancelled}
}

func (a *StandardAdapter) JobSetCompletedTx(ctx context.Context, tx pgx.Tx, id int64, completedAt time.Time) (*dbsqlc.RiverJob, error) {
ctx, cancel := context.WithTimeout(ctx, a.deadlineTimeout)
defer cancel()

return a.queries.JobSetCompleted(ctx, tx, dbsqlc.JobSetCompletedParams{
ID: id,
FinalizedAt: completedAt.UTC(),
})
func JobSetStateCompleted(id int64, finalizedAt time.Time) *JobSetStateIfRunningParams {
return &JobSetStateIfRunningParams{ID: id, finalizedAt: &finalizedAt, state: dbsqlc.JobStateCompleted}
}

func (a *StandardAdapter) JobSetDiscardedIfRunning(ctx context.Context, id int64, finalizedAt time.Time, err []byte) (*dbsqlc.RiverJob, error) {
ctx, cancel := context.WithTimeout(ctx, a.deadlineTimeout)
defer cancel()

return a.queries.JobSetDiscardedIfRunning(ctx, a.executor, dbsqlc.JobSetDiscardedIfRunningParams{
ID: id,
Error: err,
FinalizedAt: finalizedAt,
})
func JobSetStateDiscarded(id int64, finalizedAt time.Time, errData []byte) *JobSetStateIfRunningParams {
return &JobSetStateIfRunningParams{ID: id, errData: errData, finalizedAt: &finalizedAt, state: dbsqlc.JobStateDiscarded}
}

func (a *StandardAdapter) JobSetErroredIfRunning(ctx context.Context, id int64, scheduledAt time.Time, err []byte) (*dbsqlc.RiverJob, error) {
ctx, cancel := context.WithTimeout(ctx, a.deadlineTimeout)
defer cancel()
func JobSetStateErrored(id int64, scheduledAt time.Time, errData []byte) *JobSetStateIfRunningParams {
return &JobSetStateIfRunningParams{ID: id, errData: errData, scheduledAt: &scheduledAt, state: dbsqlc.JobStateRetryable}
}

return a.queries.JobSetErroredIfRunning(ctx, a.executor, dbsqlc.JobSetErroredIfRunningParams{
ID: id,
Error: err,
ScheduledAt: scheduledAt,
})
func JobSetStateSnoozed(id int64, scheduledAt time.Time, maxAttempts int) *JobSetStateIfRunningParams {
return &JobSetStateIfRunningParams{ID: id, maxAttempts: &maxAttempts, scheduledAt: &scheduledAt, state: dbsqlc.JobStateScheduled}
}

func (a *StandardAdapter) JobSetSnoozedIfRunning(ctx context.Context, id int64, scheduledAt time.Time) (*dbsqlc.RiverJob, error) {
func (a *StandardAdapter) JobSetStateIfRunning(ctx context.Context, params *JobSetStateIfRunningParams) (*dbsqlc.RiverJob, error) {
ctx, cancel := context.WithTimeout(ctx, a.deadlineTimeout)
defer cancel()

return a.queries.JobSetSnoozedIfRunning(ctx, a.executor, dbsqlc.JobSetSnoozedIfRunningParams{
ID: id,
ScheduledAt: scheduledAt,
return a.queries.JobSetStateIfRunning(ctx, a.executor, dbsqlc.JobSetStateIfRunningParams{
ID: params.ID,
ErrorDoUpdate: params.errData != nil,
Error: params.errData,
FinalizedAtDoUpdate: params.finalizedAt != nil,
FinalizedAt: params.finalizedAt,
MaxAttemptsUpdate: params.maxAttempts != nil,
MaxAttempts: int16(ptrutil.ValOrDefault(params.maxAttempts, 0)), // default never used
ScheduledAtDoUpdate: params.scheduledAt != nil,
ScheduledAt: ptrutil.ValOrDefault(params.scheduledAt, time.Time{}), // default never used
State: params.state,
})
}

Expand Down
39 changes: 6 additions & 33 deletions internal/dbadapter/db_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func Test_StandardAdapter_FetchIsPrioritized(t *testing.T) {
require.Equal(t, int16(3), jobs[0].Priority, "expected final job to have priority 2")
}

func Test_StandardAdapter_JobSetCompletedIfRunning(t *testing.T) {
func Test_StandardAdapter_JobSetStateCompleted(t *testing.T) {
t.Parallel()

ctx := context.Background()
Expand Down Expand Up @@ -535,7 +535,7 @@ func Test_StandardAdapter_JobSetCompletedIfRunning(t *testing.T) {
require.NoError(t, err)
require.Equal(t, dbsqlc.JobStateRunning, res.Job.State)

jAfter, err := adapter.JobSetCompletedIfRunning(ctx, JobToComplete{ID: res.Job.ID, FinalizedAt: bundle.baselineTime})
jAfter, err := adapter.JobSetStateIfRunning(ctx, JobSetStateCompleted(res.Job.ID, bundle.baselineTime))
require.NoError(t, err)
require.Equal(t, dbsqlc.JobStateCompleted, jAfter.State)
require.WithinDuration(t, bundle.baselineTime, *jAfter.FinalizedAt, time.Microsecond)
Expand All @@ -556,7 +556,7 @@ func Test_StandardAdapter_JobSetCompletedIfRunning(t *testing.T) {
require.NoError(t, err)
require.Equal(t, dbsqlc.JobStateRetryable, res.Job.State)

jAfter, err := adapter.JobSetCompletedIfRunning(ctx, JobToComplete{ID: res.Job.ID, FinalizedAt: bundle.baselineTime})
jAfter, err := adapter.JobSetStateIfRunning(ctx, JobSetStateCompleted(res.Job.ID, bundle.baselineTime))
require.NoError(t, err)
require.Equal(t, dbsqlc.JobStateRetryable, jAfter.State)
require.Nil(t, jAfter.FinalizedAt)
Expand All @@ -567,34 +567,7 @@ func Test_StandardAdapter_JobSetCompletedIfRunning(t *testing.T) {
})
}

func Test_StandardAdapter_JobCompleteMany(t *testing.T) {
t.Parallel()

ctx := context.Background()
tx := riverinternaltest.TestTx(ctx, t)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

adapter := NewStandardAdapter(riverinternaltest.BaseServiceArchetype(t), testAdapterConfig(tx))
now := time.Now()
jobsToComplete := make([]JobToComplete, 3)
for i := 0; i < len(jobsToComplete); i++ {
res, err := adapter.JobInsert(ctx, makeFakeJobInsertParams(i))
require.NoError(t, err)
jobsToComplete[i] = JobToComplete{ID: res.Job.ID, FinalizedAt: now.Add(time.Duration(i) * time.Second)}
}

require.NoError(t, adapter.JobCompleteMany(ctx, jobsToComplete...))
for i := 0; i < len(jobsToComplete); i++ {
job, err := adapter.queries.JobGetByID(ctx, tx, jobsToComplete[i].ID)
require.NoError(t, err)
require.Equal(t, dbsqlc.JobStateCompleted, job.State)
require.WithinDuration(t, jobsToComplete[i].FinalizedAt, *job.FinalizedAt, time.Microsecond)
}
}

func Test_StandardAdapter_JobSetErroredIfRunning(t *testing.T) {
func Test_StandardAdapter_JobSetStateErrored(t *testing.T) {
t.Parallel()

ctx := context.Background()
Expand Down Expand Up @@ -643,7 +616,7 @@ func Test_StandardAdapter_JobSetErroredIfRunning(t *testing.T) {
require.NoError(t, err)
require.Equal(t, dbsqlc.JobStateRunning, res.Job.State)

jAfter, err := adapter.JobSetErroredIfRunning(ctx, res.Job.ID, bundle.baselineTime, bundle.errPayload)
jAfter, err := adapter.JobSetStateIfRunning(ctx, JobSetStateErrored(res.Job.ID, bundle.baselineTime, bundle.errPayload))
require.NoError(t, err)
require.Equal(t, dbsqlc.JobStateRetryable, jAfter.State)
require.WithinDuration(t, bundle.baselineTime, jAfter.ScheduledAt, time.Microsecond)
Expand Down Expand Up @@ -672,7 +645,7 @@ func Test_StandardAdapter_JobSetErroredIfRunning(t *testing.T) {
require.NoError(t, err)
require.Equal(t, dbsqlc.JobStateRetryable, res.Job.State)

jAfter, err := adapter.JobSetErroredIfRunning(ctx, res.Job.ID, bundle.baselineTime, bundle.errPayload)
jAfter, err := adapter.JobSetStateIfRunning(ctx, JobSetStateErrored(res.Job.ID, bundle.baselineTime, bundle.errPayload))
require.NoError(t, err)
require.Equal(t, dbsqlc.JobStateRetryable, jAfter.State)
require.WithinDuration(t, params.ScheduledAt, jAfter.ScheduledAt, time.Microsecond)
Expand Down
Loading

0 comments on commit 13ecbd4

Please sign in to comment.