diff --git a/CHANGELOG.md b/CHANGELOG.md index af3feb7c..3c418530 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Errored jobs that have a very short duration before their next retry (<5 seconds) are set to `available` immediately instead of being made `scheduled` and having to wait for the scheduler to make a pass to make them workable. [PR #105](https://github.com/riverqueue/river/pull/105). + ## [0.0.12] - 2023-12-02 ### Added diff --git a/client.go b/client.go index 343a8c39..a8c6cde7 100644 --- a/client.go +++ b/client.go @@ -167,6 +167,10 @@ type Config struct { // Test-only property that allows sleep statements to be disable. Only // functions in cases where the CancellableSleep helper is used to sleep. disableSleep bool + + // Scheduler run interval. Shared between the scheduler and producer/job + // executors, but not currently exposed for configuration. + schedulerInterval time.Duration } func (c *Config) validate() error { @@ -388,6 +392,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client RetryPolicy: retryPolicy, Workers: config.Workers, disableSleep: config.disableSleep, + schedulerInterval: valutil.ValOrDefault(config.schedulerInterval, maintenance.SchedulerIntervalDefault), } if err := config.validate(); err != nil { @@ -524,7 +529,9 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client } { - scheduler := maintenance.NewScheduler(archetype, &maintenance.SchedulerConfig{}, driver.GetDBPool()) + scheduler := maintenance.NewScheduler(archetype, &maintenance.SchedulerConfig{ + Interval: config.schedulerInterval, + }, driver.GetDBPool()) maintenanceServices = append(maintenanceServices, scheduler) client.testSignals.scheduler = &scheduler.TestSignals } @@ -898,6 +905,7 @@ func (c *Client[TTx]) provisionProducers() error { Notifier: c.notifier, QueueName: queue, RetryPolicy: c.config.RetryPolicy, + SchedulerInterval: c.config.schedulerInterval, WorkerName: c.id, Workers: c.config.Workers, } diff --git a/client_test.go b/client_test.go index 4effd849..d8bd56b1 100644 --- a/client_test.go +++ b/client_test.go @@ -127,6 +127,7 @@ func newTestConfig(t *testing.T, callback callbackFunc) *Config { Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 50}}, Workers: workers, disableSleep: true, + schedulerInterval: riverinternaltest.SchedulerShortInterval, } } diff --git a/example_subscription_test.go b/example_subscription_test.go index d0ee2cc4..ea6da59b 100644 --- a/example_subscription_test.go +++ b/example_subscription_test.go @@ -129,7 +129,7 @@ func Example_subscription() { // Output: // Got job with state: completed - // Got job with state: retryable + // Got job with state: available // Got job with state: cancelled // Client stopped // Channel is closed diff --git a/internal/dbadapter/db_adapter.go b/internal/dbadapter/db_adapter.go index 50838bff..b24e8926 100644 --- a/internal/dbadapter/db_adapter.go +++ b/internal/dbadapter/db_adapter.go @@ -376,7 +376,11 @@ func JobSetStateDiscarded(id int64, finalizedAt time.Time, errData []byte) *JobS return &JobSetStateIfRunningParams{ID: id, errData: errData, finalizedAt: &finalizedAt, state: dbsqlc.JobStateDiscarded} } -func JobSetStateErrored(id int64, scheduledAt time.Time, errData []byte) *JobSetStateIfRunningParams { +func JobSetStateErrorAvailable(id int64, scheduledAt time.Time, errData []byte) *JobSetStateIfRunningParams { + return &JobSetStateIfRunningParams{ID: id, errData: errData, scheduledAt: &scheduledAt, state: dbsqlc.JobStateAvailable} +} + +func JobSetStateErrorRetryable(id int64, scheduledAt time.Time, errData []byte) *JobSetStateIfRunningParams { return &JobSetStateIfRunningParams{ID: id, errData: errData, scheduledAt: &scheduledAt, state: dbsqlc.JobStateRetryable} } diff --git a/internal/dbadapter/db_adapter_test.go b/internal/dbadapter/db_adapter_test.go index 145a7f6e..0eac3dec 100644 --- a/internal/dbadapter/db_adapter_test.go +++ b/internal/dbadapter/db_adapter_test.go @@ -17,9 +17,99 @@ import ( "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/internal/util/dbutil" + "github.com/riverqueue/river/internal/util/ptrutil" ) -func Test_StandardAdapter_Insert(t *testing.T) { +func Test_StandardAdapter_JobGetAvailable(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type testBundle struct { + baselineTime time.Time // baseline time frozen at now when setup is called + tx pgx.Tx + } + + setup := func(t *testing.T) (*StandardAdapter, *testBundle) { + t.Helper() + + bundle := &testBundle{ + baselineTime: time.Now(), + tx: riverinternaltest.TestTx(ctx, t), + } + + adapter := NewStandardAdapter(riverinternaltest.BaseServiceArchetype(t), testAdapterConfig(bundle.tx)) + adapter.TimeNowUTC = func() time.Time { return bundle.baselineTime } + + return adapter, bundle + } + + t.Run("Success", func(t *testing.T) { + t.Parallel() + + adapter, bundle := setup(t) + + _, err := adapter.JobInsertTx(ctx, bundle.tx, makeFakeJobInsertParams(0, nil)) + require.NoError(t, err) + + jobRows, err := adapter.JobGetAvailableTx(ctx, bundle.tx, rivercommon.QueueDefault, 100) + require.NoError(t, err) + require.Len(t, jobRows, 1) + + jobRow := jobRows[0] + require.Equal(t, []string{adapter.workerName}, jobRow.AttemptedBy) + }) + + t.Run("ConstrainedToLimit", func(t *testing.T) { + t.Parallel() + + adapter, bundle := setup(t) + + _, err := adapter.JobInsertTx(ctx, bundle.tx, makeFakeJobInsertParams(0, nil)) + require.NoError(t, err) + _, err = adapter.JobInsertTx(ctx, bundle.tx, makeFakeJobInsertParams(1, nil)) + require.NoError(t, err) + + // Two rows inserted but only one found because of the added limit. + jobRows, err := adapter.JobGetAvailableTx(ctx, bundle.tx, rivercommon.QueueDefault, 1) + require.NoError(t, err) + require.Len(t, jobRows, 1) + }) + + t.Run("ConstrainedToQueue", func(t *testing.T) { + t.Parallel() + + adapter, bundle := setup(t) + + _, err := adapter.JobInsertTx(ctx, bundle.tx, makeFakeJobInsertParams(0, &makeFakeJobInsertParamsOpts{ + Queue: ptrutil.Ptr("other-queue"), + })) + require.NoError(t, err) + + // Job is in a non-default queue so it's not found. + jobRows, err := adapter.JobGetAvailableTx(ctx, bundle.tx, rivercommon.QueueDefault, 1) + require.NoError(t, err) + require.Empty(t, jobRows) + }) + + t.Run("ConstrainedToScheduledAtBeforeNow", func(t *testing.T) { + t.Parallel() + + adapter, bundle := setup(t) + + _, err := adapter.JobInsertTx(ctx, bundle.tx, makeFakeJobInsertParams(0, &makeFakeJobInsertParamsOpts{ + ScheduledAt: ptrutil.Ptr(time.Now().Add(1 * time.Minute)), + })) + require.NoError(t, err) + + // Job is scheduled a while from now so it's not found. + jobRows, err := adapter.JobGetAvailableTx(ctx, bundle.tx, rivercommon.QueueDefault, 1) + require.NoError(t, err) + require.Empty(t, jobRows) + }) +} + +func Test_StandardAdapter_JobInsert(t *testing.T) { t.Parallel() ctx := context.Background() @@ -53,7 +143,7 @@ func Test_StandardAdapter_Insert(t *testing.T) { adapter, _ := setupTx(t) - insertParams := makeFakeJobInsertParams(0) + insertParams := makeFakeJobInsertParams(0, nil) _, err := adapter.JobInsert(ctx, insertParams) require.NoError(t, err) }) @@ -65,7 +155,7 @@ func Test_StandardAdapter_Insert(t *testing.T) { const maxJobsToFetch = 8 - res, err := adapter.JobInsert(ctx, makeFakeJobInsertParams(0)) + res, err := adapter.JobInsert(ctx, makeFakeJobInsertParams(0, nil)) require.NoError(t, err) require.NotEqual(t, 0, res.Job.ID, "expected job ID to be set, got %d", res.Job.ID) require.WithinDuration(t, time.Now(), res.Job.ScheduledAt, 1*time.Second) @@ -78,7 +168,7 @@ func Test_StandardAdapter_Insert(t *testing.T) { "expected selected job to be in running state, got %q", jobs[0].State) for i := 1; i < 10; i++ { - _, err := adapter.JobInsert(ctx, makeFakeJobInsertParams(i)) + _, err := adapter.JobInsert(ctx, makeFakeJobInsertParams(i, nil)) require.NoError(t, err) } @@ -102,7 +192,7 @@ func Test_StandardAdapter_Insert(t *testing.T) { adapter, _ := setupTx(t) - insertParams := makeFakeJobInsertParams(0) + insertParams := makeFakeJobInsertParams(0, nil) insertParams.Unique = true insertParams.UniqueByArgs = true @@ -133,7 +223,7 @@ func Test_StandardAdapter_Insert(t *testing.T) { adapter, bundle := setupTx(t) - insertParams := makeFakeJobInsertParams(0) + insertParams := makeFakeJobInsertParams(0, nil) insertParams.Unique = true insertParams.UniqueByPeriod = 15 * time.Minute @@ -164,7 +254,7 @@ func Test_StandardAdapter_Insert(t *testing.T) { adapter, _ := setupTx(t) - insertParams := makeFakeJobInsertParams(0) + insertParams := makeFakeJobInsertParams(0, nil) insertParams.Unique = true insertParams.UniqueByQueue = true @@ -195,7 +285,7 @@ func Test_StandardAdapter_Insert(t *testing.T) { adapter, bundle := setupTx(t) - insertParams := makeFakeJobInsertParams(0) + insertParams := makeFakeJobInsertParams(0, nil) insertParams.Unique = true insertParams.UniqueByState = []dbsqlc.JobState{dbsqlc.JobStateAvailable, dbsqlc.JobStateRunning} @@ -246,7 +336,7 @@ func Test_StandardAdapter_Insert(t *testing.T) { adapter, bundle := setupTx(t) - insertParams := makeFakeJobInsertParams(0) + insertParams := makeFakeJobInsertParams(0, nil) insertParams.Unique = true insertParams.UniqueByQueue = true @@ -306,7 +396,7 @@ func Test_StandardAdapter_Insert(t *testing.T) { adapter, bundle := setupTx(t) - insertParams := makeFakeJobInsertParams(0) + insertParams := makeFakeJobInsertParams(0, nil) insertParams.Unique = true insertParams.UniqueByArgs = true insertParams.UniqueByPeriod = 15 * time.Minute @@ -388,7 +478,7 @@ func Test_StandardAdapter_Insert(t *testing.T) { adapter, bundle := setup(t, riverinternaltest.TestDB(ctx, t)) - insertParams := makeFakeJobInsertParams(0) + insertParams := makeFakeJobInsertParams(0, nil) insertParams.Unique = true insertParams.UniqueByPeriod = 15 * time.Minute @@ -442,7 +532,7 @@ func Test_Adapter_JobInsertMany(t *testing.T) { insertParams := make([]*JobInsertParams, 10) for i := 0; i < len(insertParams); i++ { - insertParams[i] = makeFakeJobInsertParams(i) + insertParams[i] = makeFakeJobInsertParams(i, nil) } count, err := adapter.JobInsertMany(ctx, insertParams) @@ -467,7 +557,7 @@ func Test_StandardAdapter_FetchIsPrioritized(t *testing.T) { for i := 3; i > 0; i-- { // Insert jobs with decreasing priority numbers (3, 2, 1) which means increasing priority. - insertParams := makeFakeJobInsertParams(i) + insertParams := makeFakeJobInsertParams(i, nil) insertParams.Priority = i _, err := adapter.JobInsert(ctx, insertParams) require.NoError(t, err) @@ -529,7 +619,7 @@ func Test_StandardAdapter_JobSetStateCompleted(t *testing.T) { adapter, bundle := setupTx(t) - params := makeFakeJobInsertParams(0) + params := makeFakeJobInsertParams(0, nil) params.State = dbsqlc.JobStateRunning res, err := adapter.JobInsert(ctx, params) require.NoError(t, err) @@ -550,7 +640,7 @@ func Test_StandardAdapter_JobSetStateCompleted(t *testing.T) { adapter, bundle := setupTx(t) - params := makeFakeJobInsertParams(0) + params := makeFakeJobInsertParams(0, nil) params.State = dbsqlc.JobStateRetryable res, err := adapter.JobInsert(ctx, params) require.NoError(t, err) @@ -610,13 +700,13 @@ func Test_StandardAdapter_JobSetStateErrored(t *testing.T) { adapter, bundle := setupTx(t) - params := makeFakeJobInsertParams(0) + params := makeFakeJobInsertParams(0, nil) params.State = dbsqlc.JobStateRunning res, err := adapter.JobInsert(ctx, params) require.NoError(t, err) require.Equal(t, dbsqlc.JobStateRunning, res.Job.State) - jAfter, err := adapter.JobSetStateIfRunning(ctx, JobSetStateErrored(res.Job.ID, bundle.baselineTime, bundle.errPayload)) + jAfter, err := adapter.JobSetStateIfRunning(ctx, JobSetStateErrorRetryable(res.Job.ID, bundle.baselineTime, bundle.errPayload)) require.NoError(t, err) require.Equal(t, dbsqlc.JobStateRetryable, jAfter.State) require.WithinDuration(t, bundle.baselineTime, jAfter.ScheduledAt, time.Microsecond) @@ -638,14 +728,14 @@ func Test_StandardAdapter_JobSetStateErrored(t *testing.T) { adapter, bundle := setupTx(t) - params := makeFakeJobInsertParams(0) + params := makeFakeJobInsertParams(0, nil) params.State = dbsqlc.JobStateRetryable params.ScheduledAt = bundle.baselineTime.Add(10 * time.Second) res, err := adapter.JobInsert(ctx, params) require.NoError(t, err) require.Equal(t, dbsqlc.JobStateRetryable, res.Job.State) - jAfter, err := adapter.JobSetStateIfRunning(ctx, JobSetStateErrored(res.Job.ID, bundle.baselineTime, bundle.errPayload)) + jAfter, err := adapter.JobSetStateIfRunning(ctx, JobSetStateErrorRetryable(res.Job.ID, bundle.baselineTime, bundle.errPayload)) require.NoError(t, err) require.Equal(t, dbsqlc.JobStateRetryable, jAfter.State) require.WithinDuration(t, params.ScheduledAt, jAfter.ScheduledAt, time.Microsecond) @@ -687,7 +777,7 @@ func Benchmark_StandardAdapter_Insert(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - if _, err := adapter.JobInsert(ctx, makeFakeJobInsertParams(i)); err != nil { + if _, err := adapter.JobInsert(ctx, makeFakeJobInsertParams(i, nil)); err != nil { b.Fatal(err) } } @@ -706,7 +796,7 @@ func Benchmark_StandardAdapter_Insert_Parallelized(b *testing.B) { b.RunParallel(func(pb *testing.PB) { i := 0 for pb.Next() { - if _, err := adapter.JobInsert(ctx, makeFakeJobInsertParams(i)); err != nil { + if _, err := adapter.JobInsert(ctx, makeFakeJobInsertParams(i, nil)); err != nil { b.Fatal(err) } i++ @@ -725,7 +815,7 @@ func Benchmark_StandardAdapter_Fetch_100(b *testing.B) { adapter := NewStandardAdapter(riverinternaltest.BaseServiceArchetype(b), testAdapterConfig(dbPool)) for i := 0; i < b.N*100; i++ { - insertParams := makeFakeJobInsertParams(i) + insertParams := makeFakeJobInsertParams(i, nil) if _, err := adapter.JobInsert(ctx, insertParams); err != nil { b.Fatal(err) } @@ -750,7 +840,7 @@ func Benchmark_StandardAdapter_Fetch_100_Parallelized(b *testing.B) { adapter := NewStandardAdapter(riverinternaltest.BaseServiceArchetype(b), testAdapterConfig(dbPool)) for i := 0; i < b.N*100*runtime.NumCPU(); i++ { - insertParams := makeFakeJobInsertParams(i) + insertParams := makeFakeJobInsertParams(i, nil) if _, err := adapter.JobInsert(ctx, insertParams); err != nil { b.Fatal(err) } @@ -775,14 +865,24 @@ func testAdapterConfig(ex dbutil.Executor) *StandardAdapterConfig { } } -func makeFakeJobInsertParams(i int) *JobInsertParams { +type makeFakeJobInsertParamsOpts struct { + Queue *string + ScheduledAt *time.Time +} + +func makeFakeJobInsertParams(i int, opts *makeFakeJobInsertParamsOpts) *JobInsertParams { + if opts == nil { + opts = &makeFakeJobInsertParamsOpts{} + } + return &JobInsertParams{ EncodedArgs: []byte(fmt.Sprintf(`{"job_num":%d}`, i)), Kind: "fake_job", MaxAttempts: rivercommon.MaxAttemptsDefault, Metadata: []byte("{}"), Priority: rivercommon.PriorityDefault, - Queue: rivercommon.QueueDefault, + Queue: ptrutil.ValOrDefault(opts.Queue, rivercommon.QueueDefault), + ScheduledAt: ptrutil.ValOrDefault(opts.ScheduledAt, time.Time{}), State: dbsqlc.JobStateAvailable, } } diff --git a/internal/dbsqlc/river_job.sql b/internal/dbsqlc/river_job.sql index 814cadfb..a8352144 100644 --- a/internal/dbsqlc/river_job.sql +++ b/internal/dbsqlc/river_job.sql @@ -72,6 +72,7 @@ WITH locked_jobs AS ( WHERE state = 'available'::river_job_state AND queue = @queue::text + AND scheduled_at <= now() ORDER BY priority ASC, scheduled_at ASC, @@ -84,7 +85,7 @@ UPDATE SET state = 'running'::river_job_state, attempt = river_job.attempt + 1, - attempted_at = NOW(), + attempted_at = now(), attempted_by = array_append(river_job.attempted_by, @worker::text) FROM locked_jobs diff --git a/internal/dbsqlc/river_job.sql.go b/internal/dbsqlc/river_job.sql.go index 83edf3c3..5ea259f3 100644 --- a/internal/dbsqlc/river_job.sql.go +++ b/internal/dbsqlc/river_job.sql.go @@ -79,6 +79,7 @@ WITH locked_jobs AS ( WHERE state = 'available'::river_job_state AND queue = $2::text + AND scheduled_at <= now() ORDER BY priority ASC, scheduled_at ASC, @@ -91,7 +92,7 @@ UPDATE SET state = 'running'::river_job_state, attempt = river_job.attempt + 1, - attempted_at = NOW(), + attempted_at = now(), attempted_by = array_append(river_job.attempted_by, $1::text) FROM locked_jobs diff --git a/internal/riverinternaltest/riverinternaltest.go b/internal/riverinternaltest/riverinternaltest.go index cc1edda4..9fd1f29d 100644 --- a/internal/riverinternaltest/riverinternaltest.go +++ b/internal/riverinternaltest/riverinternaltest.go @@ -26,6 +26,16 @@ import ( "github.com/riverqueue/river/internal/util/valutil" ) +// SchedulerShortInterval is an artificially short interval for the scheduler +// that's used in the tests of various components to make sure that errored jobs +// always end up in a `retryable` state rather than `available`. Normally, the +// job executor sets `available` if the retry delay is smaller than the +// scheduler's interval. To simplify things so errors are always `retryable`, +// this time is picked to be smaller than the any retry delay that the default +// retry policy will ever produce. It's shared so we can document/explain it all +// in one place. +const SchedulerShortInterval = 500 * time.Millisecond + var ( dbManager *testdb.Manager //nolint:gochecknoglobals diff --git a/job_executor.go b/job_executor.go index dc426b22..d873d4a8 100644 --- a/job_executor.go +++ b/job_executor.go @@ -122,6 +122,7 @@ type jobExecutor struct { ErrorHandler ErrorHandler InformProducerDoneFunc func(jobRow *rivertype.JobRow) JobRow *rivertype.JobRow + SchedulerInterval time.Duration WorkUnit workunit.WorkUnit // Meant to be used from within the job executor only. @@ -325,7 +326,19 @@ func (e *jobExecutor) reportError(ctx context.Context, res *jobExecutorResult) { nextRetryScheduledAt = (&DefaultClientRetryPolicy{}).NextRetry(e.JobRow) } - if err := e.Completer.JobSetStateIfRunning(e.stats, dbadapter.JobSetStateErrored(e.JobRow.ID, nextRetryScheduledAt, errData)); err != nil { + // Normally, errored jobs are set `retryable` for the future and it's the + // scheduler's job to set them back to `available` so they can be reworked. + // This isn't friendly for smaller retry times though because it means that + // effectively no retry time smaller than the scheduler's run interval is + // respected. Here, we offset that with a branch that makes jobs immediately + // `available` if their retry was smaller than the scheduler's run interval. + var params *dbadapter.JobSetStateIfRunningParams + if nextRetryScheduledAt.Sub(e.TimeNowUTC()) <= e.SchedulerInterval { + params = dbadapter.JobSetStateErrorAvailable(e.JobRow.ID, nextRetryScheduledAt, errData) + } else { + params = dbadapter.JobSetStateErrorRetryable(e.JobRow.ID, nextRetryScheduledAt, errData) + } + if err := e.Completer.JobSetStateIfRunning(e.stats, params); err != nil { e.Logger.ErrorContext(ctx, e.Name+": Failed to report error for job", logAttrs...) } } diff --git a/job_executor_test.go b/job_executor_test.go index 82446512..69b00b5c 100644 --- a/job_executor_test.go +++ b/job_executor_test.go @@ -190,6 +190,7 @@ func TestJobExecutor_Execute(t *testing.T) { ErrorHandler: bundle.errorHandler, InformProducerDoneFunc: func(job *rivertype.JobRow) {}, JobRow: bundle.jobRow, + SchedulerInterval: riverinternaltest.SchedulerShortInterval, WorkUnit: workUnitFactory.MakeUnit(bundle.jobRow), }) @@ -268,6 +269,45 @@ func TestJobExecutor_Execute(t *testing.T) { require.Equal(t, dbsqlc.JobStateRetryable, job.State) }) + t.Run("ErrorSetsJobAvailableBelowSchedulerIntervalThreshold", func(t *testing.T) { + t.Parallel() + + executor, bundle := setup(t) + + executor.SchedulerInterval = 3 * time.Second + + workerErr := errors.New("job error") + executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow) + + { + executor.Execute(ctx) + executor.Completer.Wait() + + job, err := queries.JobGetByID(ctx, bundle.tx, bundle.jobRow.ID) + require.NoError(t, err) + require.WithinDuration(t, executor.ClientRetryPolicy.NextRetry(bundle.jobRow), job.ScheduledAt, 1*time.Second) + require.Equal(t, dbsqlc.JobStateAvailable, job.State) + } + + _, err := queries.JobSetState(ctx, bundle.tx, dbsqlc.JobSetStateParams{ + ID: bundle.jobRow.ID, + State: dbsqlc.JobStateRunning, + }) + require.NoError(t, err) + + bundle.jobRow.Attempt = 2 + + { + executor.Execute(ctx) + executor.Completer.Wait() + + job, err := queries.JobGetByID(ctx, bundle.tx, bundle.jobRow.ID) + require.NoError(t, err) + require.WithinDuration(t, executor.ClientRetryPolicy.NextRetry(bundle.jobRow), job.ScheduledAt, 16*time.Second) + require.Equal(t, dbsqlc.JobStateRetryable, job.State) + } + }) + t.Run("ErrorDiscardsJobAfterTooManyAttempts", func(t *testing.T) { t.Parallel() diff --git a/producer.go b/producer.go index 8047b745..41ac946c 100644 --- a/producer.go +++ b/producer.go @@ -34,13 +34,14 @@ type producerConfig struct { // LISTEN/NOTIFY, but this provides a fallback. FetchPollInterval time.Duration - JobTimeout time.Duration - MaxWorkerCount uint16 - Notifier *notifier.Notifier - QueueName string - RetryPolicy ClientRetryPolicy - WorkerName string - Workers *Workers + JobTimeout time.Duration + MaxWorkerCount uint16 + Notifier *notifier.Notifier + QueueName string + RetryPolicy ClientRetryPolicy + SchedulerInterval time.Duration + WorkerName string + Workers *Workers } // producer manages a fleet of Workers up to a maximum size. It periodically fetches jobs @@ -105,6 +106,9 @@ func newProducer(archetype *baseservice.Archetype, adapter dbadapter.Adapter, co if config.RetryPolicy == nil { return nil, errors.New("RetryPolicy is required") } + if config.SchedulerInterval == 0 { + return nil, errors.New("SchedulerInterval is required") + } if config.WorkerName == "" { return nil, errors.New("WorkerName is required") } @@ -312,6 +316,7 @@ func (p *producer) startNewExecutors(workCtx context.Context, jobs []*rivertype. ErrorHandler: p.errorHandler, InformProducerDoneFunc: p.handleWorkerDone, JobRow: job, + SchedulerInterval: p.config.SchedulerInterval, WorkUnit: workUnit, }) p.addActiveJob(job.ID, executor) diff --git a/producer_test.go b/producer_test.go index 687393bc..81ea4ab7 100644 --- a/producer_test.go +++ b/producer_test.go @@ -15,6 +15,7 @@ import ( "github.com/riverqueue/river/internal/dbadapter" "github.com/riverqueue/river/internal/dbsqlc" "github.com/riverqueue/river/internal/jobcompleter" + "github.com/riverqueue/river/internal/maintenance" "github.com/riverqueue/river/internal/notifier" "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/internal/riverinternaltest" @@ -82,6 +83,7 @@ func Test_Producer_CanSafelyCompleteJobsWhileFetchingNewOnes(t *testing.T) { Notifier: notifier, QueueName: rivercommon.QueueDefault, RetryPolicy: &DefaultClientRetryPolicy{}, + SchedulerInterval: maintenance.SchedulerIntervalDefault, WorkerName: "fakeWorkerNameTODO", Workers: workers, } @@ -181,6 +183,7 @@ func Test_Producer_Run(t *testing.T) { Notifier: notifier, QueueName: rivercommon.QueueDefault, RetryPolicy: &DefaultClientRetryPolicy{}, + SchedulerInterval: riverinternaltest.SchedulerShortInterval, WorkerName: "fakeWorkerNameTODO", Workers: workers, }