Skip to content

Commit

Permalink
JobSchedule: handle duplicate uniques being scheduled at once
Browse files Browse the repository at this point in the history
If two unique jobs happen to be getting scheduled in the same batch,
only one of them can successfully be moved to `available`—the other must
be discarded. Pick the one that's first in line.
  • Loading branch information
bgentry committed Sep 26, 2024
1 parent 675b0d7 commit f9f3b94
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 72 deletions.
49 changes: 49 additions & 0 deletions internal/riverinternaltest/riverdrivertest/riverdrivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -1641,6 +1641,55 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
require.Equal(t, rivertype.JobStateAvailable, updatedJob3.State)
require.False(t, gjson.GetBytes(updatedJob3.Metadata, "unique_key_conflict").Exists())
})

t.Run("SchedulingTwoRetryableJobsThatWillConflictWithEachOther", func(t *testing.T) {
t.Parallel()

exec, _ := setup(ctx, t)

var (
horizon = time.Now()
beforeHorizon = horizon.Add(-1 * time.Minute)
)

// The default unique state list, minus retryable to allow for these conflicts:
nonRetryableUniqueStates := []rivertype.JobState{
rivertype.JobStateAvailable,
rivertype.JobStatePending,
rivertype.JobStateRunning,
rivertype.JobStateScheduled,
}

job1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
ScheduledAt: &beforeHorizon,
State: ptrutil.Ptr(rivertype.JobStateRetryable),
UniqueKey: []byte("unique-key-1"),
UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates),
})
job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
ScheduledAt: &beforeHorizon,
State: ptrutil.Ptr(rivertype.JobStateRetryable),
UniqueKey: []byte("unique-key-1"),
UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates),
})

result, err := exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{
Max: 100,
Now: horizon,
})
require.NoError(t, err)
require.Len(t, result, 2)

updatedJob1, err := exec.JobGetByID(ctx, job1.ID)
require.NoError(t, err)
require.Equal(t, rivertype.JobStateAvailable, updatedJob1.State)
require.False(t, gjson.GetBytes(updatedJob1.Metadata, "unique_key_conflict").Exists())

updatedJob2, err := exec.JobGetByID(ctx, job2.ID)
require.NoError(t, err)
require.Equal(t, rivertype.JobStateDiscarded, updatedJob2.State)
require.Equal(t, "scheduler_discarded", gjson.GetBytes(updatedJob2.Metadata, "unique_key_conflict").String())
})
})

t.Run("JobSetCompleteIfRunningMany", func(t *testing.T) {
Expand Down
77 changes: 53 additions & 24 deletions riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 53 additions & 24 deletions riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,12 @@ FROM updated_job;

-- name: JobSchedule :many
WITH jobs_to_schedule AS (
SELECT id, unique_key, unique_states
SELECT
id,
unique_key,
unique_states,
priority,
scheduled_at
FROM river_job
WHERE
state IN ('retryable', 'scheduled')
Expand All @@ -420,35 +425,59 @@ WITH jobs_to_schedule AS (
LIMIT @max::bigint
FOR UPDATE
),
conflicting_jobs AS (
SELECT id, unique_key
jobs_with_rownum AS (
SELECT
*,
CASE
WHEN unique_key IS NOT NULL AND unique_states IS NOT NULL THEN
ROW_NUMBER() OVER (
PARTITION BY unique_key
ORDER BY priority, scheduled_at, id
)
ELSE NULL
END AS row_num
FROM jobs_to_schedule
),
unique_conflicts AS (
SELECT river_job.unique_key
FROM river_job
JOIN jobs_with_rownum
ON river_job.unique_key = jobs_with_rownum.unique_key
AND river_job.id != jobs_with_rownum.id
WHERE
unique_key IS NOT NULL
AND unique_states IS NOT NULL
AND river_job_state_in_bitmask(unique_states, state)
AND unique_key IN (
SELECT unique_key
FROM jobs_to_schedule
WHERE
unique_key IS NOT NULL
AND unique_states IS NOT NULL
)
river_job.unique_key IS NOT NULL
AND river_job.unique_states IS NOT NULL
AND river_job_state_in_bitmask(river_job.unique_states, river_job.state)
),
job_updates AS (
SELECT
job.id,
job.unique_key,
job.unique_states,
CASE
WHEN job.row_num IS NULL THEN 'available'::river_job_state
WHEN uc.unique_key IS NOT NULL THEN 'discarded'::river_job_state
WHEN job.row_num = 1 THEN 'available'::river_job_state
ELSE 'discarded'::river_job_state
END AS new_state,
(job.row_num IS NOT NULL AND (uc.unique_key IS NOT NULL OR job.row_num > 1)) AS finalized_at_do_update,
(job.row_num IS NOT NULL AND (uc.unique_key IS NOT NULL OR job.row_num > 1)) AS metadata_do_update
FROM jobs_with_rownum job
LEFT JOIN unique_conflicts uc ON job.unique_key = uc.unique_key
),
updated_jobs AS (
UPDATE river_job
SET
state = CASE WHEN cj.unique_key IS NULL THEN 'available'::river_job_state
ELSE 'discarded'::river_job_state END,
finalized_at = CASE WHEN cj.unique_key IS NULL THEN finalized_at
ELSE @now::timestamptz END,
-- Purely for debugging to understand when this code path was used:
metadata = CASE WHEN cj.unique_key IS NULL THEN metadata
ELSE metadata || '{"unique_key_conflict": "scheduler_discarded"}'::jsonb END
FROM jobs_to_schedule jts
LEFT JOIN conflicting_jobs cj ON jts.unique_key = cj.unique_key AND jts.id != cj.id
WHERE river_job.id = jts.id
RETURNING river_job.id, state = 'discarded'::river_job_state AS conflict_discarded
state = job_updates.new_state,
finalized_at = CASE WHEN job_updates.finalized_at_do_update THEN @now::timestamptz
ELSE river_job.finalized_at END,
metadata = CASE WHEN job_updates.metadata_do_update THEN river_job.metadata || '{"unique_key_conflict": "scheduler_discarded"}'::jsonb
ELSE river_job.metadata END
FROM job_updates
WHERE river_job.id = job_updates.id
RETURNING
river_job.id,
job_updates.new_state = 'discarded'::river_job_state AS conflict_discarded
)
SELECT
sqlc.embed(river_job),
Expand Down
77 changes: 53 additions & 24 deletions riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit f9f3b94

Please sign in to comment.