diff --git a/.github/workflows/go-coverage.yaml b/.github/workflows/go-coverage.yml similarity index 100% rename from .github/workflows/go-coverage.yaml rename to .github/workflows/go-coverage.yml diff --git a/.golangci.yml b/.golangci.yml index 1f01c9b..1e3b1bf 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -104,7 +104,7 @@ linters: - errcheck - errorlint - exhaustive - - exportloopref + - copyloopvar - funlen - gochecknoglobals - gochecknoinits diff --git a/pkg/eventrunner/app_interface.go b/pkg/eventrunner/app_interface.go index 477c03b..7ab49e0 100644 --- a/pkg/eventrunner/app_interface.go +++ b/pkg/eventrunner/app_interface.go @@ -8,7 +8,7 @@ import ( "gofr.dev/pkg/gofr/migration" ) -// AppWrapper wraps a *gofr.App and implements AppInterface +// AppWrapper wraps a *gofr.App and implements AppInterface. type AppWrapper struct { app *gofr.App } diff --git a/pkg/eventrunner/cassandra_sink.go b/pkg/eventrunner/cassandra_sink.go index be3f0d2..8c4cfba 100644 --- a/pkg/eventrunner/cassandra_sink.go +++ b/pkg/eventrunner/cassandra_sink.go @@ -15,15 +15,32 @@ func NewCassandraEventSink() *CassandraEventSink { return &CassandraEventSink{} } -func (s *CassandraEventSink) ConsumeEvent(ctx *gofr.Context, event *cloudevents.Event) error { +// CassandraInsertError is a custom error type for Cassandra insertion errors. +type CassandraInsertError struct { + OriginalError error +} + +// Error implements the error interface for CassandraInsertError. +func (cie *CassandraInsertError) Error() string { + return fmt.Sprintf("failed to insert event into Cassandra: %v", cie.OriginalError) +} + +// Unwrap allows errors.Is and errors.As to work with CassandraInsertError. +func (cie *CassandraInsertError) Unwrap() error { + return cie.OriginalError +} + +func (*CassandraEventSink) ConsumeEvent(ctx *gofr.Context, event *cloudevents.Event) error { if ctx == nil { - return fmt.Errorf("nil context provided to CassandraEventSink") + return errNilContext } + if event == nil { - return fmt.Errorf("nil event provided to CassandraEventSink") + return errNilEvent } + if ctx.Cassandra == nil { - return fmt.Errorf("cassandra client is nil in CassandraEventSink") + return errNilCassandra } // Get the event data as []byte @@ -56,7 +73,7 @@ func (s *CassandraEventSink) ConsumeEvent(ctx *gofr.Context, event *cloudevents. event.SpecVersion(), ) if err != nil { - return fmt.Errorf("failed to insert event into Cassandra: %w", err) + return &CassandraInsertError{OriginalError: err} } return nil diff --git a/pkg/eventrunner/cassandra_sink_test.go b/pkg/eventrunner/cassandra_sink_test.go index ccb7d66..432afd8 100644 --- a/pkg/eventrunner/cassandra_sink_test.go +++ b/pkg/eventrunner/cassandra_sink_test.go @@ -27,6 +27,7 @@ func (m *MockCassandra) Exec(stmt string, values ...any) error { if m.ExecFunc != nil { return m.ExecFunc(stmt, values...) } + return nil } @@ -34,6 +35,7 @@ func (m *MockCassandra) Query(dest any, stmt string, values ...any) error { if m.QueryFunc != nil { return m.QueryFunc(dest, stmt, values...) } + return nil } @@ -41,6 +43,7 @@ func (m *MockCassandra) ExecCAS(dest any, stmt string, values ...any) (bool, err if m.ExecCASFunc != nil { return m.ExecCASFunc(dest, stmt, values...) } + return true, nil } @@ -48,6 +51,7 @@ func (m *MockCassandra) NewBatch(name string, batchType int) error { if m.NewBatchFunc != nil { return m.NewBatchFunc(name, batchType) } + return nil } @@ -55,6 +59,7 @@ func (m *MockCassandra) BatchQuery(name, stmt string, values ...any) error { if m.BatchQueryFunc != nil { m.BatchQueryFunc(name, stmt, values...) } + return nil } @@ -62,6 +67,7 @@ func (m *MockCassandra) ExecuteBatch(name string) error { if m.ExecuteBatchFunc != nil { return m.ExecuteBatchFunc(name) } + return nil } @@ -69,6 +75,7 @@ func (m *MockCassandra) ExecuteBatchCAS(name string, dest ...any) (bool, error) if m.ExecuteBatchCASFunc != nil { return m.ExecuteBatchCASFunc(name, dest...) } + return true, nil } @@ -76,10 +83,11 @@ func (m *MockCassandra) HealthCheck(ctx context.Context) (any, error) { if m.HealthCheckFunc != nil { return m.HealthCheckFunc(ctx) } + return "OK", nil } -// MockContext attempts to mimic the structure of gofr.Context +// MockContext attempts to mimic the structure of gofr.Context. type MockContext struct { *container.Container } @@ -99,20 +107,22 @@ func TestCassandraEventSink_ConsumeEvent(t *testing.T) { event.SetSubject("test-subject") event.SetTime(time.Now()) event.SetDataContentType("application/json") - event.SetData(cloudevents.ApplicationJSON, map[string]string{"key": "value"}) + err := event.SetData(cloudevents.ApplicationJSON, map[string]string{"key": "value"}) + require.NoError(t, err) mockCassandra := &MockCassandra{} + mockContext := NewMockContext() mockContext.Container.Cassandra = mockCassandra - err := sink.ConsumeEvent(mockContext, &event) + err = sink.ConsumeEvent(mockContext, &event) require.NoError(t, err) } func TestCassandraEventSink_ConsumeEvent_NilContext(t *testing.T) { sink := NewCassandraEventSink() err := sink.ConsumeEvent(nil, &cloudevents.Event{}) - assert.Error(t, err) + require.Error(t, err) assert.Contains(t, err.Error(), "nil context provided") } @@ -120,7 +130,7 @@ func TestCassandraEventSink_ConsumeEvent_NilEvent(t *testing.T) { sink := NewCassandraEventSink() mockContext := NewMockContext() err := sink.ConsumeEvent(mockContext, nil) - assert.Error(t, err) + require.Error(t, err) assert.Contains(t, err.Error(), "nil event provided") } @@ -129,19 +139,24 @@ func TestCassandraEventSink_ConsumeEvent_NilCassandra(t *testing.T) { mockContext := NewMockContext() mockContext.Container.Cassandra = nil err := sink.ConsumeEvent(mockContext, &cloudevents.Event{}) - assert.Error(t, err) + require.Error(t, err) assert.Contains(t, err.Error(), "cassandra client is nil") } func TestCassandraEventSink_ConsumeEvent_InvalidJSON(t *testing.T) { sink := NewCassandraEventSink() + event := cloudevents.NewEvent() - event.SetData(cloudevents.ApplicationJSON, []byte("invalid json")) + err := event.SetData(cloudevents.ApplicationJSON, []byte("invalid json")) + require.NoError(t, err) + mockCassandra := &MockCassandra{} + mockContext := NewMockContext() mockContext.Container.Cassandra = mockCassandra - err := sink.ConsumeEvent(mockContext, &event) - assert.Error(t, err) + + err = sink.ConsumeEvent(mockContext, &event) + require.Error(t, err) assert.Contains(t, err.Error(), "event data is not valid JSON") } @@ -154,17 +169,18 @@ func TestCassandraEventSink_ConsumeEvent_CassandraError(t *testing.T) { event.SetSubject("test-subject") event.SetTime(time.Now()) event.SetDataContentType("application/json") - event.SetData(cloudevents.ApplicationJSON, map[string]string{"key": "value"}) + err := event.SetData(cloudevents.ApplicationJSON, map[string]string{"key": "value"}) + require.NoError(t, err) mockCassandra := &MockCassandra{ - ExecFunc: func(stmt string, values ...any) error { + ExecFunc: func(string, ...any) error { return assert.AnError }, } mockContext := NewMockContext() mockContext.Container.Cassandra = mockCassandra - err := sink.ConsumeEvent(mockContext, &event) - assert.Error(t, err) + err = sink.ConsumeEvent(mockContext, &event) + require.Error(t, err) assert.Contains(t, err.Error(), "failed to insert event into Cassandra") } diff --git a/pkg/eventrunner/consumer_manager.go b/pkg/eventrunner/consumer_manager.go index fadd376..4ab52b7 100644 --- a/pkg/eventrunner/consumer_manager.go +++ b/pkg/eventrunner/consumer_manager.go @@ -3,6 +3,7 @@ package eventrunner import ( "fmt" + "strings" "sync" cloudevents "github.com/cloudevents/sdk-go/v2" @@ -31,31 +32,54 @@ func (cm *ConsumerManager) AddConsumer(name string, consumer EventConsumer) { cm.consumers[name] = consumer } +// ConsumerErrors is a custom error type that holds multiple errors. +type ConsumerErrors struct { + Errors []error +} + +// Error implements the error interface for ConsumerErrors. +func (ce *ConsumerErrors) Error() string { + if len(ce.Errors) == 0 { + return "no errors occurred" + } + + errorStrings := make([]string, len(ce.Errors)) + + for i, err := range ce.Errors { + errorStrings[i] = err.Error() + } + + return fmt.Sprintf("errors occurred while consuming event: %s", strings.Join(errorStrings, "; ")) +} + func (cm *ConsumerManager) ConsumeEvent(c *gofr.Context, event *cloudevents.Event) error { cm.mu.RLock() defer cm.mu.RUnlock() if c == nil { - return fmt.Errorf("nil context provided to ConsumerManager") + return errNilContext } + if event == nil { - return fmt.Errorf("nil event provided to ConsumerManager") + return errNilEvent } - var errors []error + var consumerErrors ConsumerErrors + for name, consumer := range cm.consumers { if consumer == nil { cm.logger.Logf("EventConsumer %s is nil, skipping", name) continue } + if err := consumer.ConsumeEvent(c, event); err != nil { cm.logger.Errorf("EventConsumer %s failed: %v", name, err) - errors = append(errors, fmt.Errorf("consumer %s failed: %w", name, err)) + consumerErrors.Errors = append(consumerErrors.Errors, fmt.Errorf("consumer %s failed: %w", name, err)) } } - if len(errors) > 0 { - return fmt.Errorf("errors occurred while consuming event: %v", errors) + if len(consumerErrors.Errors) > 0 { + return &consumerErrors } return nil diff --git a/pkg/eventrunner/consumer_manager_test.go b/pkg/eventrunner/consumer_manager_test.go index c6b247d..84f6bc5 100644 --- a/pkg/eventrunner/consumer_manager_test.go +++ b/pkg/eventrunner/consumer_manager_test.go @@ -1,7 +1,6 @@ package eventrunner import ( - "fmt" "testing" cloudevents "github.com/cloudevents/sdk-go/v2" @@ -60,7 +59,7 @@ func TestConsumerManager_ConsumeEvent(t *testing.T) { cm.AddConsumer("test", consumer) err := cm.ConsumeEvent(mockContext, &mockEvent) - assert.NoError(t, err) + require.NoError(t, err) cm.consumers = make(map[string]EventConsumer) // Reset consumers }) @@ -70,7 +69,7 @@ func TestConsumerManager_ConsumeEvent(t *testing.T) { successConsumer.EXPECT().ConsumeEvent(mockContext, &mockEvent).Return(nil) failConsumer := NewMockEventConsumer(ctrl) - failConsumer.EXPECT().ConsumeEvent(mockContext, &mockEvent).Return(fmt.Errorf("consumer failed")) + failConsumer.EXPECT().ConsumeEvent(mockContext, &mockEvent).Return(errConsumerFailed) cm.AddConsumer("success", successConsumer) cm.AddConsumer("fail", failConsumer) @@ -86,20 +85,20 @@ func TestConsumerManager_ConsumeEvent(t *testing.T) { cm.AddConsumer("nil", nil) err := cm.ConsumeEvent(mockContext, &mockEvent) - assert.NoError(t, err) + require.NoError(t, err) cm.consumers = make(map[string]EventConsumer) // Reset consumers }) t.Run("Nil context", func(t *testing.T) { err := cm.ConsumeEvent(nil, &mockEvent) - assert.Error(t, err) + require.Error(t, err) assert.Contains(t, err.Error(), "nil context provided") }) t.Run("Nil event", func(t *testing.T) { err := cm.ConsumeEvent(mockContext, nil) - assert.Error(t, err) + require.Error(t, err) assert.Contains(t, err.Error(), "nil event provided") }) } diff --git a/pkg/eventrunner/errors.go b/pkg/eventrunner/errors.go new file mode 100644 index 0000000..a4d6977 --- /dev/null +++ b/pkg/eventrunner/errors.go @@ -0,0 +1,13 @@ +package eventrunner + +import "errors" + +var ( + errBufferForcedWriteError = errors.New("forced write error") + errPublishError = errors.New("publish error") + errConsumeEventError = errors.New("consume event error") + errConsumerFailed = errors.New("consumer failed") + errNilContext = errors.New("nil context provided") + errNilEvent = errors.New("nil event provided") + errNilCassandra = errors.New("cassandra client is nil in CassandraEventSink") +) diff --git a/pkg/eventrunner/eventsink.go b/pkg/eventrunner/eventsink.go index 6d27b95..f6a0f28 100644 --- a/pkg/eventrunner/eventsink.go +++ b/pkg/eventrunner/eventsink.go @@ -10,7 +10,7 @@ type LogEventSink struct { // Add any necessary fields (e.g., database connection) } -func (s *LogEventSink) LogEvent(ctx context.Context, event *cloudevents.Event) error { +func (*LogEventSink) LogEvent(context.Context, *cloudevents.Event) error { // Implement event logging logic here return nil } diff --git a/pkg/eventrunner/router.go b/pkg/eventrunner/router.go index 200f7e8..713a3b3 100644 --- a/pkg/eventrunner/router.go +++ b/pkg/eventrunner/router.go @@ -44,6 +44,7 @@ func NewEventRouter(app AppInterface, natsClient NATSClient, cassandraClient *ca } cassandraClient = cassandraPkg.New(cassandraConfig) } + app.AddCassandra(cassandraClient) // Add migrations to run @@ -68,6 +69,7 @@ func NewEventRouter(app AppInterface, natsClient NATSClient, cassandraClient *ca realNatsClient.Connect() natsClient = realNatsClient } + app.AddPubSub(natsClient) consumerManager := NewConsumerManager(app, app.Logger()) @@ -83,6 +85,7 @@ func NewEventRouter(app AppInterface, natsClient NATSClient, cassandraClient *ca logger: app.Logger(), } er.getBufferFunc = er.defaultGetBuffer + return er } @@ -117,6 +120,7 @@ func (er *EventRouter) handleEvent(c *gofr.Context) error { event.SetSource("eventrunner") event.SetType("com.example.event") event.SetTime(time.Now()) + if err := event.SetData(cloudevents.ApplicationJSON, rawMessage); err != nil { return fmt.Errorf("failed to set event data: %w", err) } @@ -124,6 +128,7 @@ func (er *EventRouter) handleEvent(c *gofr.Context) error { // Apply middlewares handler := er.applyMiddleware(er.routeEvent) + return handler(c, &event) } @@ -131,6 +136,7 @@ func (er *EventRouter) applyMiddleware(handler HandlerFunc) HandlerFunc { for i := len(er.middlewares) - 1; i >= 0; i-- { handler = er.middlewares[i](handler) } + return handler } @@ -138,7 +144,6 @@ func (er *EventRouter) routeEvent(c *gofr.Context, event *cloudevents.Event) err // Log event using ConsumerManager if err := er.consumerManager.ConsumeEvent(c, event); err != nil { er.logger.Errorf("Failed to consume event: %v", err) - // Continue processing even if logging fails } eventType := event.Type() @@ -155,6 +160,7 @@ func (er *EventRouter) routeEvent(c *gofr.Context, event *cloudevents.Event) err } messageID := uuid.New().String() + if err := er.natsClient.Publish(c, consumerQueue, buf.Bytes()); err != nil { return fmt.Errorf("failed to publish event: %w", err) } diff --git a/pkg/eventrunner/router_test.go b/pkg/eventrunner/router_test.go index 9ea5416..fb78376 100644 --- a/pkg/eventrunner/router_test.go +++ b/pkg/eventrunner/router_test.go @@ -9,6 +9,8 @@ import ( "sync" "testing" + "github.com/stretchr/testify/require" + cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/google/uuid" "github.com/stretchr/testify/assert" @@ -33,6 +35,7 @@ func (r *MockRequest) Param(key string) string { if vals, ok := r.params[key]; ok && len(vals) > 0 { return vals[0] } + return "" } @@ -44,6 +47,7 @@ func (r *MockRequest) PathParam(key string) string { if vals, ok := r.params[key]; ok && len(vals) > 0 { return vals[0] } + return "" } @@ -51,7 +55,7 @@ func (r *MockRequest) Bind(i interface{}) error { return json.Unmarshal(r.body, i) } -func (r *MockRequest) HostName() string { +func (*MockRequest) HostName() string { return "localhost" } @@ -105,7 +109,6 @@ func TestEventRouter_handleEvent(t *testing.T) { err := er.handleEvent(mockContext) assert.NoError(t, err) }) - } func TestEventRouter_routeEvent(t *testing.T) { @@ -300,7 +303,7 @@ func TestEventRouter_handleEvent_BindError(t *testing.T) { } err := er.handleEvent(mockContext) - assert.Error(t, err) + require.Error(t, err) assert.Contains(t, err.Error(), "failed to parse message") } @@ -311,7 +314,7 @@ func TestEventRouter_applyMiddleware_NoMiddlewares(t *testing.T) { handlerCalled := false - handler := func(c *gofr.Context, e *cloudevents.Event) error { + handler := func(*gofr.Context, *cloudevents.Event) error { handlerCalled = true return nil } @@ -320,20 +323,22 @@ func TestEventRouter_applyMiddleware_NoMiddlewares(t *testing.T) { // Call the handler err := wrappedHandler(&gofr.Context{}, &cloudevents.Event{}) - assert.NoError(t, err) + require.NoError(t, err) assert.True(t, handlerCalled) } func TestEventRouter_applyMiddleware_WithMiddlewares(t *testing.T) { er := &EventRouter{} - callSequence := []string{} + var callSequence []string middleware1 := func(next HandlerFunc) HandlerFunc { return func(c *gofr.Context, e *cloudevents.Event) error { callSequence = append(callSequence, "middleware1 before") err := next(c, e) + callSequence = append(callSequence, "middleware1 after") + return err } } @@ -342,7 +347,9 @@ func TestEventRouter_applyMiddleware_WithMiddlewares(t *testing.T) { return func(c *gofr.Context, e *cloudevents.Event) error { callSequence = append(callSequence, "middleware2 before") err := next(c, e) + callSequence = append(callSequence, "middleware2 after") + return err } } @@ -350,7 +357,7 @@ func TestEventRouter_applyMiddleware_WithMiddlewares(t *testing.T) { er.Use(middleware1) er.Use(middleware2) - handler := func(c *gofr.Context, e *cloudevents.Event) error { + handler := func(*gofr.Context, *cloudevents.Event) error { callSequence = append(callSequence, "handler") return nil } @@ -359,7 +366,7 @@ func TestEventRouter_applyMiddleware_WithMiddlewares(t *testing.T) { // Call the handler err := wrappedHandler(&gofr.Context{}, &cloudevents.Event{}) - assert.NoError(t, err) + require.NoError(t, err) // Assert the call sequence expectedSequence := []string{ @@ -377,7 +384,7 @@ func TestEventRouter_routeEvent_ConsumeEventError(t *testing.T) { defer ctrl.Finish() mockConsumerManager := NewMockEventConsumer(ctrl) - mockConsumerManager.EXPECT().ConsumeEvent(gomock.Any(), gomock.Any()).Return(fmt.Errorf("consume event error")).Times(1) + mockConsumerManager.EXPECT().ConsumeEvent(gomock.Any(), gomock.Any()).Return(errConsumeEventError).Times(1) mockApp := NewMockAppInterface(ctrl) mockLogger := logging.NewMockLogger(logging.INFO) @@ -416,8 +423,7 @@ func TestEventRouter_routeEvent_PublishError(t *testing.T) { mockConsumerManager.EXPECT().ConsumeEvent(gomock.Any(), gomock.Any()).Return(nil).Times(1) mockNatsClient := NewMockNATSClient(ctrl) - // mockNatsClient.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1) - mockNatsClient.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("publish error")).Times(1) + mockNatsClient.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(errPublishError).Times(1) er := &EventRouter{ app: gofr.New(), @@ -447,7 +453,7 @@ func TestEventRouter_routeEvent_PublishError(t *testing.T) { } err := er.routeEvent(mockContext, &event) - assert.Error(t, err) + require.Error(t, err) assert.Contains(t, err.Error(), "failed to publish event") } @@ -481,28 +487,29 @@ func TestEventRouter_routeEvent_EncodeError(t *testing.T) { event.SetID(uuid.New().String()) event.SetType("test.event") event.SetSource("test") - event.SetData(cloudevents.ApplicationJSON, map[string]string{"key": "value"}) + err := event.SetData(cloudevents.ApplicationJSON, map[string]string{"key": "value"}) + require.NoError(t, err) mockContext := &gofr.Context{ Context: context.Background(), } - err := er.routeEvent(mockContext, &event) - assert.Error(t, err, "Expected an error but got none") + err = er.routeEvent(mockContext, &event) + require.Error(t, err, "Expected an error but got none") t.Logf("Actual error: %v", err) assert.Contains(t, err.Error(), "failed to encode event", "Error message does not contain the expected substring") } -// failingBuffer is a custom buffer that always fails on Write +// failingBuffer is a custom buffer that always fails on Write. type failingBuffer struct{} -func (fb *failingBuffer) Write(p []byte) (n int, err error) { +func (*failingBuffer) Write([]byte) (n int, err error) { fmt.Println("Forced write error") - return 0, fmt.Errorf("forced write error") + return 0, errBufferForcedWriteError } -func (fb *failingBuffer) Bytes() []byte { +func (*failingBuffer) Bytes() []byte { return []byte{} } -func (fb *failingBuffer) Reset() {} +func (*failingBuffer) Reset() {}