Skip to content

Commit

Permalink
Add context to error handler's HandleError and HandlePanic
Browse files Browse the repository at this point in the history
As I was writing the documentation for `ErrorHandler` I realized that
it'd probably be desirable for its `HandleError` and `HandlePanic`
functions to take a context. The user might, for example, have a logger
embedded in context which they'd extract and use the log the error or
panic. This could also be done with a member field on implementing
struct of course, but it feels like making context available doesn't
really have a downside and its presence would likely be expected by some
users.

Another benefit is that in case someone is doing some heavy lifting in
one of these (they probably shouldn't be, but just in case), their
handlers could respond to the context cancellation caused by a client
`StopAndCancel` shutdown. Currently, the handlers are immune to
cancellation and could conceivably cause a shutdown to be stuck.
  • Loading branch information
brandur committed Nov 12, 2023
1 parent a9e0090 commit 34d4dcc
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 27 deletions.
6 changes: 3 additions & 3 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ func Test_Client_ErrorHandler(t *testing.T) {

var errorHandlerCalled bool
config.ErrorHandler = &testErrorHandler{
HandleErrorFunc: func(job *JobRow, err error) *ErrorHandlerResult {
HandleErrorFunc: func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult {
require.Equal(t, handlerErr, err)
errorHandlerCalled = true
return &ErrorHandlerResult{}
Expand All @@ -893,7 +893,7 @@ func Test_Client_ErrorHandler(t *testing.T) {

var errorHandlerCalled bool
config.ErrorHandler = &testErrorHandler{
HandleErrorFunc: func(job *JobRow, err error) *ErrorHandlerResult {
HandleErrorFunc: func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult {
var unknownJobKindErr *UnknownJobKindError
require.ErrorAs(t, err, &unknownJobKindErr)
require.Equal(t, *unknownJobKindErr, UnknownJobKindError{Kind: "RandomWorkerNameThatIsNeverRegistered"})
Expand Down Expand Up @@ -925,7 +925,7 @@ func Test_Client_ErrorHandler(t *testing.T) {

var panicHandlerCalled bool
config.ErrorHandler = &testErrorHandler{
HandlePanicFunc: func(job *JobRow, panicVal any) *ErrorHandlerResult {
HandlePanicFunc: func(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult {
require.Equal(t, "panic val", panicVal)
panicHandlerCalled = true
return &ErrorHandlerResult{}
Expand Down
12 changes: 10 additions & 2 deletions error_handler.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
package river

import "context"

// ErrorHandler provides an interface that will be invoked in case of an error
// or panic occurring in the job. This is often useful for logging and exception
// tracking, but can also be used to customize retry behavior.
type ErrorHandler interface {
// HandleError is invoked in case of an error occurring in a job.
HandleError(job *JobRow, err error) *ErrorHandlerResult
//
// Context is descended from the one used to start the River client that
// worked the job.
HandleError(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult

// HandlePanic is invoked in case of a panic occurring in a job.
HandlePanic(job *JobRow, panicVal any) *ErrorHandlerResult
//
// Context is descended from the one used to start the River client that
// worked the job.
HandlePanic(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult
}

type ErrorHandlerResult struct {
Expand Down
4 changes: 2 additions & 2 deletions job_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (e *jobExecutor) reportError(ctx context.Context) {
if e.ErrorHandler != nil {
fnName = "HandleError"
errorHandler = func() *ErrorHandlerResult {
return e.ErrorHandler.HandleError(e.JobRow, e.result.Err)
return e.ErrorHandler.HandleError(ctx, e.JobRow, e.result.Err)
}
}

Expand All @@ -243,7 +243,7 @@ func (e *jobExecutor) reportError(ctx context.Context) {
if e.ErrorHandler != nil {
fnName = "HandlePanic"
errorHandler = func() *ErrorHandlerResult {
return e.ErrorHandler.HandlePanic(e.JobRow, e.result.PanicVal)
return e.ErrorHandler.HandlePanic(ctx, e.JobRow, e.result.PanicVal)
}
}
}
Expand Down
37 changes: 17 additions & 20 deletions job_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,28 +86,28 @@ func (p *retryPolicyNoJitter) NextRetry(job *JobRow) time.Time {

type testErrorHandler struct {
HandleErrorCalled bool
HandleErrorFunc func(job *JobRow, err error) *ErrorHandlerResult
HandleErrorFunc func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult

HandlePanicCalled bool
HandlePanicFunc func(job *JobRow, panicVal any) *ErrorHandlerResult
HandlePanicFunc func(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult
}

// Test handler with no-ops for both error handling functions.
func newTestErrorHandler() *testErrorHandler {
return &testErrorHandler{
HandleErrorFunc: func(job *JobRow, err error) *ErrorHandlerResult { return nil },
HandlePanicFunc: func(job *JobRow, panicVal any) *ErrorHandlerResult { return nil },
HandleErrorFunc: func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult { return nil },
HandlePanicFunc: func(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult { return nil },
}
}

func (h *testErrorHandler) HandleError(job *JobRow, err error) *ErrorHandlerResult {
func (h *testErrorHandler) HandleError(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult {
h.HandleErrorCalled = true
return h.HandleErrorFunc(job, err)
return h.HandleErrorFunc(ctx, job, err)
}

func (h *testErrorHandler) HandlePanic(job *JobRow, panicVal any) *ErrorHandlerResult {
func (h *testErrorHandler) HandlePanic(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult {
h.HandlePanicCalled = true
return h.HandlePanicFunc(job, panicVal)
return h.HandlePanicFunc(ctx, job, panicVal)
}

func TestJobExecutor_Execute(t *testing.T) {
Expand Down Expand Up @@ -173,12 +173,9 @@ func TestJobExecutor_Execute(t *testing.T) {
job = jobs[0]

bundle := &testBundle{
adapter: adapter,
completer: completer,
errorHandler: &testErrorHandler{
HandleErrorFunc: func(job *JobRow, err error) *ErrorHandlerResult { return nil },
HandlePanicFunc: func(job *JobRow, panicVal any) *ErrorHandlerResult { return nil },
},
adapter: adapter,
completer: completer,
errorHandler: newTestErrorHandler(),
getUpdatesAndStop: getJobUpdates,
jobRow: jobRowFromInternal(job),
tx: tx,
Expand Down Expand Up @@ -396,7 +393,7 @@ func TestJobExecutor_Execute(t *testing.T) {

workerErr := fmt.Errorf("job error")
executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow)
bundle.errorHandler.HandleErrorFunc = func(job *JobRow, err error) *ErrorHandlerResult {
bundle.errorHandler.HandleErrorFunc = func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult {
require.Equal(t, workerErr, err)
return nil
}
Expand All @@ -418,7 +415,7 @@ func TestJobExecutor_Execute(t *testing.T) {

workerErr := fmt.Errorf("job error")
executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow)
bundle.errorHandler.HandleErrorFunc = func(job *JobRow, err error) *ErrorHandlerResult {
bundle.errorHandler.HandleErrorFunc = func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult {
return &ErrorHandlerResult{SetCancelled: true}
}

Expand All @@ -439,7 +436,7 @@ func TestJobExecutor_Execute(t *testing.T) {

workerErr := fmt.Errorf("job error")
executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow)
bundle.errorHandler.HandleErrorFunc = func(job *JobRow, err error) *ErrorHandlerResult {
bundle.errorHandler.HandleErrorFunc = func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult {
panic("error handled panicked!")
}

Expand Down Expand Up @@ -513,7 +510,7 @@ func TestJobExecutor_Execute(t *testing.T) {
executor, bundle := setup(t)

executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { panic("panic val") }, nil).MakeUnit(bundle.jobRow)
bundle.errorHandler.HandlePanicFunc = func(job *JobRow, panicVal any) *ErrorHandlerResult {
bundle.errorHandler.HandlePanicFunc = func(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult {
require.Equal(t, "panic val", panicVal)
return nil
}
Expand All @@ -534,7 +531,7 @@ func TestJobExecutor_Execute(t *testing.T) {
executor, bundle := setup(t)

executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { panic("panic val") }, nil).MakeUnit(bundle.jobRow)
bundle.errorHandler.HandlePanicFunc = func(job *JobRow, panicVal any) *ErrorHandlerResult {
bundle.errorHandler.HandlePanicFunc = func(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult {
return &ErrorHandlerResult{SetCancelled: true}
}

Expand All @@ -554,7 +551,7 @@ func TestJobExecutor_Execute(t *testing.T) {
executor, bundle := setup(t)

executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { panic("panic val") }, nil).MakeUnit(bundle.jobRow)
bundle.errorHandler.HandlePanicFunc = func(job *JobRow, panicVal any) *ErrorHandlerResult {
bundle.errorHandler.HandlePanicFunc = func(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult {
panic("panic handler panicked!")
}

Expand Down

0 comments on commit 34d4dcc

Please sign in to comment.