Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consider per-worker timeout overrides when rescuing jobs #350

Merged
merged 1 commit into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- River now considers per-worker timeout overrides when rescuing jobs so that jobs with a long custom timeout won't be rescued prematurely. [PR #350](https://github.com/riverqueue/river/pull/350).

## [0.6.0] - 2024-05-08

### Added
Expand Down
84 changes: 57 additions & 27 deletions internal/maintenance/job_rescuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/riverqueue/river/internal/baseservice"
"github.com/riverqueue/river/internal/maintenance/startstop"
"github.com/riverqueue/river/internal/rivercommon"
"github.com/riverqueue/river/internal/util/ptrutil"
"github.com/riverqueue/river/internal/util/timeutil"
"github.com/riverqueue/river/internal/util/valutil"
"github.com/riverqueue/river/internal/workunit"
Expand Down Expand Up @@ -164,22 +165,20 @@ func (s *JobRescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error)
now := time.Now().UTC()

rescueManyParams := riverdriver.JobRescueManyParams{
ID: make([]int64, len(stuckJobs)),
Error: make([][]byte, len(stuckJobs)),
FinalizedAt: make([]time.Time, len(stuckJobs)),
ScheduledAt: make([]time.Time, len(stuckJobs)),
State: make([]string, len(stuckJobs)),
ID: make([]int64, 0, len(stuckJobs)),
Error: make([][]byte, 0, len(stuckJobs)),
FinalizedAt: make([]time.Time, 0, len(stuckJobs)),
ScheduledAt: make([]time.Time, 0, len(stuckJobs)),
State: make([]string, 0, len(stuckJobs)),
}

for i, job := range stuckJobs {
rescueManyParams.ID[i] = job.ID

for _, job := range stuckJobs {
var metadata metadataWithCancelAttemptedAt
if err := json.Unmarshal(job.Metadata, &metadata); err != nil {
return nil, fmt.Errorf("error unmarshaling job metadata: %w", err)
}

rescueManyParams.Error[i], err = json.Marshal(rivertype.AttemptError{
errorData, err := json.Marshal(rivertype.AttemptError{
At: now,
Attempt: max(job.Attempt, 0),
Error: "Stuck job rescued by Rescuer",
Expand All @@ -189,29 +188,41 @@ func (s *JobRescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error)
return nil, fmt.Errorf("error marshaling error JSON: %w", err)
}

addRescueParam := func(state rivertype.JobState, finalizedAt *time.Time, scheduledAt time.Time) {
rescueManyParams.ID = append(rescueManyParams.ID, job.ID)
rescueManyParams.Error = append(rescueManyParams.Error, errorData)
rescueManyParams.FinalizedAt = append(rescueManyParams.FinalizedAt, ptrutil.ValOrDefault(finalizedAt, time.Time{}))
rescueManyParams.ScheduledAt = append(rescueManyParams.ScheduledAt, scheduledAt)
rescueManyParams.State = append(rescueManyParams.State, string(state))
}

if !metadata.CancelAttemptedAt.IsZero() {
res.NumJobsCancelled++
rescueManyParams.FinalizedAt[i] = now
rescueManyParams.ScheduledAt[i] = job.ScheduledAt // reuse previous value
rescueManyParams.State[i] = string(rivertype.JobStateCancelled)
addRescueParam(rivertype.JobStateCancelled, &now, job.ScheduledAt) // reused previous scheduled value
continue
}
shouldRetry, retryAt := s.makeRetryDecision(ctx, job)
if shouldRetry {
res.NumJobsRetried++
rescueManyParams.ScheduledAt[i] = retryAt
rescueManyParams.State[i] = string(rivertype.JobStateRetryable)
} else {

retryDecision, retryAt := s.makeRetryDecision(ctx, job, now)

switch retryDecision {
case jobRetryDecisionDiscard:
res.NumJobsDiscarded++
rescueManyParams.FinalizedAt[i] = now
rescueManyParams.ScheduledAt[i] = job.ScheduledAt // reuse previous value
rescueManyParams.State[i] = string(rivertype.JobStateDiscarded)
addRescueParam(rivertype.JobStateDiscarded, &now, job.ScheduledAt) // reused previous scheduled value

case jobRetryDecisionIgnore:
// job not timed out yet due to kind-specific timeout value; ignore

case jobRetryDecisionRetry:
res.NumJobsRetried++
addRescueParam(rivertype.JobStateRetryable, nil, retryAt)
}
}

_, err = s.exec.JobRescueMany(ctx, &rescueManyParams)
if err != nil {
return nil, fmt.Errorf("error rescuing stuck jobs: %w", err)
if len(rescueManyParams.ID) > 0 {
_, err = s.exec.JobRescueMany(ctx, &rescueManyParams)
if err != nil {
return nil, fmt.Errorf("error rescuing stuck jobs: %w", err)
}
}

s.TestSignals.UpdatedBatch.Signal(struct{}{})
Expand Down Expand Up @@ -245,14 +256,24 @@ func (s *JobRescuer) getStuckJobs(ctx context.Context) ([]*rivertype.JobRow, err
})
}

// jobRetryDecision is a signal from makeRetryDecision as to what to do with a
// particular job that appears to be eligible for rescue.
type jobRetryDecision int

const (
jobRetryDecisionDiscard jobRetryDecision = iota // discard the job
jobRetryDecisionIgnore // don't retry or discard the job
jobRetryDecisionRetry // retry the job
)

// makeRetryDecision decides whether or not a rescued job should be retried, and if so,
// when.
func (s *JobRescuer) makeRetryDecision(ctx context.Context, job *rivertype.JobRow) (bool, time.Time) {
func (s *JobRescuer) makeRetryDecision(ctx context.Context, job *rivertype.JobRow, now time.Time) (jobRetryDecision, time.Time) {
workUnitFactory := s.Config.WorkUnitFactoryFunc(job.Kind)
if workUnitFactory == nil {
s.Logger.ErrorContext(ctx, s.Name+": Attempted to rescue unhandled job kind, discarding",
slog.String("job_kind", job.Kind), slog.Int64("job_id", job.ID))
return false, time.Time{}
return jobRetryDecisionDiscard, time.Time{}
}

workUnit := workUnitFactory.MakeUnit(job)
Expand All @@ -261,9 +282,18 @@ func (s *JobRescuer) makeRetryDecision(ctx context.Context, job *rivertype.JobRo
slog.String("job_kind", job.Kind), slog.Int64("job_id", job.ID))
}

if workUnit.Timeout() != 0 && now.Sub(*job.AttemptedAt) < workUnit.Timeout() {
return jobRetryDecisionIgnore, time.Time{}
}

nextRetry := workUnit.NextRetry()
if nextRetry.IsZero() {
nextRetry = s.Config.ClientRetryPolicy.NextRetry(job)
}
return job.Attempt < max(job.MaxAttempts, 0), nextRetry

if job.Attempt < max(job.MaxAttempts, 0) {
return jobRetryDecisionRetry, nextRetry
}

return jobRetryDecisionDiscard, time.Time{}
}
78 changes: 51 additions & 27 deletions internal/maintenance/job_rescuer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,22 @@ import (
// callbackWorkUnitFactory wraps a Worker to implement workUnitFactory.
type callbackWorkUnitFactory struct {
Callback func(ctx context.Context, jobRow *rivertype.JobRow) error
timeout time.Duration // defaults to 0, which signals default timeout
}

func (w *callbackWorkUnitFactory) MakeUnit(jobRow *rivertype.JobRow) workunit.WorkUnit {
return &callbackWorkUnit{callback: w.Callback, jobRow: jobRow}
return &callbackWorkUnit{callback: w.Callback, jobRow: jobRow, timeout: w.timeout}
}

// callbackWorkUnit implements workUnit for a job and Worker.
type callbackWorkUnit struct {
callback func(ctx context.Context, jobRow *rivertype.JobRow) error
jobRow *rivertype.JobRow
timeout time.Duration // defaults to 0, which signals default timeout
}

func (w *callbackWorkUnit) NextRetry() time.Time { return time.Now().Add(30 * time.Second) }
func (w *callbackWorkUnit) Timeout() time.Duration { return 0 }
func (w *callbackWorkUnit) Timeout() time.Duration { return w.timeout }
func (w *callbackWorkUnit) Work(ctx context.Context) error { return w.callback(ctx, w.jobRow) }
func (w *callbackWorkUnit) UnmarshalJob() error { return nil }

Expand All @@ -51,10 +53,13 @@ func (p *SimpleClientRetryPolicy) NextRetry(job *rivertype.JobRow) time.Time {
func TestJobRescuer(t *testing.T) {
t.Parallel()

const rescuerJobKind = "rescuer"

ctx := context.Background()

const (
rescuerJobKind = "rescuer"
rescuerJobKindLongTimeout = "rescuer_long_timeout"
)

type testBundle struct {
exec riverdriver.Executor
rescueHorizon time.Time
Expand All @@ -76,8 +81,13 @@ func TestJobRescuer(t *testing.T) {
Interval: JobRescuerIntervalDefault,
RescueAfter: JobRescuerRescueAfterDefault,
WorkUnitFactoryFunc: func(kind string) workunit.WorkUnitFactory {
if kind == rescuerJobKind {
return &callbackWorkUnitFactory{Callback: func(ctx context.Context, jobRow *rivertype.JobRow) error { return nil }}
emptyCallback := func(ctx context.Context, jobRow *rivertype.JobRow) error { return nil }

switch kind {
case rescuerJobKind:
return &callbackWorkUnitFactory{Callback: emptyCallback}
case rescuerJobKindLongTimeout:
return &callbackWorkUnitFactory{Callback: emptyCallback, timeout: JobRescuerRescueAfterDefault + 5*time.Minute}
}
panic("unhandled kind: " + kind)
},
Expand Down Expand Up @@ -135,11 +145,18 @@ func TestJobRescuer(t *testing.T) {
stuckToCancelJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), Metadata: []byte(fmt.Sprintf(`{"cancel_attempted_at": %q}`, cancelTime)), MaxAttempts: ptrutil.Ptr(5)})
stuckToCancelJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(1 * time.Minute)), Metadata: []byte(fmt.Sprintf(`{"cancel_attempted_at": %q}`, cancelTime)), MaxAttempts: ptrutil.Ptr(5)}) // won't be rescued

// these aren't touched:
// these aren't touched because they're in ineligible states
notRunningJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), FinalizedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), State: ptrutil.Ptr(rivertype.JobStateCompleted), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)})
notRunningJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), FinalizedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), State: ptrutil.Ptr(rivertype.JobStateDiscarded), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)})
notRunningJob3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), FinalizedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), State: ptrutil.Ptr(rivertype.JobStateCancelled), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)})

// Jobs with worker-specific long timeouts. The first isn't rescued
// because the difference between its `attempted_at` and now is still
// within the timeout threshold. The second _is_ rescued because it
// started earlier and even with the longer timeout, has still timed out.
longTimeOutJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKindLongTimeout), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Minute)), MaxAttempts: ptrutil.Ptr(5)})
longTimeOutJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKindLongTimeout), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-6 * time.Minute)), MaxAttempts: ptrutil.Ptr(5)})

require.NoError(cleaner.Start(ctx))

cleaner.TestSignals.FetchedBatch.WaitOrTimeout()
Expand All @@ -158,37 +175,44 @@ func TestJobRescuer(t *testing.T) {
require.NoError(err)
require.Equal(stuckToRetryJob3.State, job3After.State) // not rescued

discard1After, err := bundle.exec.JobGetByID(ctx, stuckToDiscardJob1.ID)
discardJob1After, err := bundle.exec.JobGetByID(ctx, stuckToDiscardJob1.ID)
require.NoError(err)
require.Equal(rivertype.JobStateDiscarded, discard1After.State)
require.WithinDuration(time.Now(), *discard1After.FinalizedAt, 5*time.Second)
require.Len(discard1After.Errors, 1)
require.Equal(rivertype.JobStateDiscarded, discardJob1After.State)
require.WithinDuration(time.Now(), *discardJob1After.FinalizedAt, 5*time.Second)
require.Len(discardJob1After.Errors, 1)

discard2After, err := bundle.exec.JobGetByID(ctx, stuckToDiscardJob2.ID)
discardJob2After, err := bundle.exec.JobGetByID(ctx, stuckToDiscardJob2.ID)
require.NoError(err)
require.Equal(rivertype.JobStateRunning, discard2After.State)
require.Nil(discard2After.FinalizedAt)
require.Equal(rivertype.JobStateRunning, discardJob2After.State)
require.Nil(discardJob2After.FinalizedAt)

cancel1After, err := bundle.exec.JobGetByID(ctx, stuckToCancelJob1.ID)
cancelJob1After, err := bundle.exec.JobGetByID(ctx, stuckToCancelJob1.ID)
require.NoError(err)
require.Equal(rivertype.JobStateCancelled, cancel1After.State)
require.WithinDuration(time.Now(), *cancel1After.FinalizedAt, 5*time.Second)
require.Len(cancel1After.Errors, 1)
require.Equal(rivertype.JobStateCancelled, cancelJob1After.State)
require.WithinDuration(time.Now(), *cancelJob1After.FinalizedAt, 5*time.Second)
require.Len(cancelJob1After.Errors, 1)

cancel2After, err := bundle.exec.JobGetByID(ctx, stuckToCancelJob2.ID)
cancelJob2After, err := bundle.exec.JobGetByID(ctx, stuckToCancelJob2.ID)
require.NoError(err)
require.Equal(rivertype.JobStateRunning, cancel2After.State)
require.Nil(cancel2After.FinalizedAt)
require.Equal(rivertype.JobStateRunning, cancelJob2After.State)
require.Nil(cancelJob2After.FinalizedAt)

notRunning1After, err := bundle.exec.JobGetByID(ctx, notRunningJob1.ID)
notRunningJob1After, err := bundle.exec.JobGetByID(ctx, notRunningJob1.ID)
require.NoError(err)
require.Equal(notRunning1After.State, notRunningJob1.State)
notRunning2After, err := bundle.exec.JobGetByID(ctx, notRunningJob2.ID)
require.Equal(notRunningJob1.State, notRunningJob1After.State)
notRunningJob2After, err := bundle.exec.JobGetByID(ctx, notRunningJob2.ID)
require.NoError(err)
require.Equal(notRunningJob2.State, notRunningJob2After.State)
notRunningJob3After, err := bundle.exec.JobGetByID(ctx, notRunningJob3.ID)
require.NoError(err)
require.Equal(notRunningJob3.State, notRunningJob3After.State)

notTimedOutJob1After, err := bundle.exec.JobGetByID(ctx, longTimeOutJob1.ID)
require.NoError(err)
require.Equal(notRunning2After.State, notRunningJob2.State)
notRunning3After, err := bundle.exec.JobGetByID(ctx, notRunningJob3.ID)
require.Equal(rivertype.JobStateRunning, notTimedOutJob1After.State)
notTimedOutJob2After, err := bundle.exec.JobGetByID(ctx, longTimeOutJob2.ID)
require.NoError(err)
require.Equal(notRunning3After.State, notRunningJob3.State)
require.Equal(rivertype.JobStateRetryable, notTimedOutJob2After.State)
})

t.Run("RescuesInBatches", func(t *testing.T) {
Expand Down
Loading