Skip to content

Commit

Permalink
Rename all *RetentionTime vars/constants to *RetentionPeriod (#10)
Browse files Browse the repository at this point in the history
Our various maintenance services take a series of durations that are
currently name `*RetentionTime` like `CompletedJobRetentionTime`. Given
that these are durations and not times, more apt naming for them is a
suffix of `*RetentionPeriod`, which is the change made here.
  • Loading branch information
brandur authored Nov 11, 2023
1 parent 7e836f2 commit c7bf24f
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 92 deletions.
64 changes: 32 additions & 32 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,23 +63,23 @@ type Config struct {
// internally conflicting River-generated keys more likely.
AdvisoryLockPrefix int32

// CancelledJobRetentionTime is the amount of time to keep cancelled jobs
// CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs
// around before they're removed permanently.
//
// Defaults to 24 hours.
CancelledJobRetentionTime time.Duration
CancelledJobRetentionPeriod time.Duration

// CompletedJobRetentionTime is the amount of time to keep completed jobs
// CompletedJobRetentionPeriod is the amount of time to keep completed jobs
// around before they're removed permanently.
//
// Defaults to 24 hours.
CompletedJobRetentionTime time.Duration
CompletedJobRetentionPeriod time.Duration

// DiscardedJobRetentionTime is the amount of time to keep cancelled jobs
// DiscardedJobRetentionPeriod is the amount of time to keep cancelled jobs
// around before they're removed permanently.
//
// Defaults to 7 days.
DiscardedJobRetentionTime time.Duration
DiscardedJobRetentionPeriod time.Duration

// ErrorHandler can be configured to be invoked in case of an error or panic
// occurring in a job. This is often useful for logging and exception
Expand Down Expand Up @@ -173,14 +173,14 @@ type Config struct {
}

func (c *Config) validate() error {
if c.CancelledJobRetentionTime < 0 {
return fmt.Errorf("CancelledJobRetentionTime time cannot be less than zero")
if c.CancelledJobRetentionPeriod < 0 {
return fmt.Errorf("CancelledJobRetentionPeriod time cannot be less than zero")
}
if c.CompletedJobRetentionTime < 0 {
return fmt.Errorf("CompletedJobRetentionTime cannot be less than zero")
if c.CompletedJobRetentionPeriod < 0 {
return fmt.Errorf("CompletedJobRetentionPeriod cannot be less than zero")
}
if c.DiscardedJobRetentionTime < 0 {
return fmt.Errorf("DiscardedJobRetentionTime cannot be less than zero")
if c.DiscardedJobRetentionPeriod < 0 {
return fmt.Errorf("DiscardedJobRetentionPeriod cannot be less than zero")
}
if c.FetchCooldown < MinFetchCooldown {
return fmt.Errorf("FetchCooldown must be at least %s", MinFetchCooldown)
Expand Down Expand Up @@ -377,23 +377,23 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
// original object, so everything that we care about must be initialized
// here, even if it's only carrying over the original value.
config = &Config{
AdvisoryLockPrefix: config.AdvisoryLockPrefix,
CancelledJobRetentionTime: valutil.ValOrDefault(config.CancelledJobRetentionTime, maintenance.DefaultCancelledJobRetentionTime),
CompletedJobRetentionTime: valutil.ValOrDefault(config.CompletedJobRetentionTime, maintenance.DefaultCompletedJobRetentionTime),
DiscardedJobRetentionTime: valutil.ValOrDefault(config.DiscardedJobRetentionTime, maintenance.DefaultDiscardedJobRetentionTime),
ErrorHandler: config.ErrorHandler,
FetchCooldown: valutil.ValOrDefault(config.FetchCooldown, DefaultFetchCooldown),
FetchPollInterval: valutil.ValOrDefault(config.FetchPollInterval, DefaultFetchPollInterval),
JobTimeout: valutil.ValOrDefault(config.JobTimeout, DefaultJobTimeout),
Logger: logger,
PeriodicJobs: config.PeriodicJobs,
Queues: config.Queues,
ReindexerSchedule: config.ReindexerSchedule,
RescueStuckJobsAfter: valutil.ValOrDefault(config.RescueStuckJobsAfter, rescueAfter),
RetryPolicy: retryPolicy,
Schema: valutil.ValOrDefault(config.Schema, DefaultSchema),
Workers: config.Workers,
disableSleep: config.disableSleep,
AdvisoryLockPrefix: config.AdvisoryLockPrefix,
CancelledJobRetentionPeriod: valutil.ValOrDefault(config.CancelledJobRetentionPeriod, maintenance.DefaultCancelledJobRetentionPeriod),
CompletedJobRetentionPeriod: valutil.ValOrDefault(config.CompletedJobRetentionPeriod, maintenance.DefaultCompletedJobRetentionPeriod),
DiscardedJobRetentionPeriod: valutil.ValOrDefault(config.DiscardedJobRetentionPeriod, maintenance.DefaultDiscardedJobRetentionPeriod),
ErrorHandler: config.ErrorHandler,
FetchCooldown: valutil.ValOrDefault(config.FetchCooldown, DefaultFetchCooldown),
FetchPollInterval: valutil.ValOrDefault(config.FetchPollInterval, DefaultFetchPollInterval),
JobTimeout: valutil.ValOrDefault(config.JobTimeout, DefaultJobTimeout),
Logger: logger,
PeriodicJobs: config.PeriodicJobs,
Queues: config.Queues,
ReindexerSchedule: config.ReindexerSchedule,
RescueStuckJobsAfter: valutil.ValOrDefault(config.RescueStuckJobsAfter, rescueAfter),
RetryPolicy: retryPolicy,
Schema: valutil.ValOrDefault(config.Schema, DefaultSchema),
Workers: config.Workers,
disableSleep: config.disableSleep,
}

if err := config.validate(); err != nil {
Expand Down Expand Up @@ -464,9 +464,9 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client

{
jobCleaner := maintenance.NewJobCleaner(archetype, &maintenance.JobCleanerConfig{
CancelledJobRetentionTime: config.CancelledJobRetentionTime,
CompletedJobRetentionTime: config.CompletedJobRetentionTime,
DiscardedJobRetentionTime: config.DiscardedJobRetentionTime,
CancelledJobRetentionPeriod: config.CancelledJobRetentionPeriod,
CompletedJobRetentionPeriod: config.CompletedJobRetentionPeriod,
DiscardedJobRetentionPeriod: config.DiscardedJobRetentionPeriod,
}, driver.GetDBPool())
maintenanceServices = append(maintenanceServices, jobCleaner)
client.testSignals.jobCleaner = &jobCleaner.TestSignals
Expand Down
54 changes: 27 additions & 27 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,13 +1000,13 @@ func Test_Client_Maintenance(t *testing.T) {
t.Parallel()

config := newTestConfig(t, nil)
config.CancelledJobRetentionTime = 1 * time.Hour
config.CompletedJobRetentionTime = 1 * time.Hour
config.DiscardedJobRetentionTime = 1 * time.Hour
config.CancelledJobRetentionPeriod = 1 * time.Hour
config.CompletedJobRetentionPeriod = 1 * time.Hour
config.DiscardedJobRetentionPeriod = 1 * time.Hour

client := newTestClient(ctx, t, config)

deleteHorizon := time.Now().Add(-config.CompletedJobRetentionTime)
deleteHorizon := time.Now().Add(-config.CompletedJobRetentionPeriod)

// Take care to insert jobs before starting the client because otherwise
// there's a race condition where the cleaner could run its initial
Expand Down Expand Up @@ -1988,9 +1988,9 @@ func Test_NewClient_Defaults(t *testing.T) {
require.Zero(t, client.adapter.(*dbadapter.StandardAdapter).Config.AdvisoryLockPrefix) //nolint:forcetypeassert

jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client.queueMaintainer)
require.Equal(t, maintenance.DefaultCancelledJobRetentionTime, jobCleaner.Config.CancelledJobRetentionTime)
require.Equal(t, maintenance.DefaultCompletedJobRetentionTime, jobCleaner.Config.CompletedJobRetentionTime)
require.Equal(t, maintenance.DefaultDiscardedJobRetentionTime, jobCleaner.Config.DiscardedJobRetentionTime)
require.Equal(t, maintenance.DefaultCancelledJobRetentionPeriod, jobCleaner.Config.CancelledJobRetentionPeriod)
require.Equal(t, maintenance.DefaultCompletedJobRetentionPeriod, jobCleaner.Config.CompletedJobRetentionPeriod)
require.Equal(t, maintenance.DefaultDiscardedJobRetentionPeriod, jobCleaner.Config.DiscardedJobRetentionPeriod)

require.Nil(t, client.config.ErrorHandler)
require.Equal(t, DefaultFetchCooldown, client.config.FetchCooldown)
Expand Down Expand Up @@ -2018,29 +2018,29 @@ func Test_NewClient_Overrides(t *testing.T) {
retryPolicy := &DefaultClientRetryPolicy{}

client, err := NewClient(riverpgxv5.New(dbPool), &Config{
AdvisoryLockPrefix: 123_456,
CancelledJobRetentionTime: 1 * time.Hour,
CompletedJobRetentionTime: 2 * time.Hour,
DiscardedJobRetentionTime: 3 * time.Hour,
ErrorHandler: errorHandler,
FetchCooldown: 123 * time.Millisecond,
FetchPollInterval: 124 * time.Millisecond,
JobTimeout: 125 * time.Millisecond,
Logger: logger,
Queues: map[string]QueueConfig{DefaultQueue: {MaxWorkers: 1}},
RetryPolicy: retryPolicy,
Schema: "custom_schema",
Workers: workers,
disableSleep: true,
AdvisoryLockPrefix: 123_456,
CancelledJobRetentionPeriod: 1 * time.Hour,
CompletedJobRetentionPeriod: 2 * time.Hour,
DiscardedJobRetentionPeriod: 3 * time.Hour,
ErrorHandler: errorHandler,
FetchCooldown: 123 * time.Millisecond,
FetchPollInterval: 124 * time.Millisecond,
JobTimeout: 125 * time.Millisecond,
Logger: logger,
Queues: map[string]QueueConfig{DefaultQueue: {MaxWorkers: 1}},
RetryPolicy: retryPolicy,
Schema: "custom_schema",
Workers: workers,
disableSleep: true,
})
require.NoError(t, err)

require.Equal(t, int32(123_456), client.adapter.(*dbadapter.StandardAdapter).Config.AdvisoryLockPrefix) //nolint:forcetypeassert

jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client.queueMaintainer)
require.Equal(t, 1*time.Hour, jobCleaner.Config.CancelledJobRetentionTime)
require.Equal(t, 2*time.Hour, jobCleaner.Config.CompletedJobRetentionTime)
require.Equal(t, 3*time.Hour, jobCleaner.Config.DiscardedJobRetentionTime)
require.Equal(t, 1*time.Hour, jobCleaner.Config.CancelledJobRetentionPeriod)
require.Equal(t, 2*time.Hour, jobCleaner.Config.CompletedJobRetentionPeriod)
require.Equal(t, 3*time.Hour, jobCleaner.Config.DiscardedJobRetentionPeriod)

require.Equal(t, errorHandler, client.config.ErrorHandler)
require.Equal(t, 123*time.Millisecond, client.config.FetchCooldown)
Expand Down Expand Up @@ -2081,9 +2081,9 @@ func Test_NewClient_Validations(t *testing.T) {
validateResult func(*testing.T, *Client[pgx.Tx])
}{
{
name: "CompletedJobRetentionTime cannot be less than zero",
configFunc: func(config *Config) { config.CompletedJobRetentionTime = -1 * time.Second },
wantErr: errors.New("CompletedJobRetentionTime cannot be less than zero"),
name: "CompletedJobRetentionPeriod cannot be less than zero",
configFunc: func(config *Config) { config.CompletedJobRetentionPeriod = -1 * time.Second },
wantErr: errors.New("CompletedJobRetentionPeriod cannot be less than zero"),
},
{
name: "FetchCooldown cannot be less than MinFetchCooldown",
Expand Down
46 changes: 23 additions & 23 deletions internal/maintenance/job_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import (
)

const (
DefaultCancelledJobRetentionTime = 24 * time.Hour
DefaultCompletedJobRetentionTime = 24 * time.Hour
DefaultDiscardedJobRetentionTime = 7 * 24 * time.Hour
DefaultJobCleanerInterval = 30 * time.Second
DefaultCancelledJobRetentionPeriod = 24 * time.Hour
DefaultCompletedJobRetentionPeriod = 24 * time.Hour
DefaultDiscardedJobRetentionPeriod = 7 * 24 * time.Hour
DefaultJobCleanerInterval = 30 * time.Second
)

// Test-only properties.
Expand All @@ -33,31 +33,31 @@ func (ts *JobCleanerTestSignals) Init() {
}

type JobCleanerConfig struct {
// CancelledJobRetentionTime is the amount of time to keep cancelled jobs
// CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs
// around before they're removed permanently.
CancelledJobRetentionTime time.Duration
CancelledJobRetentionPeriod time.Duration

// CompletedJobRetentionTime is the amount of time to keep completed jobs
// CompletedJobRetentionPeriod is the amount of time to keep completed jobs
// around before they're removed permanently.
CompletedJobRetentionTime time.Duration
CompletedJobRetentionPeriod time.Duration

// DiscardedJobRetentionTime is the amount of time to keep cancelled jobs
// DiscardedJobRetentionPeriod is the amount of time to keep cancelled jobs
// around before they're removed permanently.
DiscardedJobRetentionTime time.Duration
DiscardedJobRetentionPeriod time.Duration

// Interval is the amount of time to wait between runs of the cleaner.
Interval time.Duration
}

func (c *JobCleanerConfig) mustValidate() *JobCleanerConfig {
if c.CancelledJobRetentionTime <= 0 {
panic("JobCleanerConfig.CancelledJobRetentionTime must be above zero")
if c.CancelledJobRetentionPeriod <= 0 {
panic("JobCleanerConfig.CancelledJobRetentionPeriod must be above zero")
}
if c.CompletedJobRetentionTime <= 0 {
panic("JobCleanerConfig.CompletedJobRetentionTime must be above zero")
if c.CompletedJobRetentionPeriod <= 0 {
panic("JobCleanerConfig.CompletedJobRetentionPeriod must be above zero")
}
if c.DiscardedJobRetentionTime <= 0 {
panic("JobCleanerConfig.DiscardedJobRetentionTime must be above zero")
if c.DiscardedJobRetentionPeriod <= 0 {
panic("JobCleanerConfig.DiscardedJobRetentionPeriod must be above zero")
}
if c.Interval <= 0 {
panic("JobCleanerConfig.Interval must be above zero")
Expand All @@ -84,10 +84,10 @@ type JobCleaner struct {
func NewJobCleaner(archetype *baseservice.Archetype, config *JobCleanerConfig, executor dbutil.Executor) *JobCleaner {
return baseservice.Init(archetype, &JobCleaner{
Config: (&JobCleanerConfig{
CancelledJobRetentionTime: valutil.ValOrDefault(config.CancelledJobRetentionTime, DefaultCancelledJobRetentionTime),
CompletedJobRetentionTime: valutil.ValOrDefault(config.CompletedJobRetentionTime, DefaultCompletedJobRetentionTime),
DiscardedJobRetentionTime: valutil.ValOrDefault(config.DiscardedJobRetentionTime, DefaultDiscardedJobRetentionTime),
Interval: valutil.ValOrDefault(config.Interval, DefaultJobCleanerInterval),
CancelledJobRetentionPeriod: valutil.ValOrDefault(config.CancelledJobRetentionPeriod, DefaultCancelledJobRetentionPeriod),
CompletedJobRetentionPeriod: valutil.ValOrDefault(config.CompletedJobRetentionPeriod, DefaultCompletedJobRetentionPeriod),
DiscardedJobRetentionPeriod: valutil.ValOrDefault(config.DiscardedJobRetentionPeriod, DefaultDiscardedJobRetentionPeriod),
Interval: valutil.ValOrDefault(config.Interval, DefaultJobCleanerInterval),
}).mustValidate(),

batchSize: DefaultBatchSize,
Expand Down Expand Up @@ -153,9 +153,9 @@ func (s *JobCleaner) runOnce(ctx context.Context) (*jobCleanerRunOnceResult, err
defer cancelFunc()

numDeleted, err := s.queries.JobDeleteBefore(ctx, s.dbExecutor, dbsqlc.JobDeleteBeforeParams{
CancelledFinalizedAtHorizon: time.Now().Add(-s.Config.CancelledJobRetentionTime),
CompletedFinalizedAtHorizon: time.Now().Add(-s.Config.CompletedJobRetentionTime),
DiscardedFinalizedAtHorizon: time.Now().Add(-s.Config.DiscardedJobRetentionTime),
CancelledFinalizedAtHorizon: time.Now().Add(-s.Config.CancelledJobRetentionPeriod),
CompletedFinalizedAtHorizon: time.Now().Add(-s.Config.CompletedJobRetentionPeriod),
DiscardedFinalizedAtHorizon: time.Now().Add(-s.Config.DiscardedJobRetentionPeriod),
Max: s.batchSize,
})
if err != nil {
Expand Down
20 changes: 10 additions & 10 deletions internal/maintenance/job_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,19 @@ func TestJobCleaner(t *testing.T) {
t.Helper()

bundle := &testBundle{
cancelledDeleteHorizon: time.Now().Add(-DefaultCancelledJobRetentionTime),
completedDeleteHorizon: time.Now().Add(-DefaultCompletedJobRetentionTime),
discardedDeleteHorizon: time.Now().Add(-DefaultDiscardedJobRetentionTime),
cancelledDeleteHorizon: time.Now().Add(-DefaultCancelledJobRetentionPeriod),
completedDeleteHorizon: time.Now().Add(-DefaultCompletedJobRetentionPeriod),
discardedDeleteHorizon: time.Now().Add(-DefaultDiscardedJobRetentionPeriod),
tx: riverinternaltest.TestTx(ctx, t),
}

cleaner := NewJobCleaner(
riverinternaltest.BaseServiceArchetype(t),
&JobCleanerConfig{
CancelledJobRetentionTime: DefaultCancelledJobRetentionTime,
CompletedJobRetentionTime: DefaultCompletedJobRetentionTime,
DiscardedJobRetentionTime: DefaultDiscardedJobRetentionTime,
Interval: DefaultJobCleanerInterval,
CancelledJobRetentionPeriod: DefaultCancelledJobRetentionPeriod,
CompletedJobRetentionPeriod: DefaultCompletedJobRetentionPeriod,
DiscardedJobRetentionPeriod: DefaultDiscardedJobRetentionPeriod,
Interval: DefaultJobCleanerInterval,
},
bundle.tx)
cleaner.TestSignals.Init()
Expand All @@ -77,9 +77,9 @@ func TestJobCleaner(t *testing.T) {

cleaner := NewJobCleaner(riverinternaltest.BaseServiceArchetype(t), &JobCleanerConfig{}, nil)

require.Equal(t, cleaner.Config.CancelledJobRetentionTime, DefaultCancelledJobRetentionTime)
require.Equal(t, cleaner.Config.CompletedJobRetentionTime, DefaultCompletedJobRetentionTime)
require.Equal(t, cleaner.Config.DiscardedJobRetentionTime, DefaultDiscardedJobRetentionTime)
require.Equal(t, cleaner.Config.CancelledJobRetentionPeriod, DefaultCancelledJobRetentionPeriod)
require.Equal(t, cleaner.Config.CompletedJobRetentionPeriod, DefaultCompletedJobRetentionPeriod)
require.Equal(t, cleaner.Config.DiscardedJobRetentionPeriod, DefaultDiscardedJobRetentionPeriod)
require.Equal(t, cleaner.Config.Interval, DefaultJobCleanerInterval)
})

Expand Down

0 comments on commit c7bf24f

Please sign in to comment.