diff --git a/internal/maintenance/job_cleaner.go b/internal/maintenance/job_cleaner.go index 0433e591..6d5a1e0d 100644 --- a/internal/maintenance/job_cleaner.go +++ b/internal/maintenance/job_cleaner.go @@ -152,7 +152,7 @@ func (s *JobCleaner) runOnce(ctx context.Context) (*jobCleanerRunOnceResult, err for { // Wrapped in a function so that defers run as expected. numDeleted, err := func() (int, error) { - ctx, cancelFunc := context.WithTimeout(ctx, 30*time.Second) + ctx, cancelFunc := context.WithTimeout(ctx, s.Config.JobCleanerTimeout) defer cancelFunc() numDeleted, err := s.exec.JobDeleteBefore(ctx, &riverdriver.JobDeleteBeforeParams{ diff --git a/internal/maintenance/job_cleaner_test.go b/internal/maintenance/job_cleaner_test.go index 3bf74a11..ee7ca330 100644 --- a/internal/maintenance/job_cleaner_test.go +++ b/internal/maintenance/job_cleaner_test.go @@ -65,6 +65,7 @@ func TestJobCleaner(t *testing.T) { require.Equal(t, CompletedJobRetentionPeriodDefault, cleaner.Config.CompletedJobRetentionPeriod) require.Equal(t, DiscardedJobRetentionPeriodDefault, cleaner.Config.DiscardedJobRetentionPeriod) require.Equal(t, JobCleanerIntervalDefault, cleaner.Config.Interval) + require.Equal(t, JobCleanerTimeoutDefault, cleaner.Config.JobCleanerTimeout) }) t.Run("StartStopStress", func(t *testing.T) { @@ -162,6 +163,41 @@ func TestJobCleaner(t *testing.T) { } }) + t.Run("DeletesCompletedJobsWithTimeout", func(t *testing.T) { + t.Parallel() + + cleaner, bundle := setup(t) + cleaner.Config.JobCleanerTimeout = 1 * time.Nanosecond + + // none of these get removed + testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled)}) + + testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCancelled), FinalizedAt: ptrutil.Ptr(bundle.cancelledDeleteHorizon.Add(-1 * time.Hour))}) + testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCancelled), FinalizedAt: ptrutil.Ptr(bundle.cancelledDeleteHorizon.Add(-1 * time.Minute))}) + testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCancelled), FinalizedAt: ptrutil.Ptr(bundle.cancelledDeleteHorizon.Add(1 * time.Minute))}) // won't be deleted + + testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour))}) + testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Minute))}) + testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(1 * time.Minute))}) // won't be deleted + + testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateDiscarded), FinalizedAt: ptrutil.Ptr(bundle.discardedDeleteHorizon.Add(-1 * time.Hour))}) + testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateDiscarded), FinalizedAt: ptrutil.Ptr(bundle.discardedDeleteHorizon.Add(-1 * time.Minute))}) + testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateDiscarded), FinalizedAt: ptrutil.Ptr(bundle.discardedDeleteHorizon.Add(1 * time.Minute))}) // won't be deleted + + require.NoError(t, cleaner.Start(ctx)) + + timeout := riversharedtest.WaitTimeout() + + select { + case <-cleaner.TestSignals.DeletedBatch.WaitC(): + t.Error("That supposed to have timed out") + case <-time.After(timeout): + t.Log("Expected timeout") + } + }) + t.Run("CustomizableInterval", func(t *testing.T) { t.Parallel()