Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add context to error handler's HandleError and HandlePanic #13

Merged
merged 1 commit into from
Nov 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ func Test_Client_ErrorHandler(t *testing.T) {

var errorHandlerCalled bool
config.ErrorHandler = &testErrorHandler{
HandleErrorFunc: func(job *JobRow, err error) *ErrorHandlerResult {
HandleErrorFunc: func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult {
require.Equal(t, handlerErr, err)
errorHandlerCalled = true
return &ErrorHandlerResult{}
Expand All @@ -893,7 +893,7 @@ func Test_Client_ErrorHandler(t *testing.T) {

var errorHandlerCalled bool
config.ErrorHandler = &testErrorHandler{
HandleErrorFunc: func(job *JobRow, err error) *ErrorHandlerResult {
HandleErrorFunc: func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult {
var unknownJobKindErr *UnknownJobKindError
require.ErrorAs(t, err, &unknownJobKindErr)
require.Equal(t, *unknownJobKindErr, UnknownJobKindError{Kind: "RandomWorkerNameThatIsNeverRegistered"})
Expand Down Expand Up @@ -925,7 +925,7 @@ func Test_Client_ErrorHandler(t *testing.T) {

var panicHandlerCalled bool
config.ErrorHandler = &testErrorHandler{
HandlePanicFunc: func(job *JobRow, panicVal any) *ErrorHandlerResult {
HandlePanicFunc: func(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult {
require.Equal(t, "panic val", panicVal)
panicHandlerCalled = true
return &ErrorHandlerResult{}
Expand Down
12 changes: 10 additions & 2 deletions error_handler.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
package river

import "context"

// ErrorHandler provides an interface that will be invoked in case of an error
// or panic occurring in the job. This is often useful for logging and exception
// tracking, but can also be used to customize retry behavior.
type ErrorHandler interface {
// HandleError is invoked in case of an error occurring in a job.
HandleError(job *JobRow, err error) *ErrorHandlerResult
//
// Context is descended from the one used to start the River client that
// worked the job.
HandleError(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult

// HandlePanic is invoked in case of a panic occurring in a job.
HandlePanic(job *JobRow, panicVal any) *ErrorHandlerResult
//
// Context is descended from the one used to start the River client that
// worked the job.
HandlePanic(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult
}

type ErrorHandlerResult struct {
Expand Down
4 changes: 2 additions & 2 deletions job_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (e *jobExecutor) reportError(ctx context.Context) {
if e.ErrorHandler != nil {
fnName = "HandleError"
errorHandler = func() *ErrorHandlerResult {
return e.ErrorHandler.HandleError(e.JobRow, e.result.Err)
return e.ErrorHandler.HandleError(ctx, e.JobRow, e.result.Err)
}
}

Expand All @@ -243,7 +243,7 @@ func (e *jobExecutor) reportError(ctx context.Context) {
if e.ErrorHandler != nil {
fnName = "HandlePanic"
errorHandler = func() *ErrorHandlerResult {
return e.ErrorHandler.HandlePanic(e.JobRow, e.result.PanicVal)
return e.ErrorHandler.HandlePanic(ctx, e.JobRow, e.result.PanicVal)
}
}
}
Expand Down
37 changes: 17 additions & 20 deletions job_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,28 +86,28 @@ func (p *retryPolicyNoJitter) NextRetry(job *JobRow) time.Time {

type testErrorHandler struct {
HandleErrorCalled bool
HandleErrorFunc func(job *JobRow, err error) *ErrorHandlerResult
HandleErrorFunc func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult

HandlePanicCalled bool
HandlePanicFunc func(job *JobRow, panicVal any) *ErrorHandlerResult
HandlePanicFunc func(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult
}

// Test handler with no-ops for both error handling functions.
func newTestErrorHandler() *testErrorHandler {
return &testErrorHandler{
HandleErrorFunc: func(job *JobRow, err error) *ErrorHandlerResult { return nil },
HandlePanicFunc: func(job *JobRow, panicVal any) *ErrorHandlerResult { return nil },
HandleErrorFunc: func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult { return nil },
HandlePanicFunc: func(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult { return nil },
}
}

func (h *testErrorHandler) HandleError(job *JobRow, err error) *ErrorHandlerResult {
func (h *testErrorHandler) HandleError(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult {
h.HandleErrorCalled = true
return h.HandleErrorFunc(job, err)
return h.HandleErrorFunc(ctx, job, err)
}

func (h *testErrorHandler) HandlePanic(job *JobRow, panicVal any) *ErrorHandlerResult {
func (h *testErrorHandler) HandlePanic(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult {
h.HandlePanicCalled = true
return h.HandlePanicFunc(job, panicVal)
return h.HandlePanicFunc(ctx, job, panicVal)
}

func TestJobExecutor_Execute(t *testing.T) {
Expand Down Expand Up @@ -173,12 +173,9 @@ func TestJobExecutor_Execute(t *testing.T) {
job = jobs[0]

bundle := &testBundle{
adapter: adapter,
completer: completer,
errorHandler: &testErrorHandler{
HandleErrorFunc: func(job *JobRow, err error) *ErrorHandlerResult { return nil },
HandlePanicFunc: func(job *JobRow, panicVal any) *ErrorHandlerResult { return nil },
},
adapter: adapter,
completer: completer,
errorHandler: newTestErrorHandler(),
getUpdatesAndStop: getJobUpdates,
jobRow: jobRowFromInternal(job),
tx: tx,
Expand Down Expand Up @@ -396,7 +393,7 @@ func TestJobExecutor_Execute(t *testing.T) {

workerErr := fmt.Errorf("job error")
executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow)
bundle.errorHandler.HandleErrorFunc = func(job *JobRow, err error) *ErrorHandlerResult {
bundle.errorHandler.HandleErrorFunc = func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult {
require.Equal(t, workerErr, err)
return nil
}
Expand All @@ -418,7 +415,7 @@ func TestJobExecutor_Execute(t *testing.T) {

workerErr := fmt.Errorf("job error")
executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow)
bundle.errorHandler.HandleErrorFunc = func(job *JobRow, err error) *ErrorHandlerResult {
bundle.errorHandler.HandleErrorFunc = func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult {
return &ErrorHandlerResult{SetCancelled: true}
}

Expand All @@ -439,7 +436,7 @@ func TestJobExecutor_Execute(t *testing.T) {

workerErr := fmt.Errorf("job error")
executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow)
bundle.errorHandler.HandleErrorFunc = func(job *JobRow, err error) *ErrorHandlerResult {
bundle.errorHandler.HandleErrorFunc = func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult {
panic("error handled panicked!")
}

Expand Down Expand Up @@ -513,7 +510,7 @@ func TestJobExecutor_Execute(t *testing.T) {
executor, bundle := setup(t)

executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { panic("panic val") }, nil).MakeUnit(bundle.jobRow)
bundle.errorHandler.HandlePanicFunc = func(job *JobRow, panicVal any) *ErrorHandlerResult {
bundle.errorHandler.HandlePanicFunc = func(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult {
require.Equal(t, "panic val", panicVal)
return nil
}
Expand All @@ -534,7 +531,7 @@ func TestJobExecutor_Execute(t *testing.T) {
executor, bundle := setup(t)

executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { panic("panic val") }, nil).MakeUnit(bundle.jobRow)
bundle.errorHandler.HandlePanicFunc = func(job *JobRow, panicVal any) *ErrorHandlerResult {
bundle.errorHandler.HandlePanicFunc = func(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult {
return &ErrorHandlerResult{SetCancelled: true}
}

Expand All @@ -554,7 +551,7 @@ func TestJobExecutor_Execute(t *testing.T) {
executor, bundle := setup(t)

executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { panic("panic val") }, nil).MakeUnit(bundle.jobRow)
bundle.errorHandler.HandlePanicFunc = func(job *JobRow, panicVal any) *ErrorHandlerResult {
bundle.errorHandler.HandlePanicFunc = func(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult {
panic("panic handler panicked!")
}

Expand Down
Loading