From e2a00fc31b5d5b226bab76b3a7a95b614a7dca5b Mon Sep 17 00:00:00 2001 From: Brandur Date: Sun, 12 Nov 2023 11:17:15 -0800 Subject: [PATCH] Add context to error handler's `HandleError` and `HandlePanic` As I was writing the documentation for `ErrorHandler` I realized that it'd probably be desirable for its `HandleError` and `HandlePanic` functions to take a context. The user might, for example, have a logger embedded in context which they'd extract and use the log the error or panic. This could also be done with a member field on implementing struct of course, but it feels like making context available doesn't really have a downside and its presence would likely be expected by some users. Another benefit is that in case someone is doing some heavy lifting in one of these (they probably shouldn't be, but just in case), their handlers could respond to the context cancellation caused by a client `StopAndCancel` shutdown. Currently, the handlers are immune to cancellation and could conceivably cause a shutdown to be stuck. --- client_test.go | 6 +++--- error_handler.go | 12 ++++++++++-- job_executor.go | 4 ++-- job_executor_test.go | 37 +++++++++++++++++-------------------- 4 files changed, 32 insertions(+), 27 deletions(-) diff --git a/client_test.go b/client_test.go index 41709d37..3f6c4354 100644 --- a/client_test.go +++ b/client_test.go @@ -871,7 +871,7 @@ func Test_Client_ErrorHandler(t *testing.T) { var errorHandlerCalled bool config.ErrorHandler = &testErrorHandler{ - HandleErrorFunc: func(job *JobRow, err error) *ErrorHandlerResult { + HandleErrorFunc: func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult { require.Equal(t, handlerErr, err) errorHandlerCalled = true return &ErrorHandlerResult{} @@ -893,7 +893,7 @@ func Test_Client_ErrorHandler(t *testing.T) { var errorHandlerCalled bool config.ErrorHandler = &testErrorHandler{ - HandleErrorFunc: func(job *JobRow, err error) *ErrorHandlerResult { + HandleErrorFunc: func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult { var unknownJobKindErr *UnknownJobKindError require.ErrorAs(t, err, &unknownJobKindErr) require.Equal(t, *unknownJobKindErr, UnknownJobKindError{Kind: "RandomWorkerNameThatIsNeverRegistered"}) @@ -925,7 +925,7 @@ func Test_Client_ErrorHandler(t *testing.T) { var panicHandlerCalled bool config.ErrorHandler = &testErrorHandler{ - HandlePanicFunc: func(job *JobRow, panicVal any) *ErrorHandlerResult { + HandlePanicFunc: func(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult { require.Equal(t, "panic val", panicVal) panicHandlerCalled = true return &ErrorHandlerResult{} diff --git a/error_handler.go b/error_handler.go index 950b9401..da76bcc5 100644 --- a/error_handler.go +++ b/error_handler.go @@ -1,14 +1,22 @@ package river +import "context" + // ErrorHandler provides an interface that will be invoked in case of an error // or panic occurring in the job. This is often useful for logging and exception // tracking, but can also be used to customize retry behavior. type ErrorHandler interface { // HandleError is invoked in case of an error occurring in a job. - HandleError(job *JobRow, err error) *ErrorHandlerResult + // + // Context is descended from the one used to start the River client that + // worked the job. + HandleError(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult // HandlePanic is invoked in case of a panic occurring in a job. - HandlePanic(job *JobRow, panicVal any) *ErrorHandlerResult + // + // Context is descended from the one used to start the River client that + // worked the job. + HandlePanic(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult } type ErrorHandlerResult struct { diff --git a/job_executor.go b/job_executor.go index f3d71752..9a80efeb 100644 --- a/job_executor.go +++ b/job_executor.go @@ -232,7 +232,7 @@ func (e *jobExecutor) reportError(ctx context.Context) { if e.ErrorHandler != nil { fnName = "HandleError" errorHandler = func() *ErrorHandlerResult { - return e.ErrorHandler.HandleError(e.JobRow, e.result.Err) + return e.ErrorHandler.HandleError(ctx, e.JobRow, e.result.Err) } } @@ -243,7 +243,7 @@ func (e *jobExecutor) reportError(ctx context.Context) { if e.ErrorHandler != nil { fnName = "HandlePanic" errorHandler = func() *ErrorHandlerResult { - return e.ErrorHandler.HandlePanic(e.JobRow, e.result.PanicVal) + return e.ErrorHandler.HandlePanic(ctx, e.JobRow, e.result.PanicVal) } } } diff --git a/job_executor_test.go b/job_executor_test.go index 9d6e6b27..76435238 100644 --- a/job_executor_test.go +++ b/job_executor_test.go @@ -86,28 +86,28 @@ func (p *retryPolicyNoJitter) NextRetry(job *JobRow) time.Time { type testErrorHandler struct { HandleErrorCalled bool - HandleErrorFunc func(job *JobRow, err error) *ErrorHandlerResult + HandleErrorFunc func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult HandlePanicCalled bool - HandlePanicFunc func(job *JobRow, panicVal any) *ErrorHandlerResult + HandlePanicFunc func(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult } // Test handler with no-ops for both error handling functions. func newTestErrorHandler() *testErrorHandler { return &testErrorHandler{ - HandleErrorFunc: func(job *JobRow, err error) *ErrorHandlerResult { return nil }, - HandlePanicFunc: func(job *JobRow, panicVal any) *ErrorHandlerResult { return nil }, + HandleErrorFunc: func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult { return nil }, + HandlePanicFunc: func(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult { return nil }, } } -func (h *testErrorHandler) HandleError(job *JobRow, err error) *ErrorHandlerResult { +func (h *testErrorHandler) HandleError(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult { h.HandleErrorCalled = true - return h.HandleErrorFunc(job, err) + return h.HandleErrorFunc(ctx, job, err) } -func (h *testErrorHandler) HandlePanic(job *JobRow, panicVal any) *ErrorHandlerResult { +func (h *testErrorHandler) HandlePanic(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult { h.HandlePanicCalled = true - return h.HandlePanicFunc(job, panicVal) + return h.HandlePanicFunc(ctx, job, panicVal) } func TestJobExecutor_Execute(t *testing.T) { @@ -173,12 +173,9 @@ func TestJobExecutor_Execute(t *testing.T) { job = jobs[0] bundle := &testBundle{ - adapter: adapter, - completer: completer, - errorHandler: &testErrorHandler{ - HandleErrorFunc: func(job *JobRow, err error) *ErrorHandlerResult { return nil }, - HandlePanicFunc: func(job *JobRow, panicVal any) *ErrorHandlerResult { return nil }, - }, + adapter: adapter, + completer: completer, + errorHandler: newTestErrorHandler(), getUpdatesAndStop: getJobUpdates, jobRow: jobRowFromInternal(job), tx: tx, @@ -396,7 +393,7 @@ func TestJobExecutor_Execute(t *testing.T) { workerErr := fmt.Errorf("job error") executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow) - bundle.errorHandler.HandleErrorFunc = func(job *JobRow, err error) *ErrorHandlerResult { + bundle.errorHandler.HandleErrorFunc = func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult { require.Equal(t, workerErr, err) return nil } @@ -418,7 +415,7 @@ func TestJobExecutor_Execute(t *testing.T) { workerErr := fmt.Errorf("job error") executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow) - bundle.errorHandler.HandleErrorFunc = func(job *JobRow, err error) *ErrorHandlerResult { + bundle.errorHandler.HandleErrorFunc = func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult { return &ErrorHandlerResult{SetCancelled: true} } @@ -439,7 +436,7 @@ func TestJobExecutor_Execute(t *testing.T) { workerErr := fmt.Errorf("job error") executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow) - bundle.errorHandler.HandleErrorFunc = func(job *JobRow, err error) *ErrorHandlerResult { + bundle.errorHandler.HandleErrorFunc = func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult { panic("error handled panicked!") } @@ -513,7 +510,7 @@ func TestJobExecutor_Execute(t *testing.T) { executor, bundle := setup(t) executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { panic("panic val") }, nil).MakeUnit(bundle.jobRow) - bundle.errorHandler.HandlePanicFunc = func(job *JobRow, panicVal any) *ErrorHandlerResult { + bundle.errorHandler.HandlePanicFunc = func(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult { require.Equal(t, "panic val", panicVal) return nil } @@ -534,7 +531,7 @@ func TestJobExecutor_Execute(t *testing.T) { executor, bundle := setup(t) executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { panic("panic val") }, nil).MakeUnit(bundle.jobRow) - bundle.errorHandler.HandlePanicFunc = func(job *JobRow, panicVal any) *ErrorHandlerResult { + bundle.errorHandler.HandlePanicFunc = func(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult { return &ErrorHandlerResult{SetCancelled: true} } @@ -554,7 +551,7 @@ func TestJobExecutor_Execute(t *testing.T) { executor, bundle := setup(t) executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { panic("panic val") }, nil).MakeUnit(bundle.jobRow) - bundle.errorHandler.HandlePanicFunc = func(job *JobRow, panicVal any) *ErrorHandlerResult { + bundle.errorHandler.HandlePanicFunc = func(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult { panic("panic handler panicked!") }