Skip to content

Commit

Permalink
reflex: Support error recovery and/or reporting/recording
Browse files Browse the repository at this point in the history
Summary: Adding a hook function to record and/or recover from event consumption errors
  • Loading branch information
jrkilloran committed Sep 11, 2023
1 parent 2225deb commit cc0924a
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 4 deletions.
11 changes: 9 additions & 2 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ type StreamClient interface {
}

// StreamFunc is the main reflex stream interface that all implementations should provide.
// It returns a long lived StreamClient that will stream events from the source.
// It returns a long-lived StreamClient that will stream events from the source.
type StreamFunc func(ctx context.Context, after string, opts ...StreamOption) (StreamClient, error)

// ConsumeFunc is the main reflex consume interface. It blocks while events are
Expand All @@ -157,7 +157,7 @@ type ConsumeFunc func(context.Context, Consumer, ...StreamOption) error
// Consumable is an interface for an object that provides a ConsumeFunc with the name Run.
// Deprecated: Please use Spec.
type Consumable interface {
// Run blocks while events are streamed to consumer. It always returns a non-nil error.
// Consume blocks while events are streamed to consumer. It always returns a non-nil error.
// Cancel the context to return early.
Consume(context.Context, Consumer, ...StreamOption) error
}
Expand All @@ -173,3 +173,10 @@ type CursorStore interface {
// Flush writes any buffered cursors to the underlying store.
Flush(ctx context.Context) error
}

// RecoveryFunc is a function that can be added as a ConsumerOption using the WithRecoverFunction
// function to provide handling for when a consumer function returns an error. This handling can
// just be recording the error or since it takes in the error and returns an error as well it can
// return nil to "recover" from the error (additional work may obviously be needed to do any actual
// recovery), return the same error if it could not be handled or even return a different error.
type RecoveryFunc func(context.Context, fate.Fate, *Event, Consumer, error) error
27 changes: 26 additions & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type consumer struct {
filterIncludeTypes []EventType
activityKey string
filterEvent EventFilter
rfn RecoveryFunc
}

// ConsumerOption will change the behaviour of the consumer
Expand Down Expand Up @@ -90,6 +91,19 @@ func WithEventFilter(flt EventFilter) ConsumerOption {
}
}

// WithRecoverFunction provides an option to specify a recovery function to be called when a consuming an event returns
// an error and potentially changing the error returned or even eliminate it by return nil. It can also be used for
// notification and/or recording of events that failed to process successfully.
func WithRecoverFunction(rfn RecoveryFunc) ConsumerOption {
return func(c *consumer) {
c.rfn = rfn
}
}

var defaultRecoveryFunc = func(_ context.Context, _ fate.Fate, _ *Event, _ Consumer, err error) error {
return err
}

// NewConsumer returns a new instrumented consumer of events.
func NewConsumer(name string, fn func(context.Context, fate.Fate, *Event) error,
opts ...ConsumerOption,
Expand All @@ -111,6 +125,9 @@ func NewConsumer(name string, fn func(context.Context, fate.Fate, *Event) error,
o(c)
}
c.activityKey = metrics.ConsumerActivityGauge.Register(ls, c.activityTTL)
if c.rfn == nil {
c.rfn = defaultRecoveryFunc
}
_ = c.Reset()
return c
}
Expand Down Expand Up @@ -150,7 +167,7 @@ func (c *consumer) Consume(ctx context.Context, ft fate.Fate,
} else if ok {
err = c.fn(ctx, ft, event)
if err != nil && !IsExpected(err) {
c.errorCounter.Inc()
err = c.consumeError(ctx, ft, event, err)
}

latency := time.Since(t0)
Expand All @@ -162,6 +179,14 @@ func (c *consumer) Consume(ctx context.Context, ft fate.Fate,
return err
}

func (c *consumer) consumeError(ctx context.Context, ft fate.Fate, event *Event, err error) error {
err = c.rfn(ctx, ft, event, c, err)
if err != nil && !IsExpected(err) {
c.errorCounter.Inc()
}
return err
}

func (c *consumer) filter(event *Event) (bool, error) {
ok := len(c.filterIncludeTypes) == 0 || IsAnyType(event.Type, c.filterIncludeTypes...)
if !ok {
Expand Down
120 changes: 120 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,3 +287,123 @@ func TestConsumeWithSkip(t *testing.T) {
})
}
}

