Eager retry for short retry delays (#105)
An aspect of River that could currently be considered a bug is that
retry delays below five seconds are effectively ignored. This is because
when a job errors it always transitions to `retryable` state, and needs
to wait for the scheduler to run to transition it back to `available`,
and the scheduler run interval is five seconds.

In practice this is a problem because the first retry in a retry policy
doesn't happen after one second as claimed in the docs and elsewhere,
but after ~5 seconds, and it won't be obvious why.

Here, implement an "eager" retry system such that if we detect that the
retry delay is smaller than the scheduler's run interval, we place the
job immediately into an `available` state, allowing it to be worked
sooner.

To facilitate this we add a predicate on the "get available" query that
checks for `scheduled_at <= now()` so that errored jobs with short
retries can be `available` for a short time without being worked
immediately. Most of the time this should have no effect because this
really only applies to the first retry in any failure (the second is at
16 seconds, which is well above the scheduler's run interval).
brandur authored Dec 13, 2023
1 parent b037935 commit 1468298
Showing 13 changed files with 228 additions and 38 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

+### Changed
+
+- Errored jobs that have a very short duration before their next retry (<5 seconds) are set to `available` immediately instead of being made `scheduled` and having to wait for the scheduler to make a pass to make them workable. [PR #105](https://github.com/riverqueue/river/pull/105).
+
## [0.0.12] - 2023-12-02

### Added
10 changes: 9 additions & 1 deletion client.go
@@ -167,6 +167,10 @@ type Config struct {
// Test-only property that allows sleep statements to be disable. Only
// functions in cases where the CancellableSleep helper is used to sleep.
disableSleep bool
+
+// Scheduler run interval. Shared between the scheduler and producer/job
+// executors, but not currently exposed for configuration.
+schedulerInterval time.Duration
}

func (c *Config) validate() error {
@@ -388,6 +392,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
RetryPolicy: retryPolicy,
Workers: config.Workers,
disableSleep: config.disableSleep,
+schedulerInterval: valutil.ValOrDefault(config.schedulerInterval, maintenance.SchedulerIntervalDefault),
}

if err := config.validate(); err != nil {
@@ -524,7 +529,9 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
}

{
-scheduler := maintenance.NewScheduler(archetype, &maintenance.SchedulerConfig{}, driver.GetDBPool())
+scheduler := maintenance.NewScheduler(archetype, &maintenance.SchedulerConfig{
+Interval: config.schedulerInterval,
+}, driver.GetDBPool())
maintenanceServices = append(maintenanceServices, scheduler)
client.testSignals.scheduler = &scheduler.TestSignals
}
@@ -898,6 +905,7 @@ func (c *Client[TTx]) provisionProducers() error {
Notifier: c.notifier,
QueueName: queue,
RetryPolicy: c.config.RetryPolicy,
+SchedulerInterval: c.config.schedulerInterval,
WorkerName: c.id,
Workers: c.config.Workers,
}
1 change: 1 addition & 0 deletions client_test.go
@@ -127,6 +127,7 @@ func newTestConfig(t *testing.T, callback callbackFunc) *Config {
Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 50}},
Workers: workers,
disableSleep: true,
+schedulerInterval: riverinternaltest.SchedulerShortInterval,
}
}

2 changes: 1 addition & 1 deletion example_subscription_test.go
@@ -129,7 +129,7 @@ func Example_subscription() {

// Output:
// Got job with state: completed
-// Got job with state: retryable
+// Got job with state: available
// Got job with state: cancelled
// Client stopped
// Channel is closed
6 changes: 5 additions & 1 deletion internal/dbadapter/db_adapter.go
@@ -376,7 +376,11 @@ func JobSetStateDiscarded(id int64, finalizedAt time.Time, errData []byte) *JobS
return &JobSetStateIfRunningParams{ID: id, errData: errData, finalizedAt: &finalizedAt, state: dbsqlc.JobStateDiscarded}
}

-func JobSetStateErrored(id int64, scheduledAt time.Time, errData []byte) *JobSetStateIfRunningParams {
+func JobSetStateErrorAvailable(id int64, scheduledAt time.Time, errData []byte) *JobSetStateIfRunningParams {
+return &JobSetStateIfRunningParams{ID: id, errData: errData, scheduledAt: &scheduledAt, state: dbsqlc.JobStateAvailable}
+}
+
+func JobSetStateErrorRetryable(id int64, scheduledAt time.Time, errData []byte) *JobSetStateIfRunningParams {
return &JobSetStateIfRunningParams{ID: id, errData: errData, scheduledAt: &scheduledAt, state: dbsqlc.JobStateRetryable}
}
