diff --git a/client.go b/client.go index dcb3527f..3e883e54 100644 --- a/client.go +++ b/client.go @@ -63,23 +63,23 @@ type Config struct { // internally conflicting River-generated keys more likely. AdvisoryLockPrefix int32 - // CancelledJobRetentionTime is the amount of time to keep cancelled jobs + // CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs // around before they're removed permanently. // // Defaults to 24 hours. - CancelledJobRetentionTime time.Duration + CancelledJobRetentionPeriod time.Duration - // CompletedJobRetentionTime is the amount of time to keep completed jobs + // CompletedJobRetentionPeriod is the amount of time to keep completed jobs // around before they're removed permanently. // // Defaults to 24 hours. - CompletedJobRetentionTime time.Duration + CompletedJobRetentionPeriod time.Duration - // DiscardedJobRetentionTime is the amount of time to keep cancelled jobs + // DiscardedJobRetentionPeriod is the amount of time to keep cancelled jobs // around before they're removed permanently. // // Defaults to 7 days. - DiscardedJobRetentionTime time.Duration + DiscardedJobRetentionPeriod time.Duration // ErrorHandler can be configured to be invoked in case of an error or panic // occurring in a job. This is often useful for logging and exception @@ -173,14 +173,14 @@ type Config struct { } func (c *Config) validate() error { - if c.CancelledJobRetentionTime < 0 { - return fmt.Errorf("CancelledJobRetentionTime time cannot be less than zero") + if c.CancelledJobRetentionPeriod < 0 { + return fmt.Errorf("CancelledJobRetentionPeriod time cannot be less than zero") } - if c.CompletedJobRetentionTime < 0 { - return fmt.Errorf("CompletedJobRetentionTime cannot be less than zero") + if c.CompletedJobRetentionPeriod < 0 { + return fmt.Errorf("CompletedJobRetentionPeriod cannot be less than zero") } - if c.DiscardedJobRetentionTime < 0 { - return fmt.Errorf("DiscardedJobRetentionTime cannot be less than zero") + if c.DiscardedJobRetentionPeriod < 0 { + return fmt.Errorf("DiscardedJobRetentionPeriod cannot be less than zero") } if c.FetchCooldown < MinFetchCooldown { return fmt.Errorf("FetchCooldown must be at least %s", MinFetchCooldown) @@ -377,23 +377,23 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client // original object, so everything that we care about must be initialized // here, even if it's only carrying over the original value. config = &Config{ - AdvisoryLockPrefix: config.AdvisoryLockPrefix, - CancelledJobRetentionTime: valutil.ValOrDefault(config.CancelledJobRetentionTime, maintenance.DefaultCancelledJobRetentionTime), - CompletedJobRetentionTime: valutil.ValOrDefault(config.CompletedJobRetentionTime, maintenance.DefaultCompletedJobRetentionTime), - DiscardedJobRetentionTime: valutil.ValOrDefault(config.DiscardedJobRetentionTime, maintenance.DefaultDiscardedJobRetentionTime), - ErrorHandler: config.ErrorHandler, - FetchCooldown: valutil.ValOrDefault(config.FetchCooldown, DefaultFetchCooldown), - FetchPollInterval: valutil.ValOrDefault(config.FetchPollInterval, DefaultFetchPollInterval), - JobTimeout: valutil.ValOrDefault(config.JobTimeout, DefaultJobTimeout), - Logger: logger, - PeriodicJobs: config.PeriodicJobs, - Queues: config.Queues, - ReindexerSchedule: config.ReindexerSchedule, - RescueStuckJobsAfter: valutil.ValOrDefault(config.RescueStuckJobsAfter, rescueAfter), - RetryPolicy: retryPolicy, - Schema: valutil.ValOrDefault(config.Schema, DefaultSchema), - Workers: config.Workers, - disableSleep: config.disableSleep, + AdvisoryLockPrefix: config.AdvisoryLockPrefix, + CancelledJobRetentionPeriod: valutil.ValOrDefault(config.CancelledJobRetentionPeriod, maintenance.DefaultCancelledJobRetentionPeriod), + CompletedJobRetentionPeriod: valutil.ValOrDefault(config.CompletedJobRetentionPeriod, maintenance.DefaultCompletedJobRetentionPeriod), + DiscardedJobRetentionPeriod: valutil.ValOrDefault(config.DiscardedJobRetentionPeriod, maintenance.DefaultDiscardedJobRetentionPeriod), + ErrorHandler: config.ErrorHandler, + FetchCooldown: valutil.ValOrDefault(config.FetchCooldown, DefaultFetchCooldown), + FetchPollInterval: valutil.ValOrDefault(config.FetchPollInterval, DefaultFetchPollInterval), + JobTimeout: valutil.ValOrDefault(config.JobTimeout, DefaultJobTimeout), + Logger: logger, + PeriodicJobs: config.PeriodicJobs, + Queues: config.Queues, + ReindexerSchedule: config.ReindexerSchedule, + RescueStuckJobsAfter: valutil.ValOrDefault(config.RescueStuckJobsAfter, rescueAfter), + RetryPolicy: retryPolicy, + Schema: valutil.ValOrDefault(config.Schema, DefaultSchema), + Workers: config.Workers, + disableSleep: config.disableSleep, } if err := config.validate(); err != nil { @@ -464,9 +464,9 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client { jobCleaner := maintenance.NewJobCleaner(archetype, &maintenance.JobCleanerConfig{ - CancelledJobRetentionTime: config.CancelledJobRetentionTime, - CompletedJobRetentionTime: config.CompletedJobRetentionTime, - DiscardedJobRetentionTime: config.DiscardedJobRetentionTime, + CancelledJobRetentionPeriod: config.CancelledJobRetentionPeriod, + CompletedJobRetentionPeriod: config.CompletedJobRetentionPeriod, + DiscardedJobRetentionPeriod: config.DiscardedJobRetentionPeriod, }, driver.GetDBPool()) maintenanceServices = append(maintenanceServices, jobCleaner) client.testSignals.jobCleaner = &jobCleaner.TestSignals diff --git a/client_test.go b/client_test.go index 4d146e72..41709d37 100644 --- a/client_test.go +++ b/client_test.go @@ -1000,13 +1000,13 @@ func Test_Client_Maintenance(t *testing.T) { t.Parallel() config := newTestConfig(t, nil) - config.CancelledJobRetentionTime = 1 * time.Hour - config.CompletedJobRetentionTime = 1 * time.Hour - config.DiscardedJobRetentionTime = 1 * time.Hour + config.CancelledJobRetentionPeriod = 1 * time.Hour + config.CompletedJobRetentionPeriod = 1 * time.Hour + config.DiscardedJobRetentionPeriod = 1 * time.Hour client := newTestClient(ctx, t, config) - deleteHorizon := time.Now().Add(-config.CompletedJobRetentionTime) + deleteHorizon := time.Now().Add(-config.CompletedJobRetentionPeriod) // Take care to insert jobs before starting the client because otherwise // there's a race condition where the cleaner could run its initial @@ -1988,9 +1988,9 @@ func Test_NewClient_Defaults(t *testing.T) { require.Zero(t, client.adapter.(*dbadapter.StandardAdapter).Config.AdvisoryLockPrefix) //nolint:forcetypeassert jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client.queueMaintainer) - require.Equal(t, maintenance.DefaultCancelledJobRetentionTime, jobCleaner.Config.CancelledJobRetentionTime) - require.Equal(t, maintenance.DefaultCompletedJobRetentionTime, jobCleaner.Config.CompletedJobRetentionTime) - require.Equal(t, maintenance.DefaultDiscardedJobRetentionTime, jobCleaner.Config.DiscardedJobRetentionTime) + require.Equal(t, maintenance.DefaultCancelledJobRetentionPeriod, jobCleaner.Config.CancelledJobRetentionPeriod) + require.Equal(t, maintenance.DefaultCompletedJobRetentionPeriod, jobCleaner.Config.CompletedJobRetentionPeriod) + require.Equal(t, maintenance.DefaultDiscardedJobRetentionPeriod, jobCleaner.Config.DiscardedJobRetentionPeriod) require.Nil(t, client.config.ErrorHandler) require.Equal(t, DefaultFetchCooldown, client.config.FetchCooldown) @@ -2018,29 +2018,29 @@ func Test_NewClient_Overrides(t *testing.T) { retryPolicy := &DefaultClientRetryPolicy{} client, err := NewClient(riverpgxv5.New(dbPool), &Config{ - AdvisoryLockPrefix: 123_456, - CancelledJobRetentionTime: 1 * time.Hour, - CompletedJobRetentionTime: 2 * time.Hour, - DiscardedJobRetentionTime: 3 * time.Hour, - ErrorHandler: errorHandler, - FetchCooldown: 123 * time.Millisecond, - FetchPollInterval: 124 * time.Millisecond, - JobTimeout: 125 * time.Millisecond, - Logger: logger, - Queues: map[string]QueueConfig{DefaultQueue: {MaxWorkers: 1}}, - RetryPolicy: retryPolicy, - Schema: "custom_schema", - Workers: workers, - disableSleep: true, + AdvisoryLockPrefix: 123_456, + CancelledJobRetentionPeriod: 1 * time.Hour, + CompletedJobRetentionPeriod: 2 * time.Hour, + DiscardedJobRetentionPeriod: 3 * time.Hour, + ErrorHandler: errorHandler, + FetchCooldown: 123 * time.Millisecond, + FetchPollInterval: 124 * time.Millisecond, + JobTimeout: 125 * time.Millisecond, + Logger: logger, + Queues: map[string]QueueConfig{DefaultQueue: {MaxWorkers: 1}}, + RetryPolicy: retryPolicy, + Schema: "custom_schema", + Workers: workers, + disableSleep: true, }) require.NoError(t, err) require.Equal(t, int32(123_456), client.adapter.(*dbadapter.StandardAdapter).Config.AdvisoryLockPrefix) //nolint:forcetypeassert jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client.queueMaintainer) - require.Equal(t, 1*time.Hour, jobCleaner.Config.CancelledJobRetentionTime) - require.Equal(t, 2*time.Hour, jobCleaner.Config.CompletedJobRetentionTime) - require.Equal(t, 3*time.Hour, jobCleaner.Config.DiscardedJobRetentionTime) + require.Equal(t, 1*time.Hour, jobCleaner.Config.CancelledJobRetentionPeriod) + require.Equal(t, 2*time.Hour, jobCleaner.Config.CompletedJobRetentionPeriod) + require.Equal(t, 3*time.Hour, jobCleaner.Config.DiscardedJobRetentionPeriod) require.Equal(t, errorHandler, client.config.ErrorHandler) require.Equal(t, 123*time.Millisecond, client.config.FetchCooldown) @@ -2081,9 +2081,9 @@ func Test_NewClient_Validations(t *testing.T) { validateResult func(*testing.T, *Client[pgx.Tx]) }{ { - name: "CompletedJobRetentionTime cannot be less than zero", - configFunc: func(config *Config) { config.CompletedJobRetentionTime = -1 * time.Second }, - wantErr: errors.New("CompletedJobRetentionTime cannot be less than zero"), + name: "CompletedJobRetentionPeriod cannot be less than zero", + configFunc: func(config *Config) { config.CompletedJobRetentionPeriod = -1 * time.Second }, + wantErr: errors.New("CompletedJobRetentionPeriod cannot be less than zero"), }, { name: "FetchCooldown cannot be less than MinFetchCooldown", diff --git a/internal/maintenance/job_cleaner.go b/internal/maintenance/job_cleaner.go index 4f55cb3c..06bb4db4 100644 --- a/internal/maintenance/job_cleaner.go +++ b/internal/maintenance/job_cleaner.go @@ -17,10 +17,10 @@ import ( ) const ( - DefaultCancelledJobRetentionTime = 24 * time.Hour - DefaultCompletedJobRetentionTime = 24 * time.Hour - DefaultDiscardedJobRetentionTime = 7 * 24 * time.Hour - DefaultJobCleanerInterval = 30 * time.Second + DefaultCancelledJobRetentionPeriod = 24 * time.Hour + DefaultCompletedJobRetentionPeriod = 24 * time.Hour + DefaultDiscardedJobRetentionPeriod = 7 * 24 * time.Hour + DefaultJobCleanerInterval = 30 * time.Second ) // Test-only properties. @@ -33,31 +33,31 @@ func (ts *JobCleanerTestSignals) Init() { } type JobCleanerConfig struct { - // CancelledJobRetentionTime is the amount of time to keep cancelled jobs + // CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs // around before they're removed permanently. - CancelledJobRetentionTime time.Duration + CancelledJobRetentionPeriod time.Duration - // CompletedJobRetentionTime is the amount of time to keep completed jobs + // CompletedJobRetentionPeriod is the amount of time to keep completed jobs // around before they're removed permanently. - CompletedJobRetentionTime time.Duration + CompletedJobRetentionPeriod time.Duration - // DiscardedJobRetentionTime is the amount of time to keep cancelled jobs + // DiscardedJobRetentionPeriod is the amount of time to keep cancelled jobs // around before they're removed permanently. - DiscardedJobRetentionTime time.Duration + DiscardedJobRetentionPeriod time.Duration // Interval is the amount of time to wait between runs of the cleaner. Interval time.Duration } func (c *JobCleanerConfig) mustValidate() *JobCleanerConfig { - if c.CancelledJobRetentionTime <= 0 { - panic("JobCleanerConfig.CancelledJobRetentionTime must be above zero") + if c.CancelledJobRetentionPeriod <= 0 { + panic("JobCleanerConfig.CancelledJobRetentionPeriod must be above zero") } - if c.CompletedJobRetentionTime <= 0 { - panic("JobCleanerConfig.CompletedJobRetentionTime must be above zero") + if c.CompletedJobRetentionPeriod <= 0 { + panic("JobCleanerConfig.CompletedJobRetentionPeriod must be above zero") } - if c.DiscardedJobRetentionTime <= 0 { - panic("JobCleanerConfig.DiscardedJobRetentionTime must be above zero") + if c.DiscardedJobRetentionPeriod <= 0 { + panic("JobCleanerConfig.DiscardedJobRetentionPeriod must be above zero") } if c.Interval <= 0 { panic("JobCleanerConfig.Interval must be above zero") @@ -84,10 +84,10 @@ type JobCleaner struct { func NewJobCleaner(archetype *baseservice.Archetype, config *JobCleanerConfig, executor dbutil.Executor) *JobCleaner { return baseservice.Init(archetype, &JobCleaner{ Config: (&JobCleanerConfig{ - CancelledJobRetentionTime: valutil.ValOrDefault(config.CancelledJobRetentionTime, DefaultCancelledJobRetentionTime), - CompletedJobRetentionTime: valutil.ValOrDefault(config.CompletedJobRetentionTime, DefaultCompletedJobRetentionTime), - DiscardedJobRetentionTime: valutil.ValOrDefault(config.DiscardedJobRetentionTime, DefaultDiscardedJobRetentionTime), - Interval: valutil.ValOrDefault(config.Interval, DefaultJobCleanerInterval), + CancelledJobRetentionPeriod: valutil.ValOrDefault(config.CancelledJobRetentionPeriod, DefaultCancelledJobRetentionPeriod), + CompletedJobRetentionPeriod: valutil.ValOrDefault(config.CompletedJobRetentionPeriod, DefaultCompletedJobRetentionPeriod), + DiscardedJobRetentionPeriod: valutil.ValOrDefault(config.DiscardedJobRetentionPeriod, DefaultDiscardedJobRetentionPeriod), + Interval: valutil.ValOrDefault(config.Interval, DefaultJobCleanerInterval), }).mustValidate(), batchSize: DefaultBatchSize, @@ -153,9 +153,9 @@ func (s *JobCleaner) runOnce(ctx context.Context) (*jobCleanerRunOnceResult, err defer cancelFunc() numDeleted, err := s.queries.JobDeleteBefore(ctx, s.dbExecutor, dbsqlc.JobDeleteBeforeParams{ - CancelledFinalizedAtHorizon: time.Now().Add(-s.Config.CancelledJobRetentionTime), - CompletedFinalizedAtHorizon: time.Now().Add(-s.Config.CompletedJobRetentionTime), - DiscardedFinalizedAtHorizon: time.Now().Add(-s.Config.DiscardedJobRetentionTime), + CancelledFinalizedAtHorizon: time.Now().Add(-s.Config.CancelledJobRetentionPeriod), + CompletedFinalizedAtHorizon: time.Now().Add(-s.Config.CompletedJobRetentionPeriod), + DiscardedFinalizedAtHorizon: time.Now().Add(-s.Config.DiscardedJobRetentionPeriod), Max: s.batchSize, }) if err != nil { diff --git a/internal/maintenance/job_cleaner_test.go b/internal/maintenance/job_cleaner_test.go index cde233fc..7d1bc392 100644 --- a/internal/maintenance/job_cleaner_test.go +++ b/internal/maintenance/job_cleaner_test.go @@ -51,19 +51,19 @@ func TestJobCleaner(t *testing.T) { t.Helper() bundle := &testBundle{ - cancelledDeleteHorizon: time.Now().Add(-DefaultCancelledJobRetentionTime), - completedDeleteHorizon: time.Now().Add(-DefaultCompletedJobRetentionTime), - discardedDeleteHorizon: time.Now().Add(-DefaultDiscardedJobRetentionTime), + cancelledDeleteHorizon: time.Now().Add(-DefaultCancelledJobRetentionPeriod), + completedDeleteHorizon: time.Now().Add(-DefaultCompletedJobRetentionPeriod), + discardedDeleteHorizon: time.Now().Add(-DefaultDiscardedJobRetentionPeriod), tx: riverinternaltest.TestTx(ctx, t), } cleaner := NewJobCleaner( riverinternaltest.BaseServiceArchetype(t), &JobCleanerConfig{ - CancelledJobRetentionTime: DefaultCancelledJobRetentionTime, - CompletedJobRetentionTime: DefaultCompletedJobRetentionTime, - DiscardedJobRetentionTime: DefaultDiscardedJobRetentionTime, - Interval: DefaultJobCleanerInterval, + CancelledJobRetentionPeriod: DefaultCancelledJobRetentionPeriod, + CompletedJobRetentionPeriod: DefaultCompletedJobRetentionPeriod, + DiscardedJobRetentionPeriod: DefaultDiscardedJobRetentionPeriod, + Interval: DefaultJobCleanerInterval, }, bundle.tx) cleaner.TestSignals.Init() @@ -77,9 +77,9 @@ func TestJobCleaner(t *testing.T) { cleaner := NewJobCleaner(riverinternaltest.BaseServiceArchetype(t), &JobCleanerConfig{}, nil) - require.Equal(t, cleaner.Config.CancelledJobRetentionTime, DefaultCancelledJobRetentionTime) - require.Equal(t, cleaner.Config.CompletedJobRetentionTime, DefaultCompletedJobRetentionTime) - require.Equal(t, cleaner.Config.DiscardedJobRetentionTime, DefaultDiscardedJobRetentionTime) + require.Equal(t, cleaner.Config.CancelledJobRetentionPeriod, DefaultCancelledJobRetentionPeriod) + require.Equal(t, cleaner.Config.CompletedJobRetentionPeriod, DefaultCompletedJobRetentionPeriod) + require.Equal(t, cleaner.Config.DiscardedJobRetentionPeriod, DefaultDiscardedJobRetentionPeriod) require.Equal(t, cleaner.Config.Interval, DefaultJobCleanerInterval) })