From 146829826220ba9148ef346b48aac5ed3c5d01fb Mon Sep 17 00:00:00 2001 From: Brandur Leach Date: Tue, 12 Dec 2023 18:18:34 -0800 Subject: [PATCH] Eager retry for short retry delays (#105) An aspect of River that could currently be considered a bug is that retry delays below five seconds are effectively ignored. This is because when a job errors it always transitions to `retryable` state, and needs to wait for the scheduler to run to transition it back to `available`, and the scheduler run interval is five seconds. This is in practice a problem because it means that the first retry in a retry policy isn't actually one second as claimed in docs, etc., but actually ~5 seconds, and it won't be obvious why. Here, implement an "eager" retry system such that if we detect that the retry delay is smaller than the scheduler's run interval, we place the job immediately into an `available` state, allowing it to be worked sooner. To facilitate this we add a predicate on the "get available" query that checks for `scheduled_at <= now()` so that errored jobs with short retries can be `available` for a short time without being worked immediately. Most of the time this should have no effect because this really only applies to the first retry in any failure (the second is at 16 seconds, which is well above the scheduler's run interval). --- CHANGELOG.md | 4 + client.go | 10 +- client_test.go | 1 + example_subscription_test.go | 2 +- internal/dbadapter/db_adapter.go | 6 +- internal/dbadapter/db_adapter_test.go | 150 +++++++++++++++--- internal/dbsqlc/river_job.sql | 3 +- internal/dbsqlc/river_job.sql.go | 3 +- .../riverinternaltest/riverinternaltest.go | 10 ++ job_executor.go | 15 +- job_executor_test.go | 40 +++++ producer.go | 19 ++- producer_test.go | 3 + 13 files changed, 228 insertions(+), 38 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index af3feb7c..3c418530 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Errored jobs that have a very short duration before their next retry (<5 seconds) are set to `available` immediately instead of being made `scheduled` and having to wait for the scheduler to make a pass to make them workable. [PR #105](https://github.com/riverqueue/river/pull/105). + ## [0.0.12] - 2023-12-02 ### Added diff --git a/client.go b/client.go index 343a8c39..a8c6cde7 100644 --- a/client.go +++ b/client.go @@ -167,6 +167,10 @@ type Config struct { // Test-only property that allows sleep statements to be disable. Only // functions in cases where the CancellableSleep helper is used to sleep. disableSleep bool + + // Scheduler run interval. Shared between the scheduler and producer/job + // executors, but not currently exposed for configuration. + schedulerInterval time.Duration } func (c *Config) validate() error { @@ -388,6 +392,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client RetryPolicy: retryPolicy, Workers: config.Workers, disableSleep: config.disableSleep, + schedulerInterval: valutil.ValOrDefault(config.schedulerInterval, maintenance.SchedulerIntervalDefault), } if err := config.validate(); err != nil { @@ -524,7 +529,9 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client } { - scheduler := maintenance.NewScheduler(archetype, &maintenance.SchedulerConfig{}, driver.GetDBPool()) + scheduler := maintenance.NewScheduler(archetype, &maintenance.SchedulerConfig{ + Interval: config.schedulerInterval, + }, driver.GetDBPool()) maintenanceServices = append(maintenanceServices, scheduler) client.testSignals.scheduler = &scheduler.TestSignals } @@ -898,6 +905,7 @@ func (c *Client[TTx]) provisionProducers() error { Notifier: c.notifier, QueueName: queue, RetryPolicy: c.config.RetryPolicy, + SchedulerInterval: c.config.schedulerInterval, WorkerName: c.id, Workers: c.config.Workers, } diff --git a/client_test.go b/client_test.go index 4effd849..d8bd56b1 100644 --- a/client_test.go +++ b/client_test.go @@ -127,6 +127,7 @@ func newTestConfig(t *testing.T, callback callbackFunc) *Config { Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 50}}, Workers: workers, disableSleep: true, + schedulerInterval: riverinternaltest.SchedulerShortInterval, } } diff --git a/example_subscription_test.go b/example_subscription_test.go index d0ee2cc4..ea6da59b 100644 --- a/example_subscription_test.go +++ b/example_subscription_test.go @@ -129,7 +129,7 @@ func Example_subscription() { // Output: // Got job with state: completed - // Got job with state: retryable + // Got job with state: available // Got job with state: cancelled // Client stopped // Channel is closed diff --git a/internal/dbadapter/db_adapter.go b/internal/dbadapter/db_adapter.go index 50838bff..b24e8926 100644 --- a/internal/dbadapter/db_adapter.go +++ b/internal/dbadapter/db_adapter.go @@ -376,7 +376,11 @@ func JobSetStateDiscarded(id int64, finalizedAt time.Time, errData []byte) *JobS return &JobSetStateIfRunningParams{ID: id, errData: errData, finalizedAt: &finalizedAt, state: dbsqlc.JobStateDiscarded} } -func JobSetStateErrored(id int64, scheduledAt time.Time, errData []byte) *JobSetStateIfRunningParams { +func JobSetStateErrorAvailable(id int64, scheduledAt time.Time, errData []byte) *JobSetStateIfRunningParams { + return &JobSetStateIfRunningParams{ID: id, errData: errData, scheduledAt: &scheduledAt, state: dbsqlc.JobStateAvailable} +} + +func JobSetStateErrorRetryable(id int64, scheduledAt time.Time, errData []byte) *JobSetStateIfRunningParams { return &JobSetStateIfRunningParams{ID: id, errData: errData, scheduledAt: &scheduledAt, state: dbsqlc.JobStateRetryable} } diff --git a/internal/dbadapter/db_adapter_test.go b/internal/dbadapter/db_adapter_test.go index 145a7f6e..0eac3dec 100644 --- a/internal/dbadapter/db_adapter_test.go +++ b/internal/dbadapter/db_adapter_test.go @@ -17,9 +17,99 @@ import ( "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/internal/util/dbutil" + "github.com/riverqueue/river/internal/util/ptrutil" ) -func Test_StandardAdapter_Insert(t *testing.T) { +func Test_StandardAdapter_JobGetAvailable(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type testBundle struct { + baselineTime time.Time // baseline time frozen at now when setup is called + tx pgx.Tx + } + + setup := func(t *testing.T) (*StandardAdapter, *testBundle) { + t.Helper() + + bundle := &testBundle{ + baselineTime: time.Now(), + tx: riverinternaltest.TestTx(ctx, t), + } + + adapter := NewStandardAdapter(riverinternaltest.BaseServiceArchetype(t), testAdapterConfig(bundle.tx)) + adapter.TimeNowUTC = func() time.Time { return bundle.baselineTime } + + return adapter, bundle + } + + t.Run("Success", func(t *testing.T) { + t.Parallel() + + adapter, bundle := setup(t) + + _, err := adapter.JobInsertTx(ctx, bundle.tx, makeFakeJobInsertParams(0, nil)) + require.NoError(t, err) + + jobRows, err := adapter.JobGetAvailableTx(ctx, bundle.tx, rivercommon.QueueDefault, 100) + require.NoError(t, err) + require.Len(t, jobRows, 1) + + jobRow := jobRows[0] + require.Equal(t, []string{adapter.workerName}, jobRow.AttemptedBy) + }) + + t.Run("ConstrainedToLimit", func(t *testing.T) { + t.Parallel() + + adapter, bundle := setup(t) + + _, err := adapter.JobInsertTx(ctx, bundle.tx, makeFakeJobInsertParams(0, nil)) + require.NoError(t, err) + _, err = adapter.JobInsertTx(ctx, bundle.tx, makeFakeJobInsertParams(1, nil)) + require.NoError(t, err) + + // Two rows inserted but only one found because of the added limit. + jobRows, err := adapter.JobGetAvailableTx(ctx, bundle.tx, rivercommon.QueueDefault, 1) + require.NoError(t, err) + require.Len(t, jobRows, 1) + }) + + t.Run("ConstrainedToQueue", func(t *testing.T) { + t.Parallel() + + adapter, bundle := setup(t) + + _, err := adapter.JobInsertTx(ctx, bundle.tx, makeFakeJobInsertParams(0, &makeFakeJobInsertParamsOpts{ + Queue: ptrutil.Ptr("other-queue"), + })) + require.NoError(t, err) + + // Job is in a non-default queue so it's not found. + jobRows, err := adapter.JobGetAvailableTx(ctx, bundle.tx, rivercommon.QueueDefault, 1) + require.NoError(t, err) + require.Empty(t, jobRows) + }) + + t.Run("ConstrainedToScheduledAtBeforeNow", func(t *testing.T) { + t.Parallel() + + adapter, bundle := setup(t) + + _, err := adapter.JobInsertTx(ctx, bundle.tx, makeFakeJobInsertParams(0, &makeFakeJobInsertParamsOpts{ + ScheduledAt: ptrutil.Ptr(time.Now().Add(1 * time.Minute)), + })) + require.NoError(t, err) + + // Job is scheduled a while from now so it's not found. + jobRows, err := adapter.JobGetAvailableTx(ctx, bundle.tx, rivercommon.QueueDefault, 1) + require.NoError(t, err) + require.Empty(t, jobRows) + }) +} + +func Test_StandardAdapter_JobInsert(t *testing.T) { t.Parallel() ctx := context.Background() @@ -53,7 +143,7 @@ func Test_StandardAdapter_Insert(t *testing.T) { adapter, _ := setupTx(t) - insertParams := makeFakeJobInsertParams(0) + insertParams := makeFakeJobInsertParams(0, nil) _, err := adapter.JobInsert(ctx, insertParams) require.NoError(t, err) }) @@ -65,7 +155,7 @@ func Test_StandardAdapter_Insert(t *testing.T) { const maxJobsToFetch = 8 - res, err := adapter.JobInsert(ctx, makeFakeJobInsertParams(0)) + res, err := adapter.JobInsert(ctx, makeFakeJobInsertParams(0, nil)) require.NoError(t, err) require.NotEqual(t, 0, res.Job.ID, "expected job ID to be set, got %d", res.Job.ID) require.WithinDuration(t, time.Now(), res.Job.ScheduledAt, 1*time.Second) @@ -78,7 +168,7 @@ func Test_StandardAdapter_Insert(t *testing.T) { "expected selected job to be in running state, got %q", jobs[0].State) for i := 1; i < 10; i++ { - _, err := adapter.JobInsert(ctx, makeFakeJobInsertParams(i)) + _, err := adapter.JobInsert(ctx, makeFakeJobInsertParams(i, nil)) require.NoError(t, err) } @@ -102,7 +192,7 @@ func Test_StandardAdapter_Insert(t *testing.T) { adapter, _ := setupTx(t) - insertParams := makeFakeJobInsertParams(0) + insertParams := makeFakeJobInsertParams(0, nil) insertParams.Unique = true insertParams.UniqueByArgs = true @@ -133,7 +223,7 @@ func Test_StandardAdapter_Insert(t *testing.T) { adapter, bundle := setupTx(t) - insertParams := makeFakeJobInsertParams(0) + insertParams := makeFakeJobInsertParams(0, nil) insertParams.Unique = true insertParams.UniqueByPeriod = 15 * time.Minute @@ -164,7 +254,7 @@ func Test_StandardAdapter_Insert(t *testing.T) { adapter, _ := setupTx(t) - insertParams := makeFakeJobInsertParams(0) + insertParams := makeFakeJobInsertParams(0, nil) insertParams.Unique = true insertParams.UniqueByQueue = true @@ -195,7 +285,7 @@ func Test_StandardAdapter_Insert(t *testing.T) { adapter, bundle := setupTx(t) - insertParams := makeFakeJobInsertParams(0) + insertParams := makeFakeJobInsertParams(0, nil) insertParams.Unique = true insertParams.UniqueByState = []dbsqlc.JobState{dbsqlc.JobStateAvailable, dbsqlc.JobStateRunning} @@ -246,7 +336,7 @@ func Test_StandardAdapter_Insert(t *testing.T) { adapter, bundle := setupTx(t) - insertParams := makeFakeJobInsertParams(0) + insertParams := makeFakeJobInsertParams(0, nil) insertParams.Unique = true insertParams.UniqueByQueue = true @@ -306,7 +396,7 @@ func Test_StandardAdapter_Insert(t *testing.T) { adapter, bundle := setupTx(t) - insertParams := makeFakeJobInsertParams(0) + insertParams := makeFakeJobInsertParams(0, nil) insertParams.Unique = true insertParams.UniqueByArgs = true insertParams.UniqueByPeriod = 15 * time.Minute @@ -388,7 +478,7 @@ func Test_StandardAdapter_Insert(t *testing.T) { adapter, bundle := setup(t, riverinternaltest.TestDB(ctx, t)) - insertParams := makeFakeJobInsertParams(0) + insertParams := makeFakeJobInsertParams(0, nil) insertParams.Unique = true insertParams.UniqueByPeriod = 15 * time.Minute @@ -442,7 +532,7 @@ func Test_Adapter_JobInsertMany(t *testing.T) { insertParams := make([]*JobInsertParams, 10) for i := 0; i < len(insertParams); i++ { - insertParams[i] = makeFakeJobInsertParams(i) + insertParams[i] = makeFakeJobInsertParams(i, nil) } count, err := adapter.JobInsertMany(ctx, insertParams) @@ -467,7 +557,7 @@ func Test_StandardAdapter_FetchIsPrioritized(t *testing.T) { for i := 3; i > 0; i-- { // Insert jobs with decreasing priority numbers (3, 2, 1) which means increasing priority. - insertParams := makeFakeJobInsertParams(i) + insertParams := makeFakeJobInsertParams(i, nil) insertParams.Priority = i _, err := adapter.JobInsert(ctx, insertParams) require.NoError(t, err) @@ -529,7 +619,7 @@ func Test_StandardAdapter_JobSetStateCompleted(t *testing.T) { adapter, bundle := setupTx(t) - params := makeFakeJobInsertParams(0) + params := makeFakeJobInsertParams(0, nil) params.State = dbsqlc.JobStateRunning res, err := adapter.JobInsert(ctx, params) require.NoError(t, err) @@ -550,7 +640,7 @@ func Test_StandardAdapter_JobSetStateCompleted(t *testing.T) { adapter, bundle := setupTx(t) - params := makeFakeJobInsertParams(0) + params := makeFakeJobInsertParams(0, nil) params.State = dbsqlc.JobStateRetryable res, err := adapter.JobInsert(ctx, params) require.NoError(t, err) @@ -610,13 +700,13 @@ func Test_StandardAdapter_JobSetStateErrored(t *testing.T) { adapter, bundle := setupTx(t) - params := makeFakeJobInsertParams(0) + params := makeFakeJobInsertParams(0, nil) params.State = dbsqlc.JobStateRunning res, err := adapter.JobInsert(ctx, params) require.NoError(t, err) require.Equal(t, dbsqlc.JobStateRunning, res.Job.State) - jAfter, err := adapter.JobSetStateIfRunning(ctx, JobSetStateErrored(res.Job.ID, bundle.baselineTime, bundle.errPayload)) + jAfter, err := adapter.JobSetStateIfRunning(ctx, JobSetStateErrorRetryable(res.Job.ID, bundle.baselineTime, bundle.errPayload)) require.NoError(t, err) require.Equal(t, dbsqlc.JobStateRetryable, jAfter.State) require.WithinDuration(t, bundle.baselineTime, jAfter.ScheduledAt, time.Microsecond) @@ -638,14 +728,14 @@ func Test_StandardAdapter_JobSetStateErrored(t *testing.T) { adapter, bundle := setupTx(t) - params := makeFakeJobInsertParams(0) + params := makeFakeJobInsertParams(0, nil) params.State = dbsqlc.JobStateRetryable params.ScheduledAt = bundle.baselineTime.Add(10 * time.Second) res, err := adapter.JobInsert(ctx, params) require.NoError(t, err) require.Equal(t, dbsqlc.JobStateRetryable, res.Job.State) - jAfter, err := adapter.JobSetStateIfRunning(ctx, JobSetStateErrored(res.Job.ID, bundle.baselineTime, bundle.errPayload)) + jAfter, err := adapter.JobSetStateIfRunning(ctx, JobSetStateErrorRetryable(res.Job.ID, bundle.baselineTime, bundle.errPayload)) require.NoError(t, err) require.Equal(t, dbsqlc.JobStateRetryable, jAfter.State) require.WithinDuration(t, params.ScheduledAt, jAfter.ScheduledAt, time.Microsecond) @@ -687,7 +777,7 @@ func Benchmark_StandardAdapter_Insert(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - if _, err := adapter.JobInsert(ctx, makeFakeJobInsertParams(i)); err != nil { + if _, err := adapter.JobInsert(ctx, makeFakeJobInsertParams(i, nil)); err != nil { b.Fatal(err) } } @@ -706,7 +796,7 @@ func Benchmark_StandardAdapter_Insert_Parallelized(b *testing.B) { b.RunParallel(func(pb *testing.PB) { i := 0 for pb.Next() { - if _, err := adapter.JobInsert(ctx, makeFakeJobInsertParams(i)); err != nil { + if _, err := adapter.JobInsert(ctx, makeFakeJobInsertParams(i, nil)); err != nil { b.Fatal(err) } i++ @@ -725,7 +815,7 @@ func Benchmark_StandardAdapter_Fetch_100(b *testing.B) { adapter := NewStandardAdapter(riverinternaltest.BaseServiceArchetype(b), testAdapterConfig(dbPool)) for i := 0; i < b.N*100; i++ { - insertParams := makeFakeJobInsertParams(i) + insertParams := makeFakeJobInsertParams(i, nil) if _, err := adapter.JobInsert(ctx, insertParams); err != nil { b.Fatal(err) } @@ -750,7 +840,7 @@ func Benchmark_StandardAdapter_Fetch_100_Parallelized(b *testing.B) { adapter := NewStandardAdapter(riverinternaltest.BaseServiceArchetype(b), testAdapterConfig(dbPool)) for i := 0; i < b.N*100*runtime.NumCPU(); i++ { - insertParams := makeFakeJobInsertParams(i) + insertParams := makeFakeJobInsertParams(i, nil) if _, err := adapter.JobInsert(ctx, insertParams); err != nil { b.Fatal(err) } @@ -775,14 +865,24 @@ func testAdapterConfig(ex dbutil.Executor) *StandardAdapterConfig { } } -func makeFakeJobInsertParams(i int) *JobInsertParams { +type makeFakeJobInsertParamsOpts struct { + Queue *string + ScheduledAt *time.Time +} + +func makeFakeJobInsertParams(i int, opts *makeFakeJobInsertParamsOpts) *JobInsertParams { + if opts == nil { + opts = &makeFakeJobInsertParamsOpts{} + } + return &JobInsertParams{ EncodedArgs: []byte(fmt.Sprintf(`{"job_num":%d}`, i)), Kind: "fake_job", MaxAttempts: rivercommon.MaxAttemptsDefault, Metadata: []byte("{}"), Priority: rivercommon.PriorityDefault, - Queue: rivercommon.QueueDefault, + Queue: ptrutil.ValOrDefault(opts.Queue, rivercommon.QueueDefault), + ScheduledAt: ptrutil.ValOrDefault(opts.ScheduledAt, time.Time{}), State: dbsqlc.JobStateAvailable, } } diff --git a/internal/dbsqlc/river_job.sql b/internal/dbsqlc/river_job.sql index 814cadfb..a8352144 100644 --- a/internal/dbsqlc/river_job.sql +++ b/internal/dbsqlc/river_job.sql @@ -72,6 +72,7 @@ WITH locked_jobs AS ( WHERE state = 'available'::river_job_state AND queue = @queue::text + AND scheduled_at <= now() ORDER BY priority ASC, scheduled_at ASC, @@ -84,7 +85,7 @@ UPDATE SET state = 'running'::river_job_state, attempt = river_job.attempt + 1, - attempted_at = NOW(), + attempted_at = now(), attempted_by = array_append(river_job.attempted_by, @worker::text) FROM locked_jobs diff --git a/internal/dbsqlc/river_job.sql.go b/internal/dbsqlc/river_job.sql.go index 83edf3c3..5ea259f3 100644 --- a/internal/dbsqlc/river_job.sql.go +++ b/internal/dbsqlc/river_job.sql.go @@ -79,6 +79,7 @@ WITH locked_jobs AS ( WHERE state = 'available'::river_job_state AND queue = $2::text + AND scheduled_at <= now() ORDER BY priority ASC, scheduled_at ASC, @@ -91,7 +92,7 @@ UPDATE SET state = 'running'::river_job_state, attempt = river_job.attempt + 1, - attempted_at = NOW(), + attempted_at = now(), attempted_by = array_append(river_job.attempted_by, $1::text) FROM locked_jobs diff --git a/internal/riverinternaltest/riverinternaltest.go b/internal/riverinternaltest/riverinternaltest.go index cc1edda4..9fd1f29d 100644 --- a/internal/riverinternaltest/riverinternaltest.go +++ b/internal/riverinternaltest/riverinternaltest.go @@ -26,6 +26,16 @@ import ( "github.com/riverqueue/river/internal/util/valutil" ) +// SchedulerShortInterval is an artificially short interval for the scheduler +// that's used in the tests of various components to make sure that errored jobs +// always end up in a `retryable` state rather than `available`. Normally, the +// job executor sets `available` if the retry delay is smaller than the +// scheduler's interval. To simplify things so errors are always `retryable`, +// this time is picked to be smaller than the any retry delay that the default +// retry policy will ever produce. It's shared so we can document/explain it all +// in one place. +const SchedulerShortInterval = 500 * time.Millisecond + var ( dbManager *testdb.Manager //nolint:gochecknoglobals diff --git a/job_executor.go b/job_executor.go index dc426b22..d873d4a8 100644 --- a/job_executor.go +++ b/job_executor.go @@ -122,6 +122,7 @@ type jobExecutor struct { ErrorHandler ErrorHandler InformProducerDoneFunc func(jobRow *rivertype.JobRow) JobRow *rivertype.JobRow + SchedulerInterval time.Duration WorkUnit workunit.WorkUnit // Meant to be used from within the job executor only. @@ -325,7 +326,19 @@ func (e *jobExecutor) reportError(ctx context.Context, res *jobExecutorResult) { nextRetryScheduledAt = (&DefaultClientRetryPolicy{}).NextRetry(e.JobRow) } - if err := e.Completer.JobSetStateIfRunning(e.stats, dbadapter.JobSetStateErrored(e.JobRow.ID, nextRetryScheduledAt, errData)); err != nil { + // Normally, errored jobs are set `retryable` for the future and it's the + // scheduler's job to set them back to `available` so they can be reworked. + // This isn't friendly for smaller retry times though because it means that + // effectively no retry time smaller than the scheduler's run interval is + // respected. Here, we offset that with a branch that makes jobs immediately + // `available` if their retry was smaller than the scheduler's run interval. + var params *dbadapter.JobSetStateIfRunningParams + if nextRetryScheduledAt.Sub(e.TimeNowUTC()) <= e.SchedulerInterval { + params = dbadapter.JobSetStateErrorAvailable(e.JobRow.ID, nextRetryScheduledAt, errData) + } else { + params = dbadapter.JobSetStateErrorRetryable(e.JobRow.ID, nextRetryScheduledAt, errData) + } + if err := e.Completer.JobSetStateIfRunning(e.stats, params); err != nil { e.Logger.ErrorContext(ctx, e.Name+": Failed to report error for job", logAttrs...) } } diff --git a/job_executor_test.go b/job_executor_test.go index 82446512..69b00b5c 100644 --- a/job_executor_test.go +++ b/job_executor_test.go @@ -190,6 +190,7 @@ func TestJobExecutor_Execute(t *testing.T) { ErrorHandler: bundle.errorHandler, InformProducerDoneFunc: func(job *rivertype.JobRow) {}, JobRow: bundle.jobRow, + SchedulerInterval: riverinternaltest.SchedulerShortInterval, WorkUnit: workUnitFactory.MakeUnit(bundle.jobRow), }) @@ -268,6 +269,45 @@ func TestJobExecutor_Execute(t *testing.T) { require.Equal(t, dbsqlc.JobStateRetryable, job.State) }) + t.Run("ErrorSetsJobAvailableBelowSchedulerIntervalThreshold", func(t *testing.T) { + t.Parallel() + + executor, bundle := setup(t) + + executor.SchedulerInterval = 3 * time.Second + + workerErr := errors.New("job error") + executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow) + + { + executor.Execute(ctx) + executor.Completer.Wait() + + job, err := queries.JobGetByID(ctx, bundle.tx, bundle.jobRow.ID) + require.NoError(t, err) + require.WithinDuration(t, executor.ClientRetryPolicy.NextRetry(bundle.jobRow), job.ScheduledAt, 1*time.Second) + require.Equal(t, dbsqlc.JobStateAvailable, job.State) + } + + _, err := queries.JobSetState(ctx, bundle.tx, dbsqlc.JobSetStateParams{ + ID: bundle.jobRow.ID, + State: dbsqlc.JobStateRunning, + }) + require.NoError(t, err) + + bundle.jobRow.Attempt = 2 + + { + executor.Execute(ctx) + executor.Completer.Wait() + + job, err := queries.JobGetByID(ctx, bundle.tx, bundle.jobRow.ID) + require.NoError(t, err) + require.WithinDuration(t, executor.ClientRetryPolicy.NextRetry(bundle.jobRow), job.ScheduledAt, 16*time.Second) + require.Equal(t, dbsqlc.JobStateRetryable, job.State) + } + }) + t.Run("ErrorDiscardsJobAfterTooManyAttempts", func(t *testing.T) { t.Parallel() diff --git a/producer.go b/producer.go index 8047b745..41ac946c 100644 --- a/producer.go +++ b/producer.go @@ -34,13 +34,14 @@ type producerConfig struct { // LISTEN/NOTIFY, but this provides a fallback. FetchPollInterval time.Duration - JobTimeout time.Duration - MaxWorkerCount uint16 - Notifier *notifier.Notifier - QueueName string - RetryPolicy ClientRetryPolicy - WorkerName string - Workers *Workers + JobTimeout time.Duration + MaxWorkerCount uint16 + Notifier *notifier.Notifier + QueueName string + RetryPolicy ClientRetryPolicy + SchedulerInterval time.Duration + WorkerName string + Workers *Workers } // producer manages a fleet of Workers up to a maximum size. It periodically fetches jobs @@ -105,6 +106,9 @@ func newProducer(archetype *baseservice.Archetype, adapter dbadapter.Adapter, co if config.RetryPolicy == nil { return nil, errors.New("RetryPolicy is required") } + if config.SchedulerInterval == 0 { + return nil, errors.New("SchedulerInterval is required") + } if config.WorkerName == "" { return nil, errors.New("WorkerName is required") } @@ -312,6 +316,7 @@ func (p *producer) startNewExecutors(workCtx context.Context, jobs []*rivertype. ErrorHandler: p.errorHandler, InformProducerDoneFunc: p.handleWorkerDone, JobRow: job, + SchedulerInterval: p.config.SchedulerInterval, WorkUnit: workUnit, }) p.addActiveJob(job.ID, executor) diff --git a/producer_test.go b/producer_test.go index 687393bc..81ea4ab7 100644 --- a/producer_test.go +++ b/producer_test.go @@ -15,6 +15,7 @@ import ( "github.com/riverqueue/river/internal/dbadapter" "github.com/riverqueue/river/internal/dbsqlc" "github.com/riverqueue/river/internal/jobcompleter" + "github.com/riverqueue/river/internal/maintenance" "github.com/riverqueue/river/internal/notifier" "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/internal/riverinternaltest" @@ -82,6 +83,7 @@ func Test_Producer_CanSafelyCompleteJobsWhileFetchingNewOnes(t *testing.T) { Notifier: notifier, QueueName: rivercommon.QueueDefault, RetryPolicy: &DefaultClientRetryPolicy{}, + SchedulerInterval: maintenance.SchedulerIntervalDefault, WorkerName: "fakeWorkerNameTODO", Workers: workers, } @@ -181,6 +183,7 @@ func Test_Producer_Run(t *testing.T) { Notifier: notifier, QueueName: rivercommon.QueueDefault, RetryPolicy: &DefaultClientRetryPolicy{}, + SchedulerInterval: riverinternaltest.SchedulerShortInterval, WorkerName: "fakeWorkerNameTODO", Workers: workers, }