func TestConsumeWithErrorAndReporting(t *testing.T) {
ctx := context.Background()
consumerName := "test_consumer"
f := fate.New()
consumedCounter := new(int)
recoveredCounter := new(int)
eventId := new(int)
cErr := errors.New("cfn errored")
rErr := errors.New("rfn changed error")
cFnErr := func(context.Context, fate.Fate, *Event) error {
*consumedCounter++
return errors.Wrap(cErr, "")
}
cFnCanc := func(context.Context, fate.Fate, *Event) error {
*consumedCounter++
return errors.Wrap(context.Canceled, "")
}
rFnNil := func(_ context.Context, _ fate.Fate, e *Event, c Consumer, err error) error {
*recoveredCounter++
*eventId, _ = strconv.Atoi(e.ID)
return nil
}
rFnSame := func(_ context.Context, _ fate.Fate, e *Event, c Consumer, err error) error {
*recoveredCounter++
*eventId, _ = strconv.Atoi(e.ID)
return err
}
rFnDiff := func(_ context.Context, _ fate.Fate, e *Event, c Consumer, err error) error {
*recoveredCounter++
*eventId, _ = strconv.Atoi(e.ID)
return errors.Wrap(rErr, "")
}
evts := []*Event{
{
ID: "1",
Type: eventType(1),
Timestamp: time.Now(),
},
{
ID: "2",
Type: eventType(2),
Timestamp: time.Now(),
},
{
ID: "3",
Type: eventType(3),
Timestamp: time.Now(),
},
}

allEventIds := []int{1, 2, 3}
noEventIds := []int{0, 0, 0}

tcs := []struct {
name string
c Consumer
expConsumeCount int
expRecoveredCount int
expMetricErrorCount int
expEventIDs []int
expErr error
}{
{
name: "Recover Function notes but doesn't change error",
c: NewConsumer(consumerName, cFnErr, WithRecoverFunction(rFnSame)),
expConsumeCount: 3,
expRecoveredCount: 3,
expMetricErrorCount: 3,
expEventIDs: allEventIds,
expErr: cErr,
},
{
name: "Recover Function not called when expected error",
c: NewConsumer(consumerName, cFnCanc, WithRecoverFunction(rFnSame)),
expConsumeCount: 3,
expRecoveredCount: 0,
expMetricErrorCount: 0,
expEventIDs: noEventIds,
expErr: context.Canceled,
},
{
name: "Recover Function handles error",
c: NewConsumer(consumerName, cFnErr, WithRecoverFunction(rFnNil)),
expConsumeCount: 3,
expRecoveredCount: 3,
expMetricErrorCount: 3,
expEventIDs: allEventIds,
expErr: nil,
},
{
name: "Recover Function throws different error",
c: NewConsumer(consumerName, cFnErr, WithRecoverFunction(rFnDiff)),
expConsumeCount: 3,
expRecoveredCount: 3,
expMetricErrorCount: 3,
expEventIDs: allEventIds,
expErr: rErr,
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
metrics.ConsumerErrors.Reset()
metrics.ConsumerSkippedEvents.Reset()
*consumedCounter = 0
*recoveredCounter = 0
*eventId = 0

for i, ev := range evts {
err := tc.c.Consume(ctx, f, ev)
require.Equal(t, tc.expEventIDs[i], *eventId)
jtest.Require(t, tc.expErr, err)
}

require.Equal(t, tc.expConsumeCount, *consumedCounter)
require.Equal(t, tc.expRecoveredCount, *recoveredCounter)
})
}
}
2 changes: 1 addition & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type StreamOptions struct {
type StreamOption func(*StreamOptions)

// WithStreamFromHead provides an option to stream only new events from
// from the head of events table. Note this overrides the "after" parameter.
// the head of events table. Note this overrides the "after" parameter.
func WithStreamFromHead() StreamOption {
return func(sc *StreamOptions) {
sc.StreamFromHead = true
Expand Down
3 changes: 3 additions & 0 deletions run.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func Run(in context.Context, s Spec) error {
}

ctx := log.ContextWith(ctx, j.MKS{
"consumer": s.consumer.Name(),
"event_id": e.ID,
"event_fid": e.ForeignID,
})
Expand All @@ -90,13 +91,15 @@ func Run(in context.Context, s Spec) error {

if err := s.consumer.Consume(ctx, f, e); err != nil {
return errors.Wrap(err, "consume error", j.MKS{
"consumer": s.consumer.Name(),
"event_id": e.ID,
"event_fid": e.ForeignID,
})
}

if err := s.cstore.SetCursor(ctx, s.consumer.Name(), e.ID); err != nil {
return errors.Wrap(err, "set cursor error", j.MKS{
"consumer": s.consumer.Name(),
"event_id": e.ID,
"event_fid": e.ForeignID,
})
Expand Down

0 comments on commit cc0924a

Please sign in to comment.