Skip to content

Commit

Permalink
reflex: Added event publication to error recording
Browse files Browse the repository at this point in the history
Summary: Added event publication to error recording to dead letter consumer

Issue: [Base Issue](https://gitlab.com/lunomoney/product-engineering/pods/customer-trust/work/-/issues/1950)
  • Loading branch information
jrkilloran committed Feb 9, 2024
1 parent 44174c1 commit 5d7f4c0
Show file tree
Hide file tree
Showing 11 changed files with 254 additions and 134 deletions.
26 changes: 21 additions & 5 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,29 @@ type CursorStore interface {
// recovery), return the same error if it could not be handled or even return a different error.
type RecoveryFunc func(context.Context, *Event, Consumer, error) error

// ErrorInsertFunc abstracts the insertion of an event into a sql table.
type ErrorInsertFunc func(ctx context.Context, consumer string, eventID string, errMsg string) error
// ConsumerError is record of record reflex event consumer errors.
type ConsumerError struct {
ID string
Consumer string
EventID string
Message string
Timestamp time.Time
Status ErrorStatus
}

// ErrorStatus is the current status of a consumer error.
type ErrorStatus int

func (e ErrorStatus) ReflexType() int {
return int(e)
}

func (e ErrorStatus) ShiftStatus() int {
return e.ReflexType()
}

const (
UnknownEventError ErrorStatus = 0
SavedEventError ErrorStatus = 1
SentinelEventError = 2
UnknownEventError ErrorStatus = 0
// EventErrorRecorded - New errors should be saved in this state [initial]
EventErrorRecorded ErrorStatus = 1
)
29 changes: 16 additions & 13 deletions rpatterns/deadletter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,35 +10,38 @@ import (
"github.com/luno/reflex"
)

// DeadLetterFunc abstracts the recording an error with an event.
type DeadLetterFunc func(ctx context.Context, consumer string, eventID string, errMsg string) error

// NewDeadLetterConsumer returns a reflex consumer that records but ignores errors after
// the provided number of retries and therefore eventually continues to the next event.
// However, if the consumer cannot record the error it will return the error in a blocking
// fashion like a standard consumer.
func NewDeadLetterConsumer(name string, retries int, fn reflex.ConsumerFunc, eFn reflex.ErrorInsertFunc,
func NewDeadLetterConsumer(name string, retries int, cFn reflex.ConsumerFunc, dFn DeadLetterFunc,
opts ...reflex.ConsumerOption,
) reflex.Consumer {
dl := &deadLetter{
name: name,
inner: fn,
retries: retries,
inserter: eFn,
name: name,
process: cFn,
retries: retries,
record: dFn,
}

return reflex.NewConsumer(name, dl.consume, opts...)
}

type deadLetter struct {
name string
inner reflex.ConsumerFunc
retries int
inserter reflex.ErrorInsertFunc
name string
process reflex.ConsumerFunc
retries int
record DeadLetterFunc

retryID string
retryCount int
}

func (d *deadLetter) consume(ctx context.Context, e *reflex.Event) error {
err := d.inner(ctx, e)
err := d.process(ctx, e)
if err == nil {
d.reset("")
return nil
Expand All @@ -50,17 +53,17 @@ func (d *deadLetter) consume(ctx context.Context, e *reflex.Event) error {
d.retryCount++
if d.retryCount > d.retries {
d.reset("")
return d.record(ctx, e, err)
return d.offload(ctx, e, err)
}

return err
}

func (d *deadLetter) record(ctx context.Context, e *reflex.Event, err error) error {
func (d *deadLetter) offload(ctx context.Context, e *reflex.Event, err error) error {
if reflex.IsExpected(err) {
log.Info(ctx, "dead letter consumer ignoring error",
log.WithError(errors.Wrap(err, "", j.MKS{"consumer": d.name, "eventID": e.ID, "errMsg": err.Error()})))
} else if iErr := d.inserter(ctx, d.name, e.ID, err.Error()); iErr != nil {
} else if iErr := d.record(ctx, d.name, e.ID, err.Error()); iErr != nil {
log.Error(ctx, errors.Wrap(err, "dead letter consumer cannot record an error"),
j.MKS{"consumer": d.name, "eventID": e.ID, "errMsg": err.Error(), "recErrMsg": iErr.Error()})
return err
Expand Down
8 changes: 4 additions & 4 deletions rsql/cursorstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestNewCursorsTable(t *testing.T) {

for _, test := range cases {
t.Run(test.name, func(t *testing.T) {
dbc := ConnectTestDB(t, DefaultEventTable(), DefaultCursorTable(), DefaultErrorTable())
dbc := ConnectTestDB(t, DefaultEventTable(), DefaultCursorTable(), DefaultErrorTable(), DefaultErrorEventTable())

var sets int

Expand Down Expand Up @@ -87,7 +87,7 @@ func TestNewCursorsTable(t *testing.T) {
}

func TestAsyncSetCursor(t *testing.T) {
dbc := ConnectTestDB(t, DefaultEventTable(), DefaultCursorTable(), DefaultErrorTable())
dbc := ConnectTestDB(t, DefaultEventTable(), DefaultCursorTable(), DefaultErrorTable(), DefaultErrorEventTable())

s := newTestSleep()

Expand Down Expand Up @@ -120,7 +120,7 @@ func TestAsyncSetCursor(t *testing.T) {
}

func TestSyncSetCursor(t *testing.T) {
dbc := ConnectTestDB(t, DefaultEventTable(), DefaultCursorTable(), DefaultErrorTable())
dbc := ConnectTestDB(t, DefaultEventTable(), DefaultCursorTable(), DefaultErrorTable(), DefaultErrorEventTable())

s := newTestSleep()

Expand All @@ -142,7 +142,7 @@ func TestSyncSetCursor(t *testing.T) {
}

func TestCloneAsyncCursor(t *testing.T) {
dbc := ConnectTestDB(t, DefaultEventTable(), DefaultCursorTable(), DefaultErrorTable())
dbc := ConnectTestDB(t, DefaultEventTable(), DefaultCursorTable(), DefaultErrorTable(), DefaultErrorEventTable())

s := newTestSleep()

Expand Down
37 changes: 28 additions & 9 deletions rsql/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const (
defaultTraceField = "" // default is empty to support backwards compatibility

defaultErrorTable = "consumer_errors"
defaultErrorEventsSuffix = "_events"
defaultErrorEventMetadataField = "metadata"
defaultErrorIDField = "id"
defaultErrorEventConsumerField = "consumer"
defaultErrorEventIDField = "event_id"
Expand Down Expand Up @@ -268,18 +270,35 @@ func setCursor(ctx context.Context, dbc *sql.DB, schema ctableSchema,
}

// makeDefaultErrorInserter returns the default sql ErrorInsertFunc configured via WithErrorsXField options.
func makeDefaultErrorInserter(schema errTableSchema) errorInserter {
func makeDefaultErrorInserter(schema errTableSchema) ErrorInserter {
msg := "insert consumer error failed"
// TODO(jkilloran): Should we also reset the status to be 1 i.e. EventErrorRecorded status even if it has previously
// been handled/updated to another state. Or should we return any duplicate error in a way so we
// don't write another event off of the same error. Or indeed is it safer as currently written when
// encountering a duplicate error to still write a new event off of it but not to revert the status
// back to recorded.

// NOTE: This insert statement will return the generated autoincrement "id" column value if no (secondary) key is
// already found in the table (i.e. something like consumer + event_id) otherwise it will do a non-op update
// but due to the use of last_insert_id(id) it will still pass the existing row's "id" column back as if it
// was just inserted ensuring that it always returns a reasonable value.
// NB: See the documentation is the following link on the behaviour of "on duplicate key update" https://dev.mysql.com/doc/refman/5.7/en/insert-on-duplicate.html#:~:text=KEY%20UPDATE%20Statement-,13.2.5.2,-INSERT%20...%20ON%20DUPLICATE
// NB: See the documentation is the following link on the behaviour of "on last_insert_id(<expr>)" https://dev.mysql.com/doc/refman/5.7/en/information-functions.html#function_last-insert-id
q := fmt.Sprintf(
"insert into %s set %s=?, %s=?, %s=?, %s=now(6), %s=?",
schema.name, schema.eventConsumerField, schema.eventIDField, schema.errorMsgField, schema.errorTimeField, schema.errorStatusField)
return func(ctx context.Context, dbc *sql.DB, consumer string, eventID string, errMsg string, errStatus reflex.ErrorStatus,
) error {
_, err := dbc.ExecContext(ctx, q, consumer, eventID, errMsg, errStatus)
"insert into %s set %s=?, %s=?, %s=?, %s=now(6), %s=? on duplicate key update %s=last_insert_id(%s)",
schema.name, schema.eventConsumerField, schema.eventIDField, schema.errorMsgField, schema.errorTimeField, schema.errorStatusField, schema.idField, schema.idField)
return func(ctx context.Context, tx *sql.Tx, consumer string, eventID string, errMsg string, errStatus reflex.ErrorStatus) (string, error) {
r, err := tx.ExecContext(ctx, q, consumer, eventID, errMsg, errStatus)
// If the error has already been written then we can ignore the error
if IsDuplicateErrorInsertion(err) {
return nil
if err != nil && !IsDuplicateErrorInsertion(err) {
return "", errors.Wrap(err, msg)
}
// This will still work with a duplicate due the "on duplicate key update id" part of the insert statement above
id, idErr := r.LastInsertId()
if idErr != nil {
return "", errors.Wrap(idErr, msg)
}
return errors.Wrap(err, "insert consumer error failed")
return strconv.FormatInt(id, 10), nil
}
}

Expand Down
35 changes: 24 additions & 11 deletions rsql/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import (
var dbTestURI = flag.String("db_test_uri", getDefaultURI(), "Test database uri")

const (
eventsTable = "events"
cursorsTable = "cursors"
errorsTable = "consumer_errors"
eventsTable = "events"
cursorsTable = "cursors"
errorsTable = "consumer_errors"
errorEventsTable = "consumer_errors_events"
)

type EventTableSchema struct {
Expand Down Expand Up @@ -186,8 +187,19 @@ func DefaultErrorTable() ErrorTableSchema {
}
}

func DefaultErrorEventTable() EventTableSchema {
return EventTableSchema{
Name: errorEventsTable,
IDField: "id",
TimeField: "timestamp",
TypeField: "type",
ForeignIDField: "foreign_id",
MetadataField: "metadata",
}
}

// ConnectTestDB returns a db connection with non-temporary DB tables that support multiple connections.
func ConnectTestDB(t *testing.T, ev EventTableSchema, cursor CursorTableSchema, er ErrorTableSchema) *sql.DB {
func ConnectTestDB(t *testing.T, ev EventTableSchema, cursor CursorTableSchema, er ErrorTableSchema, ee EventTableSchema) *sql.DB {
admin, err := sql.Open("mysql", *dbTestURI)
jtest.RequireNil(t, err)

Expand Down Expand Up @@ -216,6 +228,7 @@ func ConnectTestDB(t *testing.T, ev EventTableSchema, cursor CursorTableSchema,
ev.CreateTable(t, dbc)
cursor.CreateTable(t, dbc)
er.CreateTable(t, dbc)
ee.CreateTable(t, dbc)

dbc.SetMaxOpenConns(10)
_, err = dbc.Exec("set time_zone='+00:00';")
Expand Down Expand Up @@ -243,7 +256,7 @@ func getSocketFile() string {
}

func TestGetNextEventNoRows(t *testing.T) {
dbc := ConnectTestDB(t, DefaultEventTable(), DefaultCursorTable(), DefaultErrorTable())
dbc := ConnectTestDB(t, DefaultEventTable(), DefaultCursorTable(), DefaultErrorTable(), DefaultErrorEventTable())

table := rsql.NewEventsTable(eventsTable)

Expand All @@ -253,7 +266,7 @@ func TestGetNextEventNoRows(t *testing.T) {
}

func TestStreamRecv(t *testing.T) {
dbc := ConnectTestDB(t, DefaultEventTable(), DefaultCursorTable(), DefaultErrorTable())
dbc := ConnectTestDB(t, DefaultEventTable(), DefaultCursorTable(), DefaultErrorTable(), DefaultErrorEventTable())

table := rsql.NewEventsTable("events")

Expand All @@ -277,7 +290,7 @@ func TestStreamRecv(t *testing.T) {
}

func TestGetLatestID(t *testing.T) {
dbc := ConnectTestDB(t, DefaultEventTable(), DefaultCursorTable(), DefaultErrorTable())
dbc := ConnectTestDB(t, DefaultEventTable(), DefaultCursorTable(), DefaultErrorTable(), DefaultErrorEventTable())

id, err := rsql.GetLatestIDForTesting(context.Background(), t, dbc, "events", "id")
assert.NoError(t, err)
Expand All @@ -297,7 +310,7 @@ func TestGetLatestID(t *testing.T) {
func TestCursorTableInt(t *testing.T) {
const id = "test"

dbc := ConnectTestDB(t, DefaultEventTable(), DefaultCursorTable(), DefaultErrorTable())
dbc := ConnectTestDB(t, DefaultEventTable(), DefaultCursorTable(), DefaultErrorTable(), DefaultErrorEventTable())

ctable := rsql.NewCursorsTable(cursorsTable, rsql.WithCursorAsyncDisabled())

Expand Down Expand Up @@ -336,7 +349,7 @@ func TestCursorTableString(t *testing.T) {
cursors.CursorType = "varchar(255)"

const id = "test"
dbc := ConnectTestDB(t, DefaultEventTable(), cursors, DefaultErrorTable())
dbc := ConnectTestDB(t, DefaultEventTable(), cursors, DefaultErrorTable(), DefaultErrorEventTable())

ctable := rsql.NewCursorsTable(cursorsTable,
rsql.WithCursorAsyncDisabled(), rsql.WithCursorStrings())
Expand Down Expand Up @@ -371,7 +384,7 @@ func TestCursorTableStringWithInts(t *testing.T) {
const id = "test"
cursors := DefaultCursorTable()
cursors.CursorType = "varchar(255)"
dbc := ConnectTestDB(t, DefaultEventTable(), cursors, DefaultErrorTable())
dbc := ConnectTestDB(t, DefaultEventTable(), cursors, DefaultErrorTable(), DefaultErrorEventTable())

ctable := rsql.NewCursorsTable(cursorsTable,
rsql.WithCursorAsyncDisabled())
Expand Down Expand Up @@ -403,7 +416,7 @@ func TestCursorTableStringWithInts(t *testing.T) {
}

func TestSetCursorBack(t *testing.T) {
dbc := ConnectTestDB(t, DefaultEventTable(), DefaultCursorTable(), DefaultErrorTable())
dbc := ConnectTestDB(t, DefaultEventTable(), DefaultCursorTable(), DefaultErrorTable(), DefaultErrorEventTable())

ctable := rsql.NewCursorsTable(cursorsTable, rsql.WithCursorAsyncDisabled())

Expand Down
Loading

0 comments on commit 5d7f4c0

Please sign in to comment.