diff --git a/docs/advanced-guide/using-publisher-subscriber/page.md b/docs/advanced-guide/using-publisher-subscriber/page.md index a6e352377..4135cb874 100644 --- a/docs/advanced-guide/using-publisher-subscriber/page.md +++ b/docs/advanced-guide/using-publisher-subscriber/page.md @@ -176,6 +176,75 @@ docker run -d \ ``` > **Note**: find the default mosquitto config file {% new-tab-link title="here" href="https://github.com/eclipse/mosquitto/blob/master/mosquitto.conf" /%} +### Azure Eventhub +GoFr supports eventhub starting gofr version v1.22.0. + +While subscribing gofr reads from all the partitions of the consumer group provided in the configuration reducing hassle to manage them. + +#### Configs + +Eventhub is supported as an external pubsub provider such that if you are not using it, it doesn't get added in your binary. + +Import the external driver for eventhub using the following command. + +```bash +go get gofr.dev/pkg/gofr/datasources/pubsub/eventhub +``` + +Use the AddPubSub method of GoFr's app to connect + +**Example** +```go + app := gofr.New() + + app.AddPubSub(eventhub.New(eventhub.Config{ + ConnectionString: "Endpoint=sb://gofr-dev.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=", + ContainerConnectionString: "DefaultEndpointsProtocol=https;AccountName=gofrdev;AccountKey=;EndpointSuffix=core.windows.net", + StorageServiceURL: "https://gofrdev.windows.net/", + StorageContainerName: "test", + EventhubName: "test1", + })) +``` + +While subscribing/publishing from eventhub make sure to keep the topic-name same as event-hub name. + +#### Setup + +1. To setup azure eventhub refer the following [documentation](https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-create). + +2. As GoFr manages reading from all the partitions it needs to store the information about what has been read and what is left for that GoFr uses Azure Container which can be setup from the following [documentation](https://learn.microsoft.com/en-us/azure/storage/blobs/blob-containers-portal). + +##### Mandatory Configs Configuration Map +{% table %} +- ConnectionString +- [connection-string-primary-key](https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string) + +--- + +- ContainerConnectionString +- [ConnectionString](https://learn.microsoft.com/en-us/azure/storage/common/storage-account-keys-manage?toc=%2Fazure%2Fstorage%2Fblobs%2Ftoc.json&bc=%2Fazure%2Fstorage%2Fblobs%2Fbreadcrumb%2Ftoc.json&tabs=azure-portal#view-account-access-keys) + + +--- + +- StorageServiceURL +- [Blob Service URL](https://learn.microsoft.com/en-us/azure/storage/common/storage-account-get-info?tabs=portal#get-service-endpoints-for-the-storage-account) + +--- + +- StorageContainerName +- [Container Name](https://learn.microsoft.com/en-us/azure/storage/blobs/blob-containers-portal#create-a-container) + +--- + +- EventhubName +- [Eventhub](https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-create#create-an-event-hub) + +{% /table %} + +#### Example + + ## Subscribing Adding a subscriber is similar to adding an HTTP handler, which makes it easier to develop scalable applications, as it decoupled from the Sender/Publisher. diff --git a/pkg/gofr/container/datasources.go b/pkg/gofr/container/datasources.go index ab74b548d..a0d66c045 100644 --- a/pkg/gofr/container/datasources.go +++ b/pkg/gofr/container/datasources.go @@ -8,6 +8,7 @@ import ( "github.com/redis/go-redis/v9" "gofr.dev/pkg/gofr/datasource" + "gofr.dev/pkg/gofr/datasource/pubsub" gofrSQL "gofr.dev/pkg/gofr/datasource/sql" ) @@ -271,6 +272,12 @@ type KVStoreProvider interface { provider } +type PubSubProvider interface { + pubsub.Client + + provider +} + type Solr interface { Search(ctx context.Context, collection string, params map[string]any) (any, error) Create(ctx context.Context, collection string, document *bytes.Buffer, params map[string]any) (any, error) diff --git a/pkg/gofr/datasource/README.md b/pkg/gofr/datasource/README.md index a2f9b7c9a..6bc172eb1 100644 --- a/pkg/gofr/datasource/README.md +++ b/pkg/gofr/datasource/README.md @@ -69,19 +69,20 @@ Therefore, GoFr utilizes a pluggable approach for new datasources by separating ## Supported Datasources -| Datasource | Health-Check | Logs | Metrics | Traces | As Driver | -|------------|-----------|------|-------|--------|-----------| -| MySQL | ✅ | ✅ | ✅ | ✅ | | -| REDIS | ✅ | ✅ | ✅ | ✅ | | -| PostgreSQL | ✅ | ✅ | ✅ | ✅ | | -| MongoDB | ✅ | ✅ | ✅ | | ✅ | -| SQLite | ✅ | ✅ | ✅ | ✅ | | -| BadgerDB | ✅ | ✅ | | | ✅ | -| Cassandra | ✅ | ✅ | ✅ | | ✅ | -| Clickhouse | | ✅ | ✅ | | ✅ | -| FTP | | ✅ | | | ✅ | -| SFTP | | ✅ | | | ✅ | -| Solr | | ✅ | ✅ | | ✅ | -| DGraph | ✅ | ✅ |✅ ||| +| Datasource | Health-Check | Logs | Metrics | Traces | As Driver | +|----------------|-----------|------|-------|--------|-----------| +| MySQL | ✅ | ✅ | ✅ | ✅ | | +| REDIS | ✅ | ✅ | ✅ | ✅ | | +| PostgreSQL | ✅ | ✅ | ✅ | ✅ | | +| MongoDB | ✅ | ✅ | ✅ | | ✅ | +| SQLite | ✅ | ✅ | ✅ | ✅ | | +| BadgerDB | ✅ | ✅ | | | ✅ | +| Cassandra | ✅ | ✅ | ✅ | | ✅ | +| Clickhouse | | ✅ | ✅ | | ✅ | +| FTP | | ✅ | | | ✅ | +| SFTP | | ✅ | | | ✅ | +| Solr | | ✅ | ✅ | | ✅ | +| DGraph | ✅ | ✅ |✅ | || +| Azure Eventhub | | ✅ |✅ | |✅| diff --git a/pkg/gofr/datasource/pubsub/eventhub/eventhub.go b/pkg/gofr/datasource/pubsub/eventhub/eventhub.go new file mode 100644 index 000000000..136d2da57 --- /dev/null +++ b/pkg/gofr/datasource/pubsub/eventhub/eventhub.go @@ -0,0 +1,365 @@ +package eventhub + +import ( + "context" + "errors" + "fmt" + "strings" + "time" + + "go.opentelemetry.io/otel/trace" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" + + "gofr.dev/pkg/gofr/datasource" + "gofr.dev/pkg/gofr/datasource/pubsub" +) + +// code reference from https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-go-get-started-send +// metrics are being registered in the container, and we are using the same metrics so we have not re-registered the metrics here. +// It is different from other datasources. + +var errNoMsgReceived = errors.New("no message received") + +type Config struct { + ConnectionString string + ContainerConnectionString string + StorageServiceURL string + StorageContainerName string + EventhubName string + // if not provided it will read from the $Default consumergroup. + ConsumerGroup string + // following configs are for advance setup of the eventhub. + StorageOptions *container.ClientOptions + BlobStoreOptions *checkpoints.BlobStoreOptions + ConsumerOptions *azeventhubs.ConsumerClientOptions + ProducerOptions *azeventhubs.ProducerClientOptions +} + +type Client struct { + producer *azeventhubs.ProducerClient + consumer *azeventhubs.ConsumerClient + // we are using processor such that to keep consuming the events from all the different partitions. + processor *azeventhubs.Processor + // checkpoint is being called while committing the event received from the event. + checkPoint *checkpoints.BlobStore + // processorCtx is being stored such that to gracefully shutting down the application. + processorCtx context.CancelFunc + cfg Config + logger Logger + metrics Metrics + tracer trace.Tracer +} + +// New Creates the client for Eventhub +func New(cfg Config) *Client { + return &Client{ + cfg: cfg, + } +} + +func (c *Client) validConfigs(cfg Config) bool { + ok := true + + if cfg.EventhubName == "" { + ok = false + + c.logger.Error("eventhubName cannot be an empty") + } + + if cfg.ConnectionString == "" { + ok = false + + c.logger.Error("connectionString cannot be an empty") + } + + if cfg.StorageServiceURL == "" { + ok = false + + c.logger.Error("storageServiceURL cannot be an empty") + } + + if cfg.StorageContainerName == "" { + ok = false + + c.logger.Error("storageContainerName cannot be an empty") + } + + if cfg.ContainerConnectionString == "" { + ok = false + + c.logger.Error("containerConnectionString cannot be an empty") + } + + if cfg.ConsumerGroup == "" { + cfg.ConsumerGroup = azeventhubs.DefaultConsumerGroup + } + + return ok + +} + +// UseLogger sets the logger for the eventhub client. +func (c *Client) UseLogger(logger any) { + if l, ok := logger.(Logger); ok { + c.logger = l + } +} + +// UseMetrics sets the metrics for the eventhub client. +func (c *Client) UseMetrics(metrics any) { + if m, ok := metrics.(Metrics); ok { + c.metrics = m + } +} + +// UseTracer sets the tracer for the eventhub client. +func (c *Client) UseTracer(tracer any) { + if t, ok := tracer.(trace.Tracer); ok { + c.tracer = t + } +} + +// Connect establishes a connection to Cassandra and registers metrics using the provided configuration when the client was Created. +func (c *Client) Connect() { + if !c.validConfigs(c.cfg) { + return + } + + c.logger.Debug("azure eventhub connection started using connection string") + + producerClient, err := azeventhubs.NewProducerClientFromConnectionString(c.cfg.ConnectionString, + c.cfg.EventhubName, c.cfg.ProducerOptions) + if err != nil { + c.logger.Errorf("error occurred while creating producer client %v", err) + + return + } + + c.logger.Debug("azure eventhub producer client setup success") + + containerClient, err := container.NewClientFromConnectionString(c.cfg.ContainerConnectionString, c.cfg.StorageContainerName, + c.cfg.StorageOptions) + if err != nil { + c.logger.Errorf("error occurred while creating container client %v", err) + + return + } + + c.logger.Debug("azure eventhub container client setup success") + + // create a checkpoint store that will be used by the event hub + checkpointStore, err := checkpoints.NewBlobStore(containerClient, c.cfg.BlobStoreOptions) + if err != nil { + c.logger.Errorf("error occurred while creating blobstore %v", err) + + return + } + + c.logger.Debug("azure eventhub blobstore client setup success") + + // create a consumer client using a connection string to the namespace and the event hub + consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(c.cfg.ConnectionString, c.cfg.EventhubName, + c.cfg.ConsumerGroup, c.cfg.ConsumerOptions) + if err != nil { + c.logger.Errorf("error occurred while creating consumer client %v", err) + + return + } + + c.logger.Debug("azure eventhub consumer client setup success") + + // create a processor to receive and process events + processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil) + if err != nil { + c.logger.Errorf("error occurred while creating processor %v", err) + + return + } + + c.logger.Debug("azure eventhub processor setup success") + + processorCtx, processorCancel := context.WithCancel(context.TODO()) + c.processorCtx = processorCancel + + // it is being run in a go-routine as it is a never ending process and has to be kept running to subscribe to events. + go func() { + if err = processor.Run(processorCtx); err != nil { + c.logger.Errorf("error occurred while running processor %v", err) + + return + } + + c.logger.Debug("azure eventhub processor running successfully") + }() + + c.processor = processor + c.producer = producerClient + c.consumer = consumerClient + c.checkPoint = checkpointStore +} + +// Subscribe checks all partitions for the first available event and returns it. +func (c *Client) Subscribe(ctx context.Context, topic string) (*pubsub.Message, error) { + var ( + msg *pubsub.Message + err error + ) + + // for each partition in the event hub, create a partition client with processEvents as the function to process events + for { + partitionClient := c.processor.NextPartitionClient(ctx) + + if partitionClient == nil { + break + } + + c.metrics.IncrementCounter(ctx, "app_pubsub_subscribe_total_count", "topic", topic, "subscription_name", partitionClient.PartitionID()) + + start := time.Now() + + msg, err = c.processEvents(ctx, partitionClient) + switch err { + case errNoMsgReceived: + // if no message is received, we don't achieve anything by returning error rather check in a different partition. + // this logic may change if we remove the timeout while receiving message, but waiting on just one partition + //might lead to miss data, so spawning one go-routine or having a worker pool can be an option to do this operation faster. + break + default: + return nil, err + } + + end := time.Since(start) + + c.logger.Debug(&Log{ + Mode: "SUB", + MessageValue: strings.Join(strings.Fields(string(msg.Value)), " "), + Topic: topic, + Host: fmt.Sprint(c.cfg.EventhubName + ":" + c.cfg.ConsumerGroup + ":" + partitionClient.PartitionID()), + PubSubBackend: "EVHUB", + Time: end.Microseconds(), + }) + + c.metrics.IncrementCounter(ctx, "app_pubsub_subscribe_success_count", "topic", topic, "subscription_name", partitionClient.PartitionID()) + + return msg, nil + } + + return nil, nil +} + +func (c *Client) processEvents(ctx context.Context, partitionClient *azeventhubs.ProcessorPartitionClient) (*pubsub.Message, error) { + defer closePartitionResources(ctx, partitionClient) + + receiveCtx, receiveCtxCancel := context.WithTimeout(ctx, time.Second) + events, err := partitionClient.ReceiveEvents(receiveCtx, 1, nil) + receiveCtxCancel() + + if err != nil && !errors.Is(err, context.DeadlineExceeded) { + return nil, err + } + + if len(events) == 0 { + return nil, errNoMsgReceived + } + + msg := pubsub.NewMessage(ctx) + + msg.Value = events[0].Body + msg.Committer = &Message{ + event: events[0], + processor: partitionClient, + } + + msg.Topic = partitionClient.PartitionID() + msg.MetaData = events[0].EventData + + return msg, nil +} + +func closePartitionResources(ctx context.Context, partitionClient *azeventhubs.ProcessorPartitionClient) { + defer partitionClient.Close(ctx) +} + +func (c *Client) Publish(ctx context.Context, topic string, message []byte) error { + if topic != c.cfg.EventhubName { + return errors.New("topic should be same as eventhub name") + } + + c.metrics.IncrementCounter(ctx, "app_pubsub_publish_total_count", "topic", topic) + + newBatchOptions := &azeventhubs.EventDataBatchOptions{} + + batch, err := c.producer.NewEventDataBatch(ctx, newBatchOptions) + if err != nil { + c.logger.Errorf("failed to create event batch %v", err) + + return err + } + + data := []*azeventhubs.EventData{{ + Body: message, + }} + + for i := 0; i < len(data); i++ { + err = batch.AddEventData(data[i], nil) + } + + start := time.Now() + + // send the batch of events to the event hub + if err = c.producer.SendEventDataBatch(ctx, batch, nil); err != nil { + return err + } + + end := time.Since(start) + + c.logger.Debug(&Log{ + Mode: "PUB", + MessageValue: strings.Join(strings.Fields(string(message)), " "), + Topic: topic, + Host: fmt.Sprint(c.cfg.EventhubName), + PubSubBackend: "EVHUB", + Time: end.Microseconds(), + }) + + c.metrics.IncrementCounter(ctx, "app_pubsub_publish_success_count", "topic", topic) + + return nil +} + +func (c *Client) Health() datasource.Health { + c.logger.Error("health-check not implemented for eventhub") + + return datasource.Health{} +} + +func (c *Client) CreateTopic(context.Context, string) error { + c.logger.Error("topic creation is not supported in eventhub") + + return nil +} + +func (c *Client) DeleteTopic(context.Context, string) error { + c.logger.Error("topic deletion is not supported in eventhub") + + return nil +} + +func (c *Client) Close() error { + err := c.producer.Close(context.Background()) + if err != nil { + c.logger.Errorf("failed to close eventhub producer %v", err) + } + + err = c.consumer.Close(context.Background()) + if err != nil { + c.logger.Errorf("failed to close eventhub consumer %v", err) + } + + c.processorCtx() + + return err +} diff --git a/pkg/gofr/datasource/pubsub/eventhub/eventhub_test.go b/pkg/gofr/datasource/pubsub/eventhub/eventhub_test.go new file mode 100644 index 000000000..3ca68e618 --- /dev/null +++ b/pkg/gofr/datasource/pubsub/eventhub/eventhub_test.go @@ -0,0 +1,284 @@ +package eventhub + +import ( + "context" + "net" + "testing" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" + "github.com/stretchr/testify/require" + + "nhooyr.io/websocket" + + "go.uber.org/mock/gomock" + + "gofr.dev/pkg/gofr/testutil" +) + +func TestConnect(t *testing.T) { + ctrl := gomock.NewController(t) + + client := New(getTestConfigs()) + + mockLogger := NewMockLogger(ctrl) + + mockLogger.EXPECT().Debug("azure eventhub connection started using connection string") + mockLogger.EXPECT().Debug("azure eventhub producer client setup success") + mockLogger.EXPECT().Debug("azure eventhub container client setup success") + mockLogger.EXPECT().Debug("azure eventhub blobstore client setup success") + mockLogger.EXPECT().Debug("azure eventhub consumer client setup success") + mockLogger.EXPECT().Debug("azure eventhub processor setup success") + mockLogger.EXPECT().Debug("azure eventhub processor running successfully").AnyTimes() + + client.UseLogger(mockLogger) + client.UseMetrics(NewMockMetrics(ctrl)) + + client.Connect() + + require.True(t, mockLogger.ctrl.Satisfied(), "Eventhub Connection Failed") +} + +func TestConfigValidation(t *testing.T) { + ctrl := gomock.NewController(t) + + mockLogger := NewMockLogger(ctrl) + + client := New(Config{}) + + client.UseLogger(mockLogger) + + mockLogger.EXPECT().Error("eventhubName cannot be an empty") + mockLogger.EXPECT().Error("connectionString cannot be an empty") + mockLogger.EXPECT().Error("storageServiceURL cannot be an empty") + mockLogger.EXPECT().Error("storageContainerName cannot be an empty") + mockLogger.EXPECT().Error("containerConnectionString cannot be an empty") + + client.Connect() + + require.True(t, mockLogger.ctrl.Satisfied(), "Config Validation Failed") +} + +func TestConnect_ProducerError(t *testing.T) { + ctrl := gomock.NewController(t) + + logs := testutil.StdoutOutputForFunc(func() { + cfg := getTestConfigs() + cfg.ConnectionString = cfg.ConnectionString + ";EntityPath=" + + client := New(cfg) + + mockLogger := NewMockLogger(ctrl) + + client.UseLogger(mockLogger) + client.UseMetrics(NewMockMetrics(ctrl)) + + mockLogger.EXPECT().Debug(gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Errorf("error occurred while creating producer client %v", gomock.Any()) + + client.Connect() + }) + + require.NotContains(t, logs, "Error") +} + +func TestConnect_ContainerError(t *testing.T) { + ctrl := gomock.NewController(t) + + logs := testutil.StdoutOutputForFunc(func() { + cfg := getTestConfigs() + cfg.ContainerConnectionString = cfg.ContainerConnectionString + "" + + client := New(cfg) + + mockLogger := NewMockLogger(ctrl) + + client.UseLogger(mockLogger) + client.UseMetrics(NewMockMetrics(ctrl)) + + mockLogger.EXPECT().Debug(gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Errorf("error occurred while creating container client %v", gomock.Any()) + + client.Connect() + }) + + require.NotContains(t, logs, "Error") +} + +func TestPublish_FailedBatchCreation(t *testing.T) { + ctrl := gomock.NewController(t) + + client := New(getTestConfigs()) + + mockLogger := NewMockLogger(ctrl) + mockMetrics := NewMockMetrics(ctrl) + + mockLogger.EXPECT().Debug("azure eventhub connection started using connection string") + mockLogger.EXPECT().Debug("azure eventhub producer client setup success") + mockLogger.EXPECT().Debug("azure eventhub container client setup success") + mockLogger.EXPECT().Debug("azure eventhub blobstore client setup success") + mockLogger.EXPECT().Debug("azure eventhub consumer client setup success") + mockLogger.EXPECT().Debug("azure eventhub processor setup success") + mockLogger.EXPECT().Debug("azure eventhub processor running successfully").AnyTimes() + + mockMetrics.EXPECT().IncrementCounter(context.Background(), "app_pubsub_publish_total_count", "topic", client.cfg.EventhubName) + + mockLogger.EXPECT().Errorf(gomock.Any(), gomock.Any()) + + client.UseLogger(mockLogger) + client.UseMetrics(mockMetrics) + + client.Connect() + + err := client.Publish(context.Background(), client.cfg.EventhubName, []byte("my-message")) + + require.NotNil(t, err) + + require.True(t, mockLogger.ctrl.Satisfied(), "Eventhub Publish Failed Batch Creation") +} + +func TestPublish_FailedInvalidTopic(t *testing.T) { + ctrl := gomock.NewController(t) + + client := New(getTestConfigs()) + + mockLogger := NewMockLogger(ctrl) + mockMetrics := NewMockMetrics(ctrl) + + mockLogger.EXPECT().Debug("azure eventhub connection started using connection string") + mockLogger.EXPECT().Debug("azure eventhub producer client setup success") + mockLogger.EXPECT().Debug("azure eventhub container client setup success") + mockLogger.EXPECT().Debug("azure eventhub blobstore client setup success") + mockLogger.EXPECT().Debug("azure eventhub consumer client setup success") + mockLogger.EXPECT().Debug("azure eventhub processor setup success") + mockLogger.EXPECT().Debug("azure eventhub processor running successfully").AnyTimes() + + client.UseLogger(mockLogger) + client.UseMetrics(mockMetrics) + + client.Connect() + + err := client.Publish(context.Background(), "random topic", []byte("my-message")) + + require.Equal(t, "topic should be same as eventhub name", err.Error(), "Eventhub Publish Failed Invalid Topic") + + require.True(t, mockLogger.ctrl.Satisfied(), "Eventhub Publish Failed Invalid Topic") +} + +func Test_CreateTopic(t *testing.T) { + ctrl := gomock.NewController(t) + + client := New(getTestConfigs()) + + mockLogger := NewMockLogger(ctrl) + mockMetrics := NewMockMetrics(ctrl) + + mockLogger.EXPECT().Debug("azure eventhub connection started using connection string") + mockLogger.EXPECT().Debug("azure eventhub producer client setup success") + mockLogger.EXPECT().Debug("azure eventhub container client setup success") + mockLogger.EXPECT().Debug("azure eventhub blobstore client setup success") + mockLogger.EXPECT().Debug("azure eventhub consumer client setup success") + mockLogger.EXPECT().Debug("azure eventhub processor setup success") + mockLogger.EXPECT().Debug("azure eventhub processor running successfully").AnyTimes() + mockLogger.EXPECT().Error("topic deletion is not supported in eventhub") + + client.UseLogger(mockLogger) + client.UseMetrics(mockMetrics) + + client.Connect() + + err := client.DeleteTopic(context.Background(), "random-topic") + + require.Nil(t, err, "Eventhub Topic Creation not allowed failed") + + require.True(t, mockLogger.ctrl.Satisfied(), "Eventhub Topic Creation not allowed failed") +} + +func Test_DeleteTopic(t *testing.T) { + ctrl := gomock.NewController(t) + + client := New(getTestConfigs()) + + mockLogger := NewMockLogger(ctrl) + mockMetrics := NewMockMetrics(ctrl) + + mockLogger.EXPECT().Debug("azure eventhub connection started using connection string") + mockLogger.EXPECT().Debug("azure eventhub producer client setup success") + mockLogger.EXPECT().Debug("azure eventhub container client setup success") + mockLogger.EXPECT().Debug("azure eventhub blobstore client setup success") + mockLogger.EXPECT().Debug("azure eventhub consumer client setup success") + mockLogger.EXPECT().Debug("azure eventhub processor setup success") + mockLogger.EXPECT().Debug("azure eventhub processor running successfully").AnyTimes() + mockLogger.EXPECT().Error("topic creation is not supported in eventhub") + + client.UseLogger(mockLogger) + client.UseMetrics(mockMetrics) + + client.Connect() + + err := client.CreateTopic(context.Background(), "random-topic") + + require.Nil(t, err, "Eventhub Topic Deletion not allowed failed") + + require.True(t, mockLogger.ctrl.Satisfied(), "Eventhub Topic Deletion not allowed failed") +} + +func Test_HealthCheck(t *testing.T) { + ctrl := gomock.NewController(t) + + client := New(getTestConfigs()) + + mockLogger := NewMockLogger(ctrl) + mockMetrics := NewMockMetrics(ctrl) + + mockLogger.EXPECT().Debug("azure eventhub connection started using connection string") + mockLogger.EXPECT().Debug("azure eventhub producer client setup success") + mockLogger.EXPECT().Debug("azure eventhub container client setup success") + mockLogger.EXPECT().Debug("azure eventhub blobstore client setup success") + mockLogger.EXPECT().Debug("azure eventhub consumer client setup success") + mockLogger.EXPECT().Debug("azure eventhub processor setup success") + mockLogger.EXPECT().Debug("azure eventhub processor running successfully").AnyTimes() + mockLogger.EXPECT().Error("health-check not implemented for eventhub") + + client.UseLogger(mockLogger) + client.UseMetrics(mockMetrics) + + client.Connect() + + _ = client.Health() + + require.True(t, mockLogger.ctrl.Satisfied(), "Eventhub Topic Deletion not allowed failed") +} + +func getTestConfigs() Config { + newWebSocketConnFn := func(ctx context.Context, args azeventhubs.WebSocketConnParams) (net.Conn, error) { + opts := &websocket.DialOptions{ + Subprotocols: []string{"amqp"}, + } + wssConn, _, err := websocket.Dial(ctx, args.Host, opts) + + if err != nil { + return nil, err + } + + return websocket.NetConn(ctx, wssConn, websocket.MessageBinary), nil + } + + // For more details on the configuration refer https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/consumer_client_test.go + return Config{ + ConnectionString: "Endpoint=sb://.servicebus.windows.net/;SharedAccessKeyName=;SharedAccessKey=", + ContainerConnectionString: "DefaultEndpointsProtocol=https;AccountName=;AccountKey=" + + "SGVsbG8gV29ybGQ=", + StorageServiceURL: "core.windows.net", + StorageContainerName: "", + EventhubName: "event-hub-name", + ConsumerOptions: &azeventhubs.ConsumerClientOptions{ + RetryOptions: azeventhubs.RetryOptions{}, + }, + ProducerOptions: &azeventhubs.ProducerClientOptions{ + NewWebSocketConn: newWebSocketConnFn, + }, + } +} diff --git a/pkg/gofr/datasource/pubsub/eventhub/go.mod b/pkg/gofr/datasource/pubsub/eventhub/go.mod new file mode 100644 index 000000000..a13d76db3 --- /dev/null +++ b/pkg/gofr/datasource/pubsub/eventhub/go.mod @@ -0,0 +1,29 @@ +module gofr.dev/pkg/gofr/datasource/pubsub/azeventhub + +go 1.22.3 + +replace gofr.dev => ../../../../../../gofr + +require ( + github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.2.2 + github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1 + github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/otel/trace v1.30.0 + go.uber.org/mock v0.4.0 + gofr.dev v1.21.0 + nhooyr.io/websocket v1.8.11 +) + +require ( + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect + github.com/Azure/go-amqp v1.0.5 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/joho/godotenv v1.5.1 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + go.opentelemetry.io/otel v1.30.0 // indirect + golang.org/x/net v0.29.0 // indirect + golang.org/x/text v0.18.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/pkg/gofr/datasource/pubsub/eventhub/go.sum b/pkg/gofr/datasource/pubsub/eventhub/go.sum new file mode 100644 index 000000000..fe0586c39 --- /dev/null +++ b/pkg/gofr/datasource/pubsub/eventhub/go.sum @@ -0,0 +1,71 @@ +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0 h1:nyQWyZvwGTvunIMxi1Y9uXkcyr+I7TeNrr/foo4Kpk8= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0/go.mod h1:l38EPgmsp71HHLq9j7De57JcKOWPyhrsW1Awm1JS6K0= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 h1:tfLQ34V6F7tVSwoTf/4lH5sE0o6eCJuNDTmH09nDpbc= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0/go.mod h1:9kIvujWAA58nmPmWB1m23fyWic1kYZMxD9CxaWn4Qpg= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 h1:ywEEhmNahHBihViHepv3xPBn1663uRv2t2q/ESv9seY= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY= +github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.2.2 h1:B+TQ/DzOEn9CsiiosdD/IAyZ5gZiyC+0T19iwxCCnaY= +github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.2.2/go.mod h1:qf3s/6aV9ePKYGeEYPsbndK6GGfeS7SrbA6OE/T7NIA= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0 h1:+dggnR89/BIIlRlQ6d19dkhhdd/mQUiQbXhyHUFiB4w= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0/go.mod h1:tI9M2Q/ueFi287QRkdrhb9LHm6ZnXgkVYLRC3FhYkPw= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0 h1:PiSrjRPpkQNjrM8H0WwKMnZUdu1RGMtd/LdGKUrOo+c= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0/go.mod h1:oDrbWx4ewMylP7xHivfgixbfGBT6APAwsSoHRKotnIc= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1 h1:cf+OIKbkmMHBaC3u78AXomweqM0oxQSgBXRZf3WH4yM= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1/go.mod h1:ap1dmS6vQKJxSMNiGJcq4QuUQkOynyD93gLw6MDF7ek= +github.com/Azure/go-amqp v1.0.5 h1:po5+ljlcNSU8xtapHTe8gIc8yHxCzC03E8afH2g1ftU= +github.com/Azure/go-amqp v1.0.5/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= +github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU= +github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= +github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/otel v1.30.0 h1:F2t8sK4qf1fAmY9ua4ohFS/K+FUuOPemHUIXHtktrts= +go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc= +go.opentelemetry.io/otel/trace v1.30.0 h1:7UBkkYzeg3C7kQX8VAidWh2biiQbtAKjyIML8dQ9wmc= +go.opentelemetry.io/otel/trace v1.30.0/go.mod h1:5EyKqTzzmyqB9bwtCCq6pDLktPK6fmGf/Dph+8VI02o= +go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= +go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= +golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.24.0 h1:Mh5cbb+Zk2hqqXNO7S1iTjEphVL+jb8ZWaqh/g+JWkM= +golang.org/x/term v0.24.0/go.mod h1:lOBK/LVxemqiMij05LGJ0tzNr8xlmwBRJ81PX6wVLH8= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0= +nhooyr.io/websocket v1.8.11/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= diff --git a/pkg/gofr/datasource/pubsub/eventhub/logger.go b/pkg/gofr/datasource/pubsub/eventhub/logger.go new file mode 100644 index 000000000..0f7c60ca1 --- /dev/null +++ b/pkg/gofr/datasource/pubsub/eventhub/logger.go @@ -0,0 +1,31 @@ +package eventhub + +import ( + "fmt" + "io" +) + +// Logger interface with required methods +type Logger interface { + Debug(args ...interface{}) + Debugf(pattern string, args ...interface{}) + Log(args ...interface{}) + Logf(pattern string, args ...interface{}) + Error(args ...interface{}) + Fatal(args ...interface{}) + Errorf(pattern string, args ...interface{}) +} + +type Log struct { + Mode string `json:"mode"` + MessageValue string `json:"messageValue"` + Topic string `json:"topic"` + Host string `json:"host"` + PubSubBackend string `json:"pubSubBackend"` + Time int64 `json:"time"` +} + +func (l *Log) PrettyPrint(writer io.Writer) { + fmt.Fprintf(writer, "\u001B[38;5;8m%-32s \u001B[38;5;24m%-6s\u001B[0m %8d\u001B[38;5;8mµs\u001B[0m %-4s%s \u001b[38;5;101m\n", + l.Topic, l.PubSubBackend, l.Time, l.Mode, l.MessageValue) +} diff --git a/pkg/gofr/datasource/pubsub/eventhub/logger_test.go b/pkg/gofr/datasource/pubsub/eventhub/logger_test.go new file mode 100644 index 000000000..e105a6b5a --- /dev/null +++ b/pkg/gofr/datasource/pubsub/eventhub/logger_test.go @@ -0,0 +1,39 @@ +package eventhub + +import ( + "bytes" + "testing" + + "go.uber.org/mock/gomock" + + "github.com/stretchr/testify/require" +) + +func Test_PrettyPrint(t *testing.T) { + queryLog := Log{ + Mode: "PUB", + MessageValue: `{"myorder":"1"}`, + Topic: "test-topic", + Host: "localhost", + PubSubBackend: "AZHUB", + Time: 10, + } + + logger := NewMockLogger(gomock.NewController(t)) + + logger.EXPECT().Log(gomock.Any()) + + logger.Log(queryLog) + + b := make([]byte, 100) + + writer := bytes.NewBuffer(b) + + queryLog.PrettyPrint(writer) + + require.Contains(t, writer.String(), "test-topic") + require.Contains(t, writer.String(), "AZHUB") + require.Contains(t, writer.String(), `{"myorder":"1"}`) + + require.True(t, logger.ctrl.Satisfied(), "Test_PrettyPrint Failed!") +} diff --git a/pkg/gofr/datasource/pubsub/eventhub/message.go b/pkg/gofr/datasource/pubsub/eventhub/message.go new file mode 100644 index 000000000..79d9fcfcb --- /dev/null +++ b/pkg/gofr/datasource/pubsub/eventhub/message.go @@ -0,0 +1,21 @@ +package eventhub + +import ( + "context" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" +) + +type Message struct { + event *azeventhubs.ReceivedEventData + processor *azeventhubs.ProcessorPartitionClient + logger Logger +} + +func (a *Message) Commit() { + // Update the checkpoint with the latest event received + err := a.processor.UpdateCheckpoint(context.Background(), a.event, nil) + if err != nil { + a.logger.Errorf("failed to acknowledge event with eventID %v", a.event.MessageID) + } +} diff --git a/pkg/gofr/datasource/pubsub/eventhub/metrics.go b/pkg/gofr/datasource/pubsub/eventhub/metrics.go new file mode 100644 index 000000000..7fd3ca744 --- /dev/null +++ b/pkg/gofr/datasource/pubsub/eventhub/metrics.go @@ -0,0 +1,9 @@ +package eventhub + +import ( + "context" +) + +type Metrics interface { + IncrementCounter(ctx context.Context, name string, labels ...string) +} diff --git a/pkg/gofr/datasource/pubsub/eventhub/mock_logger.go b/pkg/gofr/datasource/pubsub/eventhub/mock_logger.go new file mode 100644 index 000000000..13f050b39 --- /dev/null +++ b/pkg/gofr/datasource/pubsub/eventhub/mock_logger.go @@ -0,0 +1,154 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: logger.go +// +// Generated by this command: +// +// mockgen -source=logger.go -destination=mock_logger.go -package=azeventhub +// + +// Package azeventhub is a generated GoMock package. +package eventhub + +import ( + reflect "reflect" + + gomock "go.uber.org/mock/gomock" +) + +// MockLogger is a mock of Logger interface. +type MockLogger struct { + ctrl *gomock.Controller + recorder *MockLoggerMockRecorder +} + +// MockLoggerMockRecorder is the mock recorder for MockLogger. +type MockLoggerMockRecorder struct { + mock *MockLogger +} + +// NewMockLogger creates a new mock instance. +func NewMockLogger(ctrl *gomock.Controller) *MockLogger { + mock := &MockLogger{ctrl: ctrl} + mock.recorder = &MockLoggerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockLogger) EXPECT() *MockLoggerMockRecorder { + return m.recorder +} + +// Debug mocks base method. +func (m *MockLogger) Debug(args ...any) { + m.ctrl.T.Helper() + varargs := []any{} + for _, a := range args { + varargs = append(varargs, a) + } + m.ctrl.Call(m, "Debug", varargs...) +} + +// Debug indicates an expected call of Debug. +func (mr *MockLoggerMockRecorder) Debug(args ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Debug", reflect.TypeOf((*MockLogger)(nil).Debug), args...) +} + +// Debugf mocks base method. +func (m *MockLogger) Debugf(pattern string, args ...any) { + m.ctrl.T.Helper() + varargs := []any{pattern} + for _, a := range args { + varargs = append(varargs, a) + } + m.ctrl.Call(m, "Debugf", varargs...) +} + +// Debugf indicates an expected call of Debugf. +func (mr *MockLoggerMockRecorder) Debugf(pattern any, args ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{pattern}, args...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Debugf", reflect.TypeOf((*MockLogger)(nil).Debugf), varargs...) +} + +// Error mocks base method. +func (m *MockLogger) Error(args ...any) { + m.ctrl.T.Helper() + varargs := []any{} + for _, a := range args { + varargs = append(varargs, a) + } + m.ctrl.Call(m, "Error", varargs...) +} + +// Error indicates an expected call of Error. +func (mr *MockLoggerMockRecorder) Error(args ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Error", reflect.TypeOf((*MockLogger)(nil).Error), args...) +} + +// Errorf mocks base method. +func (m *MockLogger) Errorf(pattern string, args ...any) { + m.ctrl.T.Helper() + varargs := []any{pattern} + for _, a := range args { + varargs = append(varargs, a) + } + m.ctrl.Call(m, "Errorf", varargs...) +} + +// Errorf indicates an expected call of Errorf. +func (mr *MockLoggerMockRecorder) Errorf(pattern any, args ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{pattern}, args...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Errorf", reflect.TypeOf((*MockLogger)(nil).Errorf), varargs...) +} + +// Fatal mocks base method. +func (m *MockLogger) Fatal(args ...any) { + m.ctrl.T.Helper() + varargs := []any{} + for _, a := range args { + varargs = append(varargs, a) + } + m.ctrl.Call(m, "Fatal", varargs...) +} + +// Fatal indicates an expected call of Fatal. +func (mr *MockLoggerMockRecorder) Fatal(args ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Fatal", reflect.TypeOf((*MockLogger)(nil).Fatal), args...) +} + +// Log mocks base method. +func (m *MockLogger) Log(args ...any) { + m.ctrl.T.Helper() + varargs := []any{} + for _, a := range args { + varargs = append(varargs, a) + } + m.ctrl.Call(m, "Log", varargs...) +} + +// Log indicates an expected call of Log. +func (mr *MockLoggerMockRecorder) Log(args ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Log", reflect.TypeOf((*MockLogger)(nil).Log), args...) +} + +// Logf mocks base method. +func (m *MockLogger) Logf(pattern string, args ...any) { + m.ctrl.T.Helper() + varargs := []any{pattern} + for _, a := range args { + varargs = append(varargs, a) + } + m.ctrl.Call(m, "Logf", varargs...) +} + +// Logf indicates an expected call of Logf. +func (mr *MockLoggerMockRecorder) Logf(pattern any, args ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{pattern}, args...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Logf", reflect.TypeOf((*MockLogger)(nil).Logf), varargs...) +} diff --git a/pkg/gofr/datasource/pubsub/eventhub/mock_metrics.go b/pkg/gofr/datasource/pubsub/eventhub/mock_metrics.go new file mode 100644 index 000000000..e183ccb33 --- /dev/null +++ b/pkg/gofr/datasource/pubsub/eventhub/mock_metrics.go @@ -0,0 +1,57 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: metrics.go +// +// Generated by this command: +// +// mockgen -source=metrics.go -destination=mock_metrics.go -package=eventhub +// + +// Package eventhub is a generated GoMock package. +package eventhub + +import ( + context "context" + reflect "reflect" + + gomock "go.uber.org/mock/gomock" +) + +// MockMetrics is a mock of Metrics interface. +type MockMetrics struct { + ctrl *gomock.Controller + recorder *MockMetricsMockRecorder +} + +// MockMetricsMockRecorder is the mock recorder for MockMetrics. +type MockMetricsMockRecorder struct { + mock *MockMetrics +} + +// NewMockMetrics creates a new mock instance. +func NewMockMetrics(ctrl *gomock.Controller) *MockMetrics { + mock := &MockMetrics{ctrl: ctrl} + mock.recorder = &MockMetricsMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockMetrics) EXPECT() *MockMetricsMockRecorder { + return m.recorder +} + +// IncrementCounter mocks base method. +func (m *MockMetrics) IncrementCounter(ctx context.Context, name string, labels ...string) { + m.ctrl.T.Helper() + varargs := []any{ctx, name} + for _, a := range labels { + varargs = append(varargs, a) + } + m.ctrl.Call(m, "IncrementCounter", varargs...) +} + +// IncrementCounter indicates an expected call of IncrementCounter. +func (mr *MockMetricsMockRecorder) IncrementCounter(ctx, name any, labels ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, name}, labels...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IncrementCounter", reflect.TypeOf((*MockMetrics)(nil).IncrementCounter), varargs...) +} diff --git a/pkg/gofr/external_db.go b/pkg/gofr/external_db.go index ba20f4bbf..5ef8fb4e0 100644 --- a/pkg/gofr/external_db.go +++ b/pkg/gofr/external_db.go @@ -27,6 +27,16 @@ func (a *App) AddFTP(fs file.FileSystemProvider) { a.container.File = fs } +// AddPubSub sets the PubSub client in the app's container. +func (a *App) AddPubSub(pubsub container.PubSubProvider) { + pubsub.UseLogger(a.Logger()) + pubsub.UseMetrics(a.Metrics()) + + pubsub.Connect() + + a.container.PubSub = pubsub +} + // AddFile sets the FTP,SFTP,S3 datasource in the app's container. func (a *App) AddFileStore(fs file.FileSystemProvider) { fs.UseLogger(a.Logger())