Event Driver is a lightweight and flexible event-driven programming framework for managing and handling events in your applications. It provides a simple and intuitive API to facilitate communication between different components or modules in your software.
- Event-driven Architecture: Easily implement an event-driven architecture in your application.
- Custom Handlers: Define and dispatch custom handlers tailored to your application's needs.
- Pipeline Structure: Put your handlers in order and they run as a pipeline, each one passing its result to the next.
- Asynchronous Support: Handle events asynchronously for improved performance and responsiveness.
- Lightweight and Easy to Use: Minimal dependencies and quick integration & usage.
This tutorial is in the format of a case study of a real-world service.
There's an event-driven service that processes orders when event1 happens, but the processing logic only uses data from event2 and event3.
This service needs to
- be fault-tolerant (it shouldn't crash on errors, and it should recover quickly when a crash does happen)
- be idempotent (it shouldn't process duplicate orders)
- Let's start with joining the events. Create a joiner that emits a joined message once all required events are present.
    myJoiner := joiner.New(joiner.MatchAll("event1", "event2", "event3"), myEventStore)
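To make the "match all" idea concrete, here is a minimal sketch of the bookkeeping such a joiner needs. It is not Event Driver's implementation: `joinState`, `newJoinState` and `add` are hypothetical names, and a plain in-memory map stands in for the event store.

```go
package main

import "fmt"

// joinState is a hypothetical in-memory stand-in for an event store:
// it remembers which sources have been seen for each key.
type joinState struct {
	required []string
	seen     map[string]map[string][]byte // key -> source -> content
}

func newJoinState(required ...string) *joinState {
	return &joinState{required: required, seen: map[string]map[string][]byte{}}
}

// add records one event and reports whether every required source
// has now arrived for the given key.
func (j *joinState) add(key, source string, content []byte) bool {
	if j.seen[key] == nil {
		j.seen[key] = map[string][]byte{}
	}
	j.seen[key][source] = content
	for _, s := range j.required {
		if _, ok := j.seen[key][s]; !ok {
			return false
		}
	}
	return true
}

func main() {
	j := newJoinState("event1", "event2", "event3")
	fmt.Println(j.add("order-1", "event1", nil))          // false
	fmt.Println(j.add("order-1", "event2", []byte(`{}`))) // false
	fmt.Println(j.add("order-1", "event3", []byte(`{}`))) // true
}
```

Because the state is keyed, events for different orders never complete each other's joins, which is also why the lookups need a store that survives restarts.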
- The event joiner that we just created requires an event store for lookups. Here we pick GCS rather than the in-memory store to avoid losing data on restart.
    gcsTimeout := time.Second * 5 // tune the timeout to fit your content size
    gcsConfig := gcs_event_store.Config("my-bucket").
        WithTimeout(gcs_event_store.Timeout{Default: &gcsTimeout}) // can also specify a finer timeout for each operation
    // Create a GCS event store without authentication, just for showcase.
    myEventStore, err := gcs_event_store.New(ctx, gcsConfig, option.WithoutAuthentication())
- Create a cache for idempotency. The cache stores its events in the same GCS bucket as the joiner (beware of source name conflicts between the two).
The cache achieves idempotency by skipping processing (i.e. not passing the result to the next handler) on a key conflict.
    idempotencyHandler := cache.New(myEventStore, cache.SkipOnConflict())
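The skip-on-conflict behavior can be pictured with a few lines of plain Go. This is only a sketch under assumptions: `seenKeys` and `processOnce` are hypothetical names, and an in-memory map replaces the event store.

```go
package main

import "fmt"

// seenKeys is a hypothetical in-memory substitute for the event store.
type seenKeys map[string]struct{}

// processOnce calls next only the first time a key is seen and
// reports whether next was called.
func (s seenKeys) processOnce(key string, next func() error) (bool, error) {
	if _, dup := s[key]; dup {
		return false, nil // duplicate: skip silently, do not call next
	}
	s[key] = struct{}{}
	return true, next()
}

func main() {
	processed := 0
	store := seenKeys{}
	for _, key := range []string{"order-1", "order-2", "order-1"} {
		store.processOnce(key, func() error { processed++; return nil })
	}
	fmt.Println(processed) // 2
}
```

Note that this sketch records the key before running `next`, so a failed run would not be retried. Whether Event Driver's cache behaves the same way is not stated here, so check its documentation before relying on either behavior.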
- It would also be great if we could just drop the content of event1 to save GCS storage as well as the network bandwidth and memory of the service pod.
    eraseContentOfEvent1 := transformer.EraseContentFromSources("event1")
    eraseContentOfEvent1Handler := transformer.New(eraseContentOfEvent1)
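As an illustration of what erasing content by source amounts to, here is a small self-contained sketch. The `message` type and `eraseContentFrom` function are hypothetical and simplified; they are not the library's types.

```go
package main

import "fmt"

// message is a hypothetical, simplified event shape.
type message struct {
	Source  string
	Key     string
	Content []byte
}

// eraseContentFrom returns a transform that empties the content of
// messages from the listed sources and leaves all others untouched.
func eraseContentFrom(sources ...string) func(message) message {
	drop := make(map[string]bool, len(sources))
	for _, s := range sources {
		drop[s] = true
	}
	return func(m message) message {
		if drop[m.Source] {
			m.Content = nil
		}
		return m
	}
}

func main() {
	erase := eraseContentFrom("event1")
	a := erase(message{Source: "event1", Key: "order-1", Content: []byte("large payload")})
	b := erase(message{Source: "event2", Key: "order-1", Content: []byte("needed")})
	fmt.Println(len(a.Content), len(b.Content)) // 0 6
}
```

The source and key survive the transform, so event1 can still trigger the join even though its payload is never stored.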
- Now we need to build a custom handler for the business logic.
    package business_handler

    import (
        "context"
        "encoding/json"

        "github.com/honestbank/event-driver/event"
        "github.com/honestbank/event-driver/handlers"
    )

    // MyInput is the joined payload. The fields are exported so that
    // encoding/json can populate them.
    type MyInput struct {
        Event2 Event2Type `json:"event2"`
        Event3 Event3Type `json:"event3"`
    }

    type myHandler struct {
        config Config
    }

    func New(config Config) handlers.Handler {
        return &myHandler{
            config: config,
        }
    }

    func (c *myHandler) Process(ctx context.Context, in *event.Message, next handlers.CallNext) error {
        var myInput MyInput
        if err := json.Unmarshal(in.GetContent(), &myInput); err != nil {
            return err
        }
        // Config, Event2Type, Event3Type and handlesBusinessLogic are yours to define.
        if err := handlesBusinessLogic(myInput, c.config); err != nil {
            return err
        }

        return next.Call(ctx, in) // can omit if this is already the last handler
    }
- Build a pipeline with the handlers.
    myPipeline := pipeline.New().
        WithNextHandler(eraseContentOfEvent1Handler). // remove the content of event1 before joining
        WithNextHandler(myJoiner).                    // join all events of the same key into a single message
        WithNextHandler(idempotencyHandler).          // check for idempotency after a joined message is formed
        WithNextHandler(businessHandler)              // handle the business logic
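The ordering matters because each handler decides whether the next one runs. The sketch below shows that mechanism in isolation; `handler` and `chain` are hypothetical simplifications and do not mirror the library's `handlers.Handler` or `pipeline` types.

```go
package main

import "fmt"

// handler is a hypothetical simplified handler: it receives a message
// and a next function, and decides whether to continue the pipeline.
type handler func(msg string, next func(string) error) error

// chain composes handlers so that each one's next invokes the following one.
func chain(hs ...handler) func(string) error {
	run := func(string) error { return nil } // end of the pipeline
	for i := len(hs) - 1; i >= 0; i-- {
		h, next := hs[i], run
		run = func(msg string) error { return h(msg, next) }
	}
	return run
}

func main() {
	var trace []string
	step := func(name string) handler {
		return func(msg string, next func(string) error) error {
			trace = append(trace, name)
			return next(msg)
		}
	}
	skip := func(msg string, next func(string) error) error {
		trace = append(trace, "skip")
		return nil // stop here without error: later handlers never run
	}
	_ = chain(step("a"), step("b"))("m")
	_ = chain(step("c"), skip, step("d"))("m")
	fmt.Println(trace) // [a b c skip]
}
```

A joiner that is still waiting for events, or a cache that detects a duplicate, plays the role of `skip` here: it returns without calling next, so the business handler only sees complete, first-time messages.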
- Start serving traffic!
    // Showcase: convert the pipeline into a Knative CloudEvents handler.
    // One can also use sarama/confluent Kafka, Google Cloud Functions, etc.
    handleKNativeEvent := convert.ToKNativeEventHandler(
        convert.CloudEventToInput,
        myPipeline,
        convert.OutputToCloudResult)
    cloudEventClient.StartReceiver(ctx, handleKNativeEvent) // assume cloudEventClient is already created
Now assemble the steps above into one complete example:
    package main

    import (
        "context"
        "log"
        "time"

        "google.golang.org/api/option"

        "github.com/honestbank/event-driver/extensions/cloudevents/convert"
        "github.com/honestbank/event-driver/extensions/google-cloud/storage/gcs_event_store"
        "github.com/honestbank/event-driver/handlers/cache"
        "github.com/honestbank/event-driver/handlers/joiner"
        "github.com/honestbank/event-driver/handlers/transformer"
        "github.com/honestbank/event-driver/pipeline"
        "github.com/honestbank/event-driver/storage"
        // Also import your own business_handler package (built in the step above)
        // from wherever it lives in your module.
    )

    func main() {
        ctx := context.Background()

        eraseContentOfEvent1 := transformer.EraseContentFromSources("event1")
        eraseContentOfEvent1Handler := transformer.New(eraseContentOfEvent1)

        myEventStore, err := createEventStore(ctx)
        if err != nil {
            log.Panicf("failed to create GCS event store: %v", err)
        }

        myJoiner := joiner.New(joiner.MatchAll("event1", "event2", "event3"), myEventStore)
        idempotencyHandler := cache.New(myEventStore, cache.SkipOnConflict())

        var businessConfig business_handler.Config // fill in your own settings
        businessHandler := business_handler.New(businessConfig)

        myPipeline := pipeline.New().
            WithNextHandler(eraseContentOfEvent1Handler). // remove the content of event1 before joining
            WithNextHandler(myJoiner).                    // join all events of the same key into a single message
            WithNextHandler(idempotencyHandler).          // check for idempotency after a joined message is formed
            WithNextHandler(businessHandler)              // handle the business logic

        // Showcase: convert the pipeline into a Knative CloudEvents handler.
        // One can also use sarama/confluent Kafka, Google Cloud Functions, etc.
        handleKNativeEvent := convert.ToKNativeEventHandler(
            convert.CloudEventToInput,
            myPipeline,
            convert.OutputToCloudResult)
        cloudEventClient.StartReceiver(ctx, handleKNativeEvent) // assume cloudEventClient is already created
    }

    func createEventStore(ctx context.Context) (storage.EventStore, error) {
        gcsTimeout := time.Second * 5 // tune the timeout to fit your content size
        gcsConfig := gcs_event_store.Config("my-bucket").
            WithTimeout(gcs_event_store.Timeout{Default: &gcsTimeout}) // can also specify a finer timeout for each operation

        // Create a GCS event store without authentication, just for showcase.
        return gcs_event_store.New(ctx, gcsConfig, option.WithoutAuthentication())
    }
Event Driver also provides the following libraries as extensions for integrating with other services and frameworks.
Link: github.com/honestbank/event-driver/extensions/cloudevents
Integrates Event Driver with CloudEvents by providing a converter that turns an Event Driver pipeline into a CloudEvents handler. Check the extension's documentation for what is currently supported and for the latest updates.
Link: github.com/honestbank/event-driver/extensions/google-cloud
Integrates Event Driver with Google Cloud, including using GCS or BigQuery as the event store and running an Event Driver pipeline in Cloud Functions. Check the extension's documentation for what is currently supported and for the latest updates.