From f9f3b9406c01d0578ab51d8095c4290e9edb3912 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Thu, 26 Sep 2024 10:53:04 -0500 Subject: [PATCH] JobSchedule: handle duplicate uniques being scheduled at once MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If two unique jobs happen to be getting scheduled in the same batch, only one of them can successfully be moved to `available`—the other must be discarded. Pick the one that's first in line. --- .../riverdrivertest/riverdrivertest.go | 49 ++++++++++++ .../internal/dbsqlc/river_job.sql.go | 77 +++++++++++++------ .../riverpgxv5/internal/dbsqlc/river_job.sql | 77 +++++++++++++------ .../internal/dbsqlc/river_job.sql.go | 77 +++++++++++++------ 4 files changed, 208 insertions(+), 72 deletions(-) diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index 0db6254c..42695d33 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -1641,6 +1641,55 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, require.Equal(t, rivertype.JobStateAvailable, updatedJob3.State) require.False(t, gjson.GetBytes(updatedJob3.Metadata, "unique_key_conflict").Exists()) }) + + t.Run("SchedulingTwoRetryableJobsThatWillConflictWithEachOther", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + var ( + horizon = time.Now() + beforeHorizon = horizon.Add(-1 * time.Minute) + ) + + // The default unique state list, minus retryable to allow for these conflicts: + nonRetryableUniqueStates := []rivertype.JobState{ + rivertype.JobStateAvailable, + rivertype.JobStatePending, + rivertype.JobStateRunning, + rivertype.JobStateScheduled, + } + + job1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + ScheduledAt: &beforeHorizon, + State: ptrutil.Ptr(rivertype.JobStateRetryable), + UniqueKey: []byte("unique-key-1"), + UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates), + }) + job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + ScheduledAt: &beforeHorizon, + State: ptrutil.Ptr(rivertype.JobStateRetryable), + UniqueKey: []byte("unique-key-1"), + UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates), + }) + + result, err := exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{ + Max: 100, + Now: horizon, + }) + require.NoError(t, err) + require.Len(t, result, 2) + + updatedJob1, err := exec.JobGetByID(ctx, job1.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateAvailable, updatedJob1.State) + require.False(t, gjson.GetBytes(updatedJob1.Metadata, "unique_key_conflict").Exists()) + + updatedJob2, err := exec.JobGetByID(ctx, job2.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateDiscarded, updatedJob2.State) + require.Equal(t, "scheduler_discarded", gjson.GetBytes(updatedJob2.Metadata, "unique_key_conflict").String()) + }) }) t.Run("JobSetCompleteIfRunningMany", func(t *testing.T) { diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index 874f1e71..cb339c1a 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -1013,7 +1013,12 @@ func (q *Queries) JobRetry(ctx context.Context, db DBTX, id int64) (*RiverJob, e const jobSchedule = `-- name: JobSchedule :many WITH jobs_to_schedule AS ( - SELECT id, unique_key, unique_states + SELECT + id, + unique_key, + unique_states, + priority, + scheduled_at FROM river_job WHERE state IN ('retryable', 'scheduled') @@ -1027,35 +1032,59 @@ WITH jobs_to_schedule AS ( LIMIT $2::bigint FOR UPDATE ), -conflicting_jobs AS ( - SELECT id, unique_key +jobs_with_rownum AS ( + SELECT + id, unique_key, unique_states, priority, scheduled_at, + CASE + WHEN unique_key IS NOT NULL AND unique_states IS NOT NULL THEN + ROW_NUMBER() OVER ( + PARTITION BY unique_key + ORDER BY priority, scheduled_at, id + ) + ELSE NULL + END AS row_num + FROM jobs_to_schedule +), +unique_conflicts AS ( + SELECT river_job.unique_key FROM river_job + JOIN jobs_with_rownum + ON river_job.unique_key = jobs_with_rownum.unique_key + AND river_job.id != jobs_with_rownum.id WHERE - unique_key IS NOT NULL - AND unique_states IS NOT NULL - AND river_job_state_in_bitmask(unique_states, state) - AND unique_key IN ( - SELECT unique_key - FROM jobs_to_schedule - WHERE - unique_key IS NOT NULL - AND unique_states IS NOT NULL - ) + river_job.unique_key IS NOT NULL + AND river_job.unique_states IS NOT NULL + AND river_job_state_in_bitmask(river_job.unique_states, river_job.state) +), +job_updates AS ( + SELECT + job.id, + job.unique_key, + job.unique_states, + CASE + WHEN job.row_num IS NULL THEN 'available'::river_job_state + WHEN uc.unique_key IS NOT NULL THEN 'discarded'::river_job_state + WHEN job.row_num = 1 THEN 'available'::river_job_state + ELSE 'discarded'::river_job_state + END AS new_state, + (job.row_num IS NOT NULL AND (uc.unique_key IS NOT NULL OR job.row_num > 1)) AS finalized_at_do_update, + (job.row_num IS NOT NULL AND (uc.unique_key IS NOT NULL OR job.row_num > 1)) AS metadata_do_update + FROM jobs_with_rownum job + LEFT JOIN unique_conflicts uc ON job.unique_key = uc.unique_key ), updated_jobs AS ( UPDATE river_job SET - state = CASE WHEN cj.unique_key IS NULL THEN 'available'::river_job_state - ELSE 'discarded'::river_job_state END, - finalized_at = CASE WHEN cj.unique_key IS NULL THEN finalized_at - ELSE $1::timestamptz END, - -- Purely for debugging to understand when this code path was used: - metadata = CASE WHEN cj.unique_key IS NULL THEN metadata - ELSE metadata || '{"unique_key_conflict": "scheduler_discarded"}'::jsonb END - FROM jobs_to_schedule jts - LEFT JOIN conflicting_jobs cj ON jts.unique_key = cj.unique_key AND jts.id != cj.id - WHERE river_job.id = jts.id - RETURNING river_job.id, state = 'discarded'::river_job_state AS conflict_discarded + state = job_updates.new_state, + finalized_at = CASE WHEN job_updates.finalized_at_do_update THEN $1::timestamptz + ELSE river_job.finalized_at END, + metadata = CASE WHEN job_updates.metadata_do_update THEN river_job.metadata || '{"unique_key_conflict": "scheduler_discarded"}'::jsonb + ELSE river_job.metadata END + FROM job_updates + WHERE river_job.id = job_updates.id + RETURNING + river_job.id, + job_updates.new_state = 'discarded'::river_job_state AS conflict_discarded ) SELECT river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key, river_job.unique_states, diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index 48785b7d..c1ac0bdf 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -406,7 +406,12 @@ FROM updated_job; -- name: JobSchedule :many WITH jobs_to_schedule AS ( - SELECT id, unique_key, unique_states + SELECT + id, + unique_key, + unique_states, + priority, + scheduled_at FROM river_job WHERE state IN ('retryable', 'scheduled') @@ -420,35 +425,59 @@ WITH jobs_to_schedule AS ( LIMIT @max::bigint FOR UPDATE ), -conflicting_jobs AS ( - SELECT id, unique_key +jobs_with_rownum AS ( + SELECT + *, + CASE + WHEN unique_key IS NOT NULL AND unique_states IS NOT NULL THEN + ROW_NUMBER() OVER ( + PARTITION BY unique_key + ORDER BY priority, scheduled_at, id + ) + ELSE NULL + END AS row_num + FROM jobs_to_schedule +), +unique_conflicts AS ( + SELECT river_job.unique_key FROM river_job + JOIN jobs_with_rownum + ON river_job.unique_key = jobs_with_rownum.unique_key + AND river_job.id != jobs_with_rownum.id WHERE - unique_key IS NOT NULL - AND unique_states IS NOT NULL - AND river_job_state_in_bitmask(unique_states, state) - AND unique_key IN ( - SELECT unique_key - FROM jobs_to_schedule - WHERE - unique_key IS NOT NULL - AND unique_states IS NOT NULL - ) + river_job.unique_key IS NOT NULL + AND river_job.unique_states IS NOT NULL + AND river_job_state_in_bitmask(river_job.unique_states, river_job.state) +), +job_updates AS ( + SELECT + job.id, + job.unique_key, + job.unique_states, + CASE + WHEN job.row_num IS NULL THEN 'available'::river_job_state + WHEN uc.unique_key IS NOT NULL THEN 'discarded'::river_job_state + WHEN job.row_num = 1 THEN 'available'::river_job_state + ELSE 'discarded'::river_job_state + END AS new_state, + (job.row_num IS NOT NULL AND (uc.unique_key IS NOT NULL OR job.row_num > 1)) AS finalized_at_do_update, + (job.row_num IS NOT NULL AND (uc.unique_key IS NOT NULL OR job.row_num > 1)) AS metadata_do_update + FROM jobs_with_rownum job + LEFT JOIN unique_conflicts uc ON job.unique_key = uc.unique_key ), updated_jobs AS ( UPDATE river_job SET - state = CASE WHEN cj.unique_key IS NULL THEN 'available'::river_job_state - ELSE 'discarded'::river_job_state END, - finalized_at = CASE WHEN cj.unique_key IS NULL THEN finalized_at - ELSE @now::timestamptz END, - -- Purely for debugging to understand when this code path was used: - metadata = CASE WHEN cj.unique_key IS NULL THEN metadata - ELSE metadata || '{"unique_key_conflict": "scheduler_discarded"}'::jsonb END - FROM jobs_to_schedule jts - LEFT JOIN conflicting_jobs cj ON jts.unique_key = cj.unique_key AND jts.id != cj.id - WHERE river_job.id = jts.id - RETURNING river_job.id, state = 'discarded'::river_job_state AS conflict_discarded + state = job_updates.new_state, + finalized_at = CASE WHEN job_updates.finalized_at_do_update THEN @now::timestamptz + ELSE river_job.finalized_at END, + metadata = CASE WHEN job_updates.metadata_do_update THEN river_job.metadata || '{"unique_key_conflict": "scheduler_discarded"}'::jsonb + ELSE river_job.metadata END + FROM job_updates + WHERE river_job.id = job_updates.id + RETURNING + river_job.id, + job_updates.new_state = 'discarded'::river_job_state AS conflict_discarded ) SELECT sqlc.embed(river_job), diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index 6335d008..1fff4859 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -997,7 +997,12 @@ func (q *Queries) JobRetry(ctx context.Context, db DBTX, id int64) (*RiverJob, e const jobSchedule = `-- name: JobSchedule :many WITH jobs_to_schedule AS ( - SELECT id, unique_key, unique_states + SELECT + id, + unique_key, + unique_states, + priority, + scheduled_at FROM river_job WHERE state IN ('retryable', 'scheduled') @@ -1011,35 +1016,59 @@ WITH jobs_to_schedule AS ( LIMIT $2::bigint FOR UPDATE ), -conflicting_jobs AS ( - SELECT id, unique_key +jobs_with_rownum AS ( + SELECT + id, unique_key, unique_states, priority, scheduled_at, + CASE + WHEN unique_key IS NOT NULL AND unique_states IS NOT NULL THEN + ROW_NUMBER() OVER ( + PARTITION BY unique_key + ORDER BY priority, scheduled_at, id + ) + ELSE NULL + END AS row_num + FROM jobs_to_schedule +), +unique_conflicts AS ( + SELECT river_job.unique_key FROM river_job + JOIN jobs_with_rownum + ON river_job.unique_key = jobs_with_rownum.unique_key + AND river_job.id != jobs_with_rownum.id WHERE - unique_key IS NOT NULL - AND unique_states IS NOT NULL - AND river_job_state_in_bitmask(unique_states, state) - AND unique_key IN ( - SELECT unique_key - FROM jobs_to_schedule - WHERE - unique_key IS NOT NULL - AND unique_states IS NOT NULL - ) + river_job.unique_key IS NOT NULL + AND river_job.unique_states IS NOT NULL + AND river_job_state_in_bitmask(river_job.unique_states, river_job.state) +), +job_updates AS ( + SELECT + job.id, + job.unique_key, + job.unique_states, + CASE + WHEN job.row_num IS NULL THEN 'available'::river_job_state + WHEN uc.unique_key IS NOT NULL THEN 'discarded'::river_job_state + WHEN job.row_num = 1 THEN 'available'::river_job_state + ELSE 'discarded'::river_job_state + END AS new_state, + (job.row_num IS NOT NULL AND (uc.unique_key IS NOT NULL OR job.row_num > 1)) AS finalized_at_do_update, + (job.row_num IS NOT NULL AND (uc.unique_key IS NOT NULL OR job.row_num > 1)) AS metadata_do_update + FROM jobs_with_rownum job + LEFT JOIN unique_conflicts uc ON job.unique_key = uc.unique_key ), updated_jobs AS ( UPDATE river_job SET - state = CASE WHEN cj.unique_key IS NULL THEN 'available'::river_job_state - ELSE 'discarded'::river_job_state END, - finalized_at = CASE WHEN cj.unique_key IS NULL THEN finalized_at - ELSE $1::timestamptz END, - -- Purely for debugging to understand when this code path was used: - metadata = CASE WHEN cj.unique_key IS NULL THEN metadata - ELSE metadata || '{"unique_key_conflict": "scheduler_discarded"}'::jsonb END - FROM jobs_to_schedule jts - LEFT JOIN conflicting_jobs cj ON jts.unique_key = cj.unique_key AND jts.id != cj.id - WHERE river_job.id = jts.id - RETURNING river_job.id, state = 'discarded'::river_job_state AS conflict_discarded + state = job_updates.new_state, + finalized_at = CASE WHEN job_updates.finalized_at_do_update THEN $1::timestamptz + ELSE river_job.finalized_at END, + metadata = CASE WHEN job_updates.metadata_do_update THEN river_job.metadata || '{"unique_key_conflict": "scheduler_discarded"}'::jsonb + ELSE river_job.metadata END + FROM job_updates + WHERE river_job.id = job_updates.id + RETURNING + river_job.id, + job_updates.new_state = 'discarded'::river_job_state AS conflict_discarded ) SELECT river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key, river_job.unique_states,