diff --git a/CHANGELOG.md b/CHANGELOG.md
index 814ca82c..daeeadab 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Fixed
+
+- River now considers per-worker timeout overrides when rescuing jobs so that jobs with a long custom timeout won't be rescued prematurely. [PR #350](https://github.com/riverqueue/river/pull/350).
+
 ## [0.6.0] - 2024-05-08
 
 ### Added
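For context on what "per-worker timeout overrides" means at the public API level: a worker can override its `Timeout` method so that jobs of that kind are allowed to run longer than the client-wide default. A minimal sketch, assuming River's standard `Worker`/`WorkerDefaults` generics; the args and worker names below are illustrative and not part of this diff:

```go
package main

import (
	"context"
	"time"

	"github.com/riverqueue/river"
)

type ReportArgs struct{}

func (ReportArgs) Kind() string { return "report" }

type ReportWorker struct {
	// WorkerDefaults supplies a zero-value Timeout, which means "use the
	// client default".
	river.WorkerDefaults[ReportArgs]
}

// Timeout is the per-worker override referenced in the changelog entry: with
// this fix, the rescuer leaves a running "report" job alone until this much
// time has passed since it was attempted.
func (w *ReportWorker) Timeout(job *river.Job[ReportArgs]) time.Duration {
	return 4 * time.Hour
}

func (w *ReportWorker) Work(ctx context.Context, job *river.Job[ReportArgs]) error {
	// Long-running work goes here.
	return nil
}
```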
diff --git a/internal/maintenance/job_rescuer.go b/internal/maintenance/job_rescuer.go
index 146a1135..41edd91f 100644
--- a/internal/maintenance/job_rescuer.go
+++ b/internal/maintenance/job_rescuer.go
@@ -11,6 +11,7 @@ import (
 	"github.com/riverqueue/river/internal/baseservice"
 	"github.com/riverqueue/river/internal/maintenance/startstop"
 	"github.com/riverqueue/river/internal/rivercommon"
+	"github.com/riverqueue/river/internal/util/ptrutil"
 	"github.com/riverqueue/river/internal/util/timeutil"
 	"github.com/riverqueue/river/internal/util/valutil"
 	"github.com/riverqueue/river/internal/workunit"
@@ -164,22 +165,20 @@ func (s *JobRescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error)
 	now := time.Now().UTC()
 
 	rescueManyParams := riverdriver.JobRescueManyParams{
-		ID:          make([]int64, len(stuckJobs)),
-		Error:       make([][]byte, len(stuckJobs)),
-		FinalizedAt: make([]time.Time, len(stuckJobs)),
-		ScheduledAt: make([]time.Time, len(stuckJobs)),
-		State:       make([]string, len(stuckJobs)),
+		ID:          make([]int64, 0, len(stuckJobs)),
+		Error:       make([][]byte, 0, len(stuckJobs)),
+		FinalizedAt: make([]time.Time, 0, len(stuckJobs)),
+		ScheduledAt: make([]time.Time, 0, len(stuckJobs)),
+		State:       make([]string, 0, len(stuckJobs)),
 	}
 
-	for i, job := range stuckJobs {
-		rescueManyParams.ID[i] = job.ID
-
+	for _, job := range stuckJobs {
 		var metadata metadataWithCancelAttemptedAt
 		if err := json.Unmarshal(job.Metadata, &metadata); err != nil {
 			return nil, fmt.Errorf("error unmarshaling job metadata: %w", err)
 		}
 
-		rescueManyParams.Error[i], err = json.Marshal(rivertype.AttemptError{
+		errorData, err := json.Marshal(rivertype.AttemptError{
 			At:      now,
 			Attempt: max(job.Attempt, 0),
 			Error:   "Stuck job rescued by Rescuer",
@@ -189,29 +188,41 @@ func (s *JobRescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error)
 			return nil, fmt.Errorf("error marshaling error JSON: %w", err)
 		}
 
+		addRescueParam := func(state rivertype.JobState, finalizedAt *time.Time, scheduledAt time.Time) {
+			rescueManyParams.ID = append(rescueManyParams.ID, job.ID)
+			rescueManyParams.Error = append(rescueManyParams.Error, errorData)
+			rescueManyParams.FinalizedAt = append(rescueManyParams.FinalizedAt, ptrutil.ValOrDefault(finalizedAt, time.Time{}))
+			rescueManyParams.ScheduledAt = append(rescueManyParams.ScheduledAt, scheduledAt)
+			rescueManyParams.State = append(rescueManyParams.State, string(state))
+		}
+
 		if !metadata.CancelAttemptedAt.IsZero() {
 			res.NumJobsCancelled++
-			rescueManyParams.FinalizedAt[i] = now
-			rescueManyParams.ScheduledAt[i] = job.ScheduledAt // reuse previous value
-			rescueManyParams.State[i] = string(rivertype.JobStateCancelled)
+			addRescueParam(rivertype.JobStateCancelled, &now, job.ScheduledAt) // reuse previous scheduled value
 			continue
 		}
-		shouldRetry, retryAt := s.makeRetryDecision(ctx, job)
-		if shouldRetry {
-			res.NumJobsRetried++
-			rescueManyParams.ScheduledAt[i] = retryAt
-			rescueManyParams.State[i] = string(rivertype.JobStateRetryable)
-		} else {
+
+		retryDecision, retryAt := s.makeRetryDecision(ctx, job, now)
+
+		switch retryDecision {
+		case jobRetryDecisionDiscard:
 			res.NumJobsDiscarded++
-			rescueManyParams.FinalizedAt[i] = now
-			rescueManyParams.ScheduledAt[i] = job.ScheduledAt // reuse previous value
-			rescueManyParams.State[i] = string(rivertype.JobStateDiscarded)
+			addRescueParam(rivertype.JobStateDiscarded, &now, job.ScheduledAt) // reuse previous scheduled value
+
+		case jobRetryDecisionIgnore:
+			// job not timed out yet due to kind-specific timeout value; ignore
+
+		case jobRetryDecisionRetry:
+			res.NumJobsRetried++
+			addRescueParam(rivertype.JobStateRetryable, nil, retryAt)
 		}
 	}
 
-	_, err = s.exec.JobRescueMany(ctx, &rescueManyParams)
-	if err != nil {
-		return nil, fmt.Errorf("error rescuing stuck jobs: %w", err)
+	if len(rescueManyParams.ID) > 0 {
+		_, err = s.exec.JobRescueMany(ctx, &rescueManyParams)
+		if err != nil {
+			return nil, fmt.Errorf("error rescuing stuck jobs: %w", err)
+		}
 	}
 
 	s.TestSignals.UpdatedBatch.Signal(struct{}{})
@@ -245,14 +256,24 @@ func (s *JobRescuer) getStuckJobs(ctx context.Context) ([]*rivertype.JobRow, err
 	})
 }
 
+// jobRetryDecision is a signal from makeRetryDecision as to what to do with a
+// particular job that appears to be eligible for rescue.
+type jobRetryDecision int
+
+const (
+	jobRetryDecisionDiscard jobRetryDecision = iota // discard the job
+	jobRetryDecisionIgnore                          // don't retry or discard the job
+	jobRetryDecisionRetry                           // retry the job
+)
+
 // makeRetryDecision decides whether or not a rescued job should be retried, and if so,
 // when.
-func (s *JobRescuer) makeRetryDecision(ctx context.Context, job *rivertype.JobRow) (bool, time.Time) {
+func (s *JobRescuer) makeRetryDecision(ctx context.Context, job *rivertype.JobRow, now time.Time) (jobRetryDecision, time.Time) {
 	workUnitFactory := s.Config.WorkUnitFactoryFunc(job.Kind)
 	if workUnitFactory == nil {
 		s.Logger.ErrorContext(ctx, s.Name+": Attempted to rescue unhandled job kind, discarding",
 			slog.String("job_kind", job.Kind), slog.Int64("job_id", job.ID))
-		return false, time.Time{}
+		return jobRetryDecisionDiscard, time.Time{}
 	}
 
 	workUnit := workUnitFactory.MakeUnit(job)
@@ -261,9 +282,18 @@ func (s *JobRescuer) makeRetryDecision(ctx context.Context, job *rivertype.JobRo
 			slog.String("job_kind", job.Kind), slog.Int64("job_id", job.ID))
 	}
 
+	if workUnit.Timeout() != 0 && now.Sub(*job.AttemptedAt) < workUnit.Timeout() {
+		return jobRetryDecisionIgnore, time.Time{}
+	}
+
 	nextRetry := workUnit.NextRetry()
 	if nextRetry.IsZero() {
 		nextRetry = s.Config.ClientRetryPolicy.NextRetry(job)
 	}
-	return job.Attempt < max(job.MaxAttempts, 0), nextRetry
+
+	if job.Attempt < max(job.MaxAttempts, 0) {
+		return jobRetryDecisionRetry, nextRetry
+	}
+
+	return jobRetryDecisionDiscard, time.Time{}
 }
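The heart of the change is the new `jobRetryDecisionIgnore` outcome: a job can be old enough for `getStuckJobs` to fetch it as stuck while its worker's longer timeout hasn't elapsed yet, in which case it's now left alone rather than rescued. A standalone sketch of that timing relationship (the concrete durations are illustrative, not taken from the code above):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const (
		rescueAfter   = time.Hour     // horizon used to fetch "stuck" jobs
		workerTimeout = 4 * time.Hour // per-worker override reported by Timeout()
	)

	now := time.Now().UTC()
	attemptedAt := now.Add(-90 * time.Minute)

	// Old enough to be returned by the stuck-job query...
	fetchedAsStuck := now.Sub(attemptedAt) > rescueAfter

	// ...but the worker's own timeout hasn't elapsed, so makeRetryDecision
	// returns jobRetryDecisionIgnore and no rescue row is appended for it.
	ignored := workerTimeout != 0 && now.Sub(attemptedAt) < workerTimeout

	fmt.Println(fetchedAsStuck, ignored) // true true
}
```

This is also why the params slices switch from index assignment to `append`, and why `JobRescueMany` is skipped when the batch ends up empty: ignored jobs no longer contribute a row.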
diff --git a/internal/maintenance/job_rescuer_test.go b/internal/maintenance/job_rescuer_test.go
index 66617842..92137750 100644
--- a/internal/maintenance/job_rescuer_test.go
+++ b/internal/maintenance/job_rescuer_test.go
@@ -23,20 +23,22 @@ import (
 // callbackWorkUnitFactory wraps a Worker to implement workUnitFactory.
 type callbackWorkUnitFactory struct {
 	Callback func(ctx context.Context, jobRow *rivertype.JobRow) error
+	timeout  time.Duration // defaults to 0, which signals default timeout
 }
 
 func (w *callbackWorkUnitFactory) MakeUnit(jobRow *rivertype.JobRow) workunit.WorkUnit {
-	return &callbackWorkUnit{callback: w.Callback, jobRow: jobRow}
+	return &callbackWorkUnit{callback: w.Callback, jobRow: jobRow, timeout: w.timeout}
 }
 
 // callbackWorkUnit implements workUnit for a job and Worker.
 type callbackWorkUnit struct {
 	callback func(ctx context.Context, jobRow *rivertype.JobRow) error
 	jobRow   *rivertype.JobRow
+	timeout  time.Duration // defaults to 0, which signals default timeout
 }
 
 func (w *callbackWorkUnit) NextRetry() time.Time           { return time.Now().Add(30 * time.Second) }
-func (w *callbackWorkUnit) Timeout() time.Duration         { return 0 }
+func (w *callbackWorkUnit) Timeout() time.Duration         { return w.timeout }
 func (w *callbackWorkUnit) Work(ctx context.Context) error { return w.callback(ctx, w.jobRow) }
 func (w *callbackWorkUnit) UnmarshalJob() error            { return nil }
 
@@ -51,10 +53,13 @@ func (p *SimpleClientRetryPolicy) NextRetry(job *rivertype.JobRow) time.Time {
 func TestJobRescuer(t *testing.T) {
 	t.Parallel()
 
-	const rescuerJobKind = "rescuer"
-
 	ctx := context.Background()
 
+	const (
+		rescuerJobKind            = "rescuer"
+		rescuerJobKindLongTimeout = "rescuer_long_timeout"
+	)
+
 	type testBundle struct {
 		exec          riverdriver.Executor
 		rescueHorizon time.Time
@@ -76,8 +81,13 @@ func TestJobRescuer(t *testing.T) {
 			Interval:    JobRescuerIntervalDefault,
 			RescueAfter: JobRescuerRescueAfterDefault,
 			WorkUnitFactoryFunc: func(kind string) workunit.WorkUnitFactory {
-				if kind == rescuerJobKind {
-					return &callbackWorkUnitFactory{Callback: func(ctx context.Context, jobRow *rivertype.JobRow) error { return nil }}
+				emptyCallback := func(ctx context.Context, jobRow *rivertype.JobRow) error { return nil }
+
+				switch kind {
+				case rescuerJobKind:
+					return &callbackWorkUnitFactory{Callback: emptyCallback}
+				case rescuerJobKindLongTimeout:
+					return &callbackWorkUnitFactory{Callback: emptyCallback, timeout: JobRescuerRescueAfterDefault + 5*time.Minute}
 				}
 				panic("unhandled kind: " + kind)
 			},
@@ -135,11 +145,18 @@ func TestJobRescuer(t *testing.T) {
 		stuckToCancelJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), Metadata: []byte(fmt.Sprintf(`{"cancel_attempted_at": %q}`, cancelTime)), MaxAttempts: ptrutil.Ptr(5)})
 		stuckToCancelJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(1 * time.Minute)), Metadata: []byte(fmt.Sprintf(`{"cancel_attempted_at": %q}`, cancelTime)), MaxAttempts: ptrutil.Ptr(5)}) // won't be rescued
 
-		// these aren't touched:
+		// these aren't touched because they're in ineligible states
 		notRunningJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), FinalizedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), State: ptrutil.Ptr(rivertype.JobStateCompleted), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)})
 		notRunningJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), FinalizedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), State: ptrutil.Ptr(rivertype.JobStateDiscarded), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)})
 		notRunningJob3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), FinalizedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), State: ptrutil.Ptr(rivertype.JobStateCancelled), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)})
 
+		// Jobs with worker-specific long timeouts. The first isn't rescued
+		// because the difference between its `attempted_at` and now is still
+		// within the timeout threshold. The second _is_ rescued because it
+		// started earlier and even with the longer timeout, has still timed out.
+		longTimeOutJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKindLongTimeout), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Minute)), MaxAttempts: ptrutil.Ptr(5)})
+		longTimeOutJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKindLongTimeout), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-6 * time.Minute)), MaxAttempts: ptrutil.Ptr(5)})
+
 		require.NoError(cleaner.Start(ctx))
 
 		cleaner.TestSignals.FetchedBatch.WaitOrTimeout()
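The two long-timeout fixtures above are sized around the `JobRescuerRescueAfterDefault + 5*time.Minute` timeout registered for `rescuer_long_timeout`: assuming `rescueHorizon` is `now` minus `JobRescuerRescueAfterDefault` (as the bundle setup suggests), the first job's age works out to the rescue-after duration plus one minute, still inside the five-minute cushion, while the second's is plus six minutes, which exceeds it. A small sketch of that arithmetic (not part of the test; the absolute duration is a stand-in):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	rescueAfter := time.Hour // stand-in for JobRescuerRescueAfterDefault
	workerTimeout := rescueAfter + 5*time.Minute

	now := time.Now().UTC()
	rescueHorizon := now.Add(-rescueAfter)

	job1Age := now.Sub(rescueHorizon.Add(-1 * time.Minute)) // rescueAfter + 1m
	job2Age := now.Sub(rescueHorizon.Add(-6 * time.Minute)) // rescueAfter + 6m

	fmt.Println(job1Age < workerTimeout) // true: still within timeout, ignored
	fmt.Println(job2Age < workerTimeout) // false: timed out, rescued as retryable
}
```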
@@ -158,37 +175,44 @@ func TestJobRescuer(t *testing.T) {
 		require.NoError(err)
 		require.Equal(stuckToRetryJob3.State, job3After.State) // not rescued
 
-		discard1After, err := bundle.exec.JobGetByID(ctx, stuckToDiscardJob1.ID)
+		discardJob1After, err := bundle.exec.JobGetByID(ctx, stuckToDiscardJob1.ID)
 		require.NoError(err)
-		require.Equal(rivertype.JobStateDiscarded, discard1After.State)
-		require.WithinDuration(time.Now(), *discard1After.FinalizedAt, 5*time.Second)
-		require.Len(discard1After.Errors, 1)
+		require.Equal(rivertype.JobStateDiscarded, discardJob1After.State)
+		require.WithinDuration(time.Now(), *discardJob1After.FinalizedAt, 5*time.Second)
+		require.Len(discardJob1After.Errors, 1)
 
-		discard2After, err := bundle.exec.JobGetByID(ctx, stuckToDiscardJob2.ID)
+		discardJob2After, err := bundle.exec.JobGetByID(ctx, stuckToDiscardJob2.ID)
 		require.NoError(err)
-		require.Equal(rivertype.JobStateRunning, discard2After.State)
-		require.Nil(discard2After.FinalizedAt)
+		require.Equal(rivertype.JobStateRunning, discardJob2After.State)
+		require.Nil(discardJob2After.FinalizedAt)
 
-		cancel1After, err := bundle.exec.JobGetByID(ctx, stuckToCancelJob1.ID)
+		cancelJob1After, err := bundle.exec.JobGetByID(ctx, stuckToCancelJob1.ID)
 		require.NoError(err)
-		require.Equal(rivertype.JobStateCancelled, cancel1After.State)
-		require.WithinDuration(time.Now(), *cancel1After.FinalizedAt, 5*time.Second)
-		require.Len(cancel1After.Errors, 1)
+		require.Equal(rivertype.JobStateCancelled, cancelJob1After.State)
+		require.WithinDuration(time.Now(), *cancelJob1After.FinalizedAt, 5*time.Second)
+		require.Len(cancelJob1After.Errors, 1)
 
-		cancel2After, err := bundle.exec.JobGetByID(ctx, stuckToCancelJob2.ID)
+		cancelJob2After, err := bundle.exec.JobGetByID(ctx, stuckToCancelJob2.ID)
 		require.NoError(err)
-		require.Equal(rivertype.JobStateRunning, cancel2After.State)
-		require.Nil(cancel2After.FinalizedAt)
+		require.Equal(rivertype.JobStateRunning, cancelJob2After.State)
+		require.Nil(cancelJob2After.FinalizedAt)
 
-		notRunning1After, err := bundle.exec.JobGetByID(ctx, notRunningJob1.ID)
+		notRunningJob1After, err := bundle.exec.JobGetByID(ctx, notRunningJob1.ID)
 		require.NoError(err)
-		require.Equal(notRunning1After.State, notRunningJob1.State)
-		notRunning2After, err := bundle.exec.JobGetByID(ctx, notRunningJob2.ID)
+		require.Equal(notRunningJob1.State, notRunningJob1After.State)
+		notRunningJob2After, err := bundle.exec.JobGetByID(ctx, notRunningJob2.ID)
+		require.NoError(err)
+		require.Equal(notRunningJob2.State, notRunningJob2After.State)
+		notRunningJob3After, err := bundle.exec.JobGetByID(ctx, notRunningJob3.ID)
+		require.NoError(err)
+		require.Equal(notRunningJob3.State, notRunningJob3After.State)
+
+		notTimedOutJob1After, err := bundle.exec.JobGetByID(ctx, longTimeOutJob1.ID)
 		require.NoError(err)
-		require.Equal(notRunning2After.State, notRunningJob2.State)
-		notRunning3After, err := bundle.exec.JobGetByID(ctx, notRunningJob3.ID)
+		require.Equal(rivertype.JobStateRunning, notTimedOutJob1After.State)
+		notTimedOutJob2After, err := bundle.exec.JobGetByID(ctx, longTimeOutJob2.ID)
 		require.NoError(err)
-		require.Equal(notRunning3After.State, notRunningJob3.State)
+		require.Equal(rivertype.JobStateRetryable, notTimedOutJob2After.State)
 	})
 
 	t.Run("RescuesInBatches", func(t *testing.T) {