diff --git a/docs/advanced-guide/using-publisher-subscriber/page.md b/docs/advanced-guide/using-publisher-subscriber/page.md index 4135cb874..814845067 100644 --- a/docs/advanced-guide/using-publisher-subscriber/page.md +++ b/docs/advanced-guide/using-publisher-subscriber/page.md @@ -9,8 +9,8 @@ scaled and maintained according to its own requirement. ## Design choice -In GoFr application if a user wants to use the Publisher-Subscriber design, it supports two message brokers—Apache Kafka -and Google PubSub. +In GoFr application if a user wants to use the Publisher-Subscriber design, it supports several message brokers, +including Apache Kafka, Google PubSub, MQTT, and NATS JetStream. The initialization of the PubSub is done in an IoC container which handles the PubSub client dependency. With this, the control lies with the framework and thus promotes modularity, testability, and re-usability. Users can do publish and subscribe to multiple topics in a single application, by providing the topic name. @@ -175,6 +175,81 @@ docker run -d \ eclipse-mosquitto:latest ``` > **Note**: find the default mosquitto config file {% new-tab-link title="here" href="https://github.com/eclipse/mosquitto/blob/master/mosquitto.conf" /%} + +### NATS JetStream + +NATS JetStream is supported as an external pubsub provider, meaning if you're not using it, it won't be added to your binary. + +#### Configs +```dotenv +PUBSUB_BACKEND=NATS +PUBSUB_BROKER=nats://localhost:4222 +NATS_STREAM=mystream +NATS_SUBJECTS=orders.*,shipments.* +NATS_MAX_WAIT=5s +NATS_BATCH_SIZE=100 +NATS_MAX_PULL_WAIT=500ms +NATS_CONSUMER=my-consumer +NATS_CREDS_FILE=/path/to/creds.json +``` + +#### Setup + +To set up NATS JetStream, follow these steps: + +1. Import the external driver for NATS JetStream: + +```bash +go get gofr.dev/pkg/gofr/datasources/pubsub/nats +``` + +2. Use the `AddPubSub` method to add the NATS JetStream driver to your application: + +```go +app := gofr.New() + +app.AddPubSub(nats.New(nats.Config{ + Server: "nats://localhost:4222", + Stream: nats.StreamConfig{ + Stream: "mystream", + Subjects: []string{"orders.*", "shipments.*"}, + }, + MaxWait: 5 * time.Second, + BatchSize: 100, + MaxPullWait: 500 * time.Millisecond, + Consumer: "my-consumer", + CredsFile: "/path/to/creds.json", +})) +``` + +#### Docker setup +```shell +docker run -d \ + --name nats \ + -p 4222:4222 \ + -p 8222:8222 \ + -v /nats.conf:/nats/config/nats.conf \ + nats:2.9.16 +``` + +#### Configuration Options + +| Name | Description | Required | Default | Example | +|------|-------------|----------|---------|---------| +| `PUBSUB_BACKEND` | Set to "NATS" to use NATS JetStream as the message broker | Yes | - | `NATS` | +| `PUBSUB_BROKER` | NATS server URL | Yes | - | `nats://localhost:4222` | +| `NATS_STREAM` | Name of the NATS stream | Yes | - | `mystream` | +| `NATS_SUBJECTS` | Comma-separated list of subjects to subscribe to | Yes | - | `orders.*,shipments.*` | +| `NATS_MAX_WAIT` | Maximum wait time for batch requests | No | - | `5s` | +| `NATS_BATCH_SIZE` | Maximum number of messages to pull in a single request | No | 0 | `100` | +| `NATS_MAX_PULL_WAIT` | Maximum wait time for individual pull requests | No | 0 | `500ms` | +| `NATS_CONSUMER` | Name of the NATS consumer | No | - | `my-consumer` | +| `NATS_CREDS_FILE` | Path to the credentials file for authentication | No | - | `/path/to/creds.json` | + +#### Usage + +When subscribing or publishing using NATS JetStream, make sure to use the appropriate subject name that matches your stream configuration. +For more information on setting up and using NATS JetStream, refer to the official NATS documentation. ### Azure Eventhub GoFr supports eventhub starting gofr version v1.22.0. diff --git a/docs/references/configs/page.md b/docs/references/configs/page.md index 4c43d85ae..e7d71889b 100644 --- a/docs/references/configs/page.md +++ b/docs/references/configs/page.md @@ -220,7 +220,7 @@ This document lists all the configuration options supported by the GoFr framewor - PUBSUB_BACKEND - Pub/Sub message broker backend -- kafka, google, mqtt +- kafka, google, mqtt, nats {% /table %} @@ -338,3 +338,25 @@ This document lists all the configuration options supported by the GoFr framewor - Sends regular messages to check the link is active. May not work as expected if handling func is blocking execution {% /table %} + +**NATS JetStream** + +{% table %} + +- Name +- Description +- Default Value + +--- + +- NATS_SERVER +- URL of the NATS server +- nats://localhost:4222 + +--- + +- NATS_CREDS_FILE +- File containing the NATS credentials +- creds.json + +{% /table %} diff --git a/go.mod b/go.mod index 05f266d07..d38ddd1fb 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module gofr.dev -go 1.22 +go 1.22.3 require ( cloud.google.com/go/pubsub v1.42.0 diff --git a/pkg/gofr/container/mock_container.go b/pkg/gofr/container/mock_container.go index 8875ab594..e53b68e58 100644 --- a/pkg/gofr/container/mock_container.go +++ b/pkg/gofr/container/mock_container.go @@ -7,7 +7,6 @@ import ( "testing" "go.uber.org/mock/gomock" - "gofr.dev/pkg/gofr/datasource" "gofr.dev/pkg/gofr/datasource/file" "gofr.dev/pkg/gofr/datasource/pubsub" diff --git a/pkg/gofr/datasource/file/ftp/go.mod b/pkg/gofr/datasource/file/ftp/go.mod index 69d4422eb..d262b2caa 100644 --- a/pkg/gofr/datasource/file/ftp/go.mod +++ b/pkg/gofr/datasource/file/ftp/go.mod @@ -1,6 +1,8 @@ module gofr.dev/pkg/gofr/datasource/file/ftp -go 1.22 +go 1.22.3 + +toolchain go1.23.1 replace gofr.dev => ../../../../../../gofr diff --git a/pkg/gofr/datasource/file/sftp/go.mod b/pkg/gofr/datasource/file/sftp/go.mod index 784769eaa..74ce66fba 100644 --- a/pkg/gofr/datasource/file/sftp/go.mod +++ b/pkg/gofr/datasource/file/sftp/go.mod @@ -1,6 +1,8 @@ module gofr.dev/pkg/gofr/datasource/file/sftp -go 1.22 +go 1.22.3 + +toolchain go1.23.1 replace gofr.dev => ../../../../../../gofr @@ -20,4 +22,4 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect golang.org/x/sys v0.25.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect -) \ No newline at end of file +) diff --git a/pkg/gofr/datasource/pubsub/nats/client.go b/pkg/gofr/datasource/pubsub/nats/client.go new file mode 100644 index 000000000..24aa96b4a --- /dev/null +++ b/pkg/gofr/datasource/pubsub/nats/client.go @@ -0,0 +1,477 @@ +package nats + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "go.opentelemetry.io/otel/trace" + "gofr.dev/pkg/gofr/datasource/pubsub" +) + +//go:generate mockgen -destination=mock_jetstream.go -package=nats github.com/nats-io/nats.go/jetstream JetStream,Stream,Consumer,Msg,MessageBatch + +const consumeMessageDelay = 100 * time.Millisecond + +// Config defines the Client Client configuration. +type Config struct { + Server string + CredsFile string + Stream StreamConfig + Consumer string + MaxWait time.Duration + MaxPullWait int + BatchSize int +} + +// StreamConfig holds stream settings for Client JetStream. +type StreamConfig struct { + Stream string + Subjects []string + MaxDeliver int + MaxWait time.Duration +} + +// subscription holds subscription information for Client JetStream. +type subscription struct { + cancel context.CancelFunc +} + +type messageHandler func(context.Context, jetstream.Msg) error + +// Client represents a Client for Client JetStream operations. +type Client struct { + Conn ConnInterface + JetStream jetstream.JetStream + Logger pubsub.Logger + Config *Config + Metrics Metrics + Subscriptions map[string]*subscription + subMu sync.Mutex + Tracer trace.Tracer + messageBuffer chan *pubsub.Message + bufferSize int + topicBuffers map[string]chan *pubsub.Message + bufferMu sync.RWMutex +} + +// CreateTopic creates a new topic (stream) in Client JetStream. +func (n *Client) CreateTopic(ctx context.Context, name string) error { + return n.CreateStream(ctx, StreamConfig{ + Stream: name, + Subjects: []string{name}, + }) +} + +// DeleteTopic deletes a topic (stream) in Client JetStream. +func (n *Client) DeleteTopic(ctx context.Context, name string) error { + n.Logger.Debugf("deleting topic (stream) %s", name) + + err := n.JetStream.DeleteStream(ctx, name) + if err != nil { + if errors.Is(err, jetstream.ErrStreamNotFound) { + n.Logger.Debugf("stream %s not found, considering delete successful", name) + + return nil // If the stream doesn't exist, we consider it a success + } + + n.Logger.Errorf("failed to delete stream (topic) %s: %v", name, err) + + return err + } + + n.Logger.Debugf("successfully deleted topic (stream) %s", name) + + return nil +} + +// natsConnWrapper wraps a nats.Conn to implement the ConnInterface. +type natsConnWrapper struct { + *nats.Conn +} + +func (w *natsConnWrapper) Status() nats.Status { + return w.Conn.Status() +} + +func (w *natsConnWrapper) Close() { + w.Conn.Close() +} + +func (w *natsConnWrapper) NatsConn() *nats.Conn { + return w.Conn +} + +// New creates a new Client. +func New(cfg *Config) *PubSubWrapper { + if cfg == nil { + cfg = &Config{} + } + + if cfg.BatchSize == 0 { + cfg.BatchSize = 100 // Default batch size + } + + client := &Client{ + Config: cfg, + Subscriptions: make(map[string]*subscription), + topicBuffers: make(map[string]chan *pubsub.Message), + bufferSize: cfg.BatchSize, + } + + return &PubSubWrapper{Client: client} +} + +// UseLogger sets the logger for the NATS client. +func (n *Client) UseLogger(logger any) { + if l, ok := logger.(pubsub.Logger); ok { + n.Logger = l + } +} + +// UseTracer sets the tracer for the NATS client. +func (n *Client) UseTracer(tracer any) { + if t, ok := tracer.(trace.Tracer); ok { + n.Tracer = t + } +} + +// UseMetrics sets the metrics for the NATS client. +func (n *Client) UseMetrics(metrics any) { + if m, ok := metrics.(Metrics); ok { + n.Metrics = m + } +} + +// Connect establishes a connection to NATS and sets up JetStream. +func (n *Client) Connect() { + if err := n.validateAndPrepare(); err != nil { + return + } + + nc, err := n.createNATSConnection() + if err != nil { + return + } + + js, err := n.createJetStreamContext(nc) + if err != nil { + nc.Close() + return + } + + n.Conn = &natsConnWrapper{nc} + n.JetStream = js + + n.logSuccessfulConnection() +} + +func (n *Client) validateAndPrepare() error { + if n.Config == nil { + n.Logger.Errorf("NATS configuration is nil") + return errNATSConnNil + } + + if err := ValidateConfigs(n.Config); err != nil { + n.Logger.Errorf("could not initialize NATS JetStream: %v", err) + return err + } + + return nil +} + +func (n *Client) createNATSConnection() (*nats.Conn, error) { + opts := []nats.Option{nats.Name("GoFr NATS JetStreamClient")} + if n.Config.CredsFile != "" { + opts = append(opts, nats.UserCredentials(n.Config.CredsFile)) + } + + nc, err := nats.Connect(n.Config.Server, opts...) + if err != nil { + n.Logger.Errorf("failed to connect to NATS server at %v: %v", n.Config.Server, err) + return nil, err + } + + return nc, nil +} + +func (n *Client) createJetStreamContext(nc *nats.Conn) (jetstream.JetStream, error) { + js, err := jetstream.New(nc) + if err != nil { + n.Logger.Errorf("failed to create JetStream context: %v", err) + return nil, err + } + + return js, nil +} + +func (n *Client) logSuccessfulConnection() { + if n.Logger != nil { + n.Logger.Logf("connected to NATS server '%s'", n.Config.Server) + } +} + +// Publish publishes a message to a topic. +func (n *Client) Publish(ctx context.Context, subject string, message []byte) error { + n.Metrics.IncrementCounter(ctx, "app_pubsub_publish_total_count", "subject", subject) + + if n.JetStream == nil || subject == "" { + err := errJetStreamNotConfigured + n.Logger.Error(err.Error()) + + return err + } + + _, err := n.JetStream.Publish(ctx, subject, message) + if err != nil { + n.Logger.Errorf("failed to publish message to Client JetStream: %v", err) + + return err + } + + n.Metrics.IncrementCounter(ctx, "app_pubsub_publish_success_count", "subject", subject) + + return nil +} + +func (n *Client) getOrCreateBuffer(topic string) chan *pubsub.Message { + n.bufferMu.Lock() + defer n.bufferMu.Unlock() + + if buffer, exists := n.topicBuffers[topic]; exists { + return buffer + } + + buffer := make(chan *pubsub.Message, n.bufferSize) + n.topicBuffers[topic] = buffer + + return buffer +} + +func (n *Client) Subscribe(ctx context.Context, topic string) (*pubsub.Message, error) { + n.Metrics.IncrementCounter(ctx, "app_pubsub_subscribe_total_count", "topic", topic) + + if err := n.validateSubscribePrerequisites(); err != nil { + return nil, err + } + + n.subMu.Lock() + + _, exists := n.Subscriptions[topic] + if !exists { + cons, err := n.createOrUpdateConsumer(ctx, topic) + if err != nil { + n.subMu.Unlock() + return nil, err + } + + subCtx, cancel := context.WithCancel(context.Background()) + n.Subscriptions[topic] = &subscription{cancel: cancel} + + buffer := n.getOrCreateBuffer(topic) + go n.consumeMessages(subCtx, cons, topic, buffer) + } + + n.subMu.Unlock() + + buffer := n.getOrCreateBuffer(topic) + + select { + case msg := <-buffer: + n.Metrics.IncrementCounter(ctx, "app_pubsub_subscribe_success_count", "topic", topic) + return msg, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +func (n *Client) consumeMessages(ctx context.Context, cons jetstream.Consumer, topic string, buffer chan *pubsub.Message) { + for { + select { + case <-ctx.Done(): + return + default: + msgs, err := cons.Fetch(1, jetstream.FetchMaxWait(n.Config.MaxWait)) + if err != nil { + if !errors.Is(err, context.DeadlineExceeded) { + n.Logger.Errorf("Error fetching messages for topic %s: %v", topic, err) + } + + time.Sleep(consumeMessageDelay) // Add a small delay to avoid tight loop + + continue + } + + for msg := range msgs.Messages() { + pubsubMsg := pubsub.NewMessage(ctx) + pubsubMsg.Topic = topic + pubsubMsg.Value = msg.Data() + pubsubMsg.MetaData = msg.Headers() + pubsubMsg.Committer = &natsCommitter{msg: msg} + + select { + case buffer <- pubsubMsg: + // Message sent successfully + default: + // Buffer is full, log a warning + // TODO: implement backoff strategy + n.Logger.Logf("Message buffer is full for topic %s. Consider increasing buffer size or processing messages faster.", topic) + } + } + + if err := msgs.Error(); err != nil { + n.Logger.Errorf("Error in message batch for topic %s: %v", topic, err) + } + } + } +} + +func (n *Client) validateSubscribePrerequisites() error { + if n.JetStream == nil { + return errJetStreamNotConfigured + } + + if n.Config.Consumer == "" { + return errConsumerNotProvided + } + + return nil +} + +func (n *Client) createOrUpdateConsumer(ctx context.Context, topic string) (jetstream.Consumer, error) { + consumerName := fmt.Sprintf("%s_%s", n.Config.Consumer, strings.ReplaceAll(topic, ".", "_")) + cons, err := n.JetStream.CreateOrUpdateConsumer(ctx, n.Config.Stream.Stream, jetstream.ConsumerConfig{ + Durable: consumerName, + AckPolicy: jetstream.AckExplicitPolicy, + FilterSubject: topic, + MaxDeliver: n.Config.Stream.MaxDeliver, + DeliverPolicy: jetstream.DeliverNewPolicy, + AckWait: 30 * time.Second, + }) + + if err != nil { + n.Logger.Errorf("failed to create or update consumer: %v", err) + + return nil, err + } + + return cons, nil +} + +// HandleMessage handles a message from a consumer. +func (n *Client) HandleMessage(ctx context.Context, msg jetstream.Msg, handler messageHandler) error { + if err := handler(ctx, msg); err != nil { + n.Logger.Errorf("error handling message: %v", err) + + return n.NakMessage(msg) + } + + return nil +} + +// NakMessage naks a message from a consumer. +func (n *Client) NakMessage(msg jetstream.Msg) error { + if err := msg.Nak(); err != nil { + n.Logger.Errorf("failed to NAK message: %v", err) + + return err + } + + return nil +} + +// HandleFetchError handles fetch errors. +func (n *Client) HandleFetchError(err error) { + n.Logger.Errorf("failed to fetch messages: %v", err) + time.Sleep(time.Second) // Backoff on error +} + +// Close closes the Client. +func (n *Client) Close() error { + n.subMu.Lock() + for _, sub := range n.Subscriptions { + sub.cancel() + } + + n.Subscriptions = make(map[string]*subscription) + n.subMu.Unlock() + + n.bufferMu.Lock() + for _, buffer := range n.topicBuffers { + close(buffer) + } + + n.topicBuffers = make(map[string]chan *pubsub.Message) + n.bufferMu.Unlock() + + if n.Conn != nil { + n.Conn.Close() + } + + return nil +} + +// DeleteStream deletes a stream in Client JetStream. +func (n *Client) DeleteStream(ctx context.Context, name string) error { + err := n.JetStream.DeleteStream(ctx, name) + if err != nil { + n.Logger.Errorf("failed to delete stream: %v", err) + + return err + } + + return nil +} + +// CreateStream creates a stream in Client JetStream. +func (n *Client) CreateStream(ctx context.Context, cfg StreamConfig) error { + n.Logger.Debugf("creating stream %s", cfg.Stream) + jsCfg := jetstream.StreamConfig{ + Name: cfg.Stream, + Subjects: cfg.Subjects, + } + + _, err := n.JetStream.CreateStream(ctx, jsCfg) + if err != nil { + n.Logger.Errorf("failed to create stream: %v", err) + + return err + } + + return nil +} + +// CreateOrUpdateStream creates or updates a stream in Client JetStream. +func (n *Client) CreateOrUpdateStream(ctx context.Context, cfg *jetstream.StreamConfig) (jetstream.Stream, error) { + n.Logger.Debugf("creating or updating stream %s", cfg.Name) + + stream, err := n.JetStream.CreateOrUpdateStream(ctx, *cfg) + if err != nil { + n.Logger.Errorf("failed to create or update stream: %v", err) + + return nil, err + } + + return stream, nil +} + +// ValidateConfigs validates the configuration for Client JetStream. +func ValidateConfigs(conf *Config) error { + err := error(nil) + + if conf.Server == "" { + err = errServerNotProvided + } + + // check if subjects are provided + if err == nil && len(conf.Stream.Subjects) == 0 { + err = errSubjectsNotProvided + } + + return err +} diff --git a/pkg/gofr/datasource/pubsub/nats/client_test.go b/pkg/gofr/datasource/pubsub/nats/client_test.go new file mode 100644 index 000000000..8398be065 --- /dev/null +++ b/pkg/gofr/datasource/pubsub/nats/client_test.go @@ -0,0 +1,766 @@ +package nats + +import ( + "context" + "testing" + "time" + + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + "gofr.dev/pkg/gofr/datasource/pubsub" + "gofr.dev/pkg/gofr/logging" + "gofr.dev/pkg/gofr/testutil" +) + +func TestValidateConfigs(t *testing.T) { + testCases := []struct { + name string + config Config + expected error + }{ + { + name: "Valid Config", + config: Config{ + Server: NatsServer, + Stream: StreamConfig{ + Stream: "test-stream", + Subjects: []string{"test-subject"}, + }, + }, + expected: nil, + }, + { + name: "Empty Server", + config: Config{}, + expected: errServerNotProvided, + }, + { + name: "Empty Stream Subject", + config: Config{ + Server: NatsServer, + Stream: StreamConfig{ + Stream: "test-stream", + // Subjects is intentionally left empty + }, + }, + expected: errSubjectsNotProvided, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := ValidateConfigs(&tc.config) + assert.Equal(t, tc.expected, err) + }) + } +} + +func TestNATSClient_Publish(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockJS := NewMockJetStream(ctrl) + mockLogger := logging.NewMockLogger(logging.DEBUG) + mockMetrics := NewMockMetrics(ctrl) + mockConn := NewMockConnInterface(ctrl) + + conf := &Config{ + Server: NatsServer, + Stream: StreamConfig{ + Stream: "test-stream", + Subjects: []string{"test-subject"}, + }, + Consumer: "test-consumer", + } + + client := &Client{ + Conn: mockConn, + JetStream: mockJS, + Config: conf, + Logger: mockLogger, + Metrics: mockMetrics, + } + + ctx := context.Background() + subject := "test-subject" + message := []byte("test-message") + + // Set up expected calls + mockMetrics.EXPECT(). + IncrementCounter(gomock.Any(), "app_pubsub_publish_total_count", "subject", subject) + mockJS.EXPECT(). + Publish(gomock.Any(), subject, message). + Return(&jetstream.PubAck{}, nil) + mockMetrics.EXPECT(). + IncrementCounter(gomock.Any(), "app_pubsub_publish_success_count", "subject", subject) + + // We don't need to set an expectation for NatsConn() in this test, + // as we're not using it in the Publish method. + + // Call Publish + err := client.Publish(ctx, subject, message) + require.NoError(t, err) +} + +func TestNATSClient_PublishError(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + metrics := NewMockMetrics(ctrl) + mockConn := NewMockConnInterface(ctrl) + + config := &Config{ + Server: NatsServer, + Stream: StreamConfig{ + Stream: "test-stream", + Subjects: []string{"test-subject"}, + }, + Consumer: "test-consumer", + } + + client := &Client{ + Conn: mockConn, + JetStream: nil, // Simulate JetStream being nil + Metrics: metrics, + Config: config, + } + + ctx := context.TODO() + subject := "test" + message := []byte("test-message") + + metrics.EXPECT(). + IncrementCounter(ctx, "app_pubsub_publish_total_count", "subject", subject) + + logs := testutil.StderrOutputForFunc(func() { + client.Logger = logging.NewMockLogger(logging.DEBUG) + err := client.Publish(ctx, subject, message) + require.Error(t, err) + assert.Contains(t, err.Error(), "JetStream is not configured") + }) + + assert.Contains(t, logs, "JetStream is not configured") +} + +func TestNATSClient_SubscribeSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockJS := NewMockJetStream(ctrl) + mockConsumer := NewMockConsumer(ctrl) + mockMsgBatch := NewMockMessageBatch(ctrl) + mockMetrics := NewMockMetrics(ctrl) + mockMsg := NewMockMsg(ctrl) + + client := &Client{ + JetStream: mockJS, + Config: &Config{ + Stream: StreamConfig{ + Stream: "test-stream", + Subjects: []string{"test-subject"}, + }, + Consumer: "test-consumer", + MaxWait: time.Second, + BatchSize: 1, + }, + Metrics: mockMetrics, + Subscriptions: make(map[string]*subscription), + topicBuffers: make(map[string]chan *pubsub.Message), + bufferSize: 1, + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + mockMetrics.EXPECT().IncrementCounter(gomock.Any(), "app_pubsub_subscribe_total_count", "topic", "test-subject") + mockJS.EXPECT().CreateOrUpdateConsumer(gomock.Any(), client.Config.Stream.Stream, gomock.Any()).Return(mockConsumer, nil) + mockConsumer.EXPECT().Fetch(gomock.Any(), gomock.Any()).Return(mockMsgBatch, nil).AnyTimes() + + msgChan := make(chan jetstream.Msg, 1) + msgChan <- mockMsg + close(msgChan) + + mockMsgBatch.EXPECT().Messages().Return(msgChan).AnyTimes() // Allow multiple calls to Messages() + mockMsgBatch.EXPECT().Error().Return(nil).AnyTimes() + + mockMsg.EXPECT().Data().Return([]byte("test message")) + mockMsg.EXPECT().Headers().Return(nil) + + mockMetrics.EXPECT().IncrementCounter(gomock.Any(), "app_pubsub_subscribe_success_count", "topic", "test-subject") + + // Call Subscribe + msg, err := client.Subscribe(ctx, "test-subject") + + require.NoError(t, err) + assert.NotNil(t, msg) + assert.Equal(t, "test-subject", msg.Topic) + assert.Equal(t, []byte("test message"), msg.Value) +} + +func TestNATSClient_SubscribeTimeout(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockJS := NewMockJetStream(ctrl) + mockConsumer := NewMockConsumer(ctrl) + mockMsgBatch := NewMockMessageBatch(ctrl) + mockMetrics := NewMockMetrics(ctrl) + + client := &Client{ + JetStream: mockJS, + Config: &Config{ + Stream: StreamConfig{ + Stream: "test-stream", + Subjects: []string{"test-subject"}, + }, + Consumer: "test-consumer", + MaxWait: 10 * time.Millisecond, // Reduced timeout for faster test + BatchSize: 1, + }, + Metrics: mockMetrics, + Subscriptions: make(map[string]*subscription), + topicBuffers: make(map[string]chan *pubsub.Message), + bufferSize: 1, + Logger: logging.NewMockLogger(logging.DEBUG), + } + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + mockMetrics.EXPECT().IncrementCounter(gomock.Any(), "app_pubsub_subscribe_total_count", "topic", "test-subject") + mockJS.EXPECT().CreateOrUpdateConsumer(gomock.Any(), client.Config.Stream.Stream, gomock.Any()).Return(mockConsumer, nil) + mockConsumer.EXPECT().Fetch(gomock.Any(), gomock.Any()).Return(mockMsgBatch, nil).AnyTimes() + mockMsgBatch.EXPECT().Messages().Return(make(chan jetstream.Msg)).AnyTimes() // Return an empty channel to simulate timeout + mockMsgBatch.EXPECT().Error().Return(nil).AnyTimes() + + msg, err := client.Subscribe(ctx, "test-subject") + + require.Error(t, err) + assert.Nil(t, msg) + assert.ErrorIs(t, err, context.DeadlineExceeded) +} + +func TestNATSClient_SubscribeError(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockJS := NewMockJetStream(ctrl) + mockMetrics := NewMockMetrics(ctrl) + + client := &Client{ + JetStream: mockJS, + Config: &Config{ + Stream: StreamConfig{ + Stream: "test-stream", + Subjects: []string{"test-subject"}, + }, + Consumer: "test-consumer", + }, + Metrics: mockMetrics, + Subscriptions: make(map[string]*subscription), + } + + ctx := context.Background() + + mockMetrics.EXPECT().IncrementCounter(gomock.Any(), "app_pubsub_subscribe_total_count", "topic", "test-subject") + + expectedErr := errFailedToCreateConsumer + mockJS.EXPECT().CreateOrUpdateConsumer(gomock.Any(), client.Config.Stream.Stream, gomock.Any()).Return(nil, expectedErr) + + logs := testutil.StderrOutputForFunc(func() { + client.Logger = logging.NewMockLogger(logging.DEBUG) + msg, err := client.Subscribe(ctx, "test-subject") + + require.Error(t, err) + assert.Nil(t, msg) + assert.Equal(t, expectedErr, err) + }) + + assert.Contains(t, logs, "failed to create or update consumer") +} + +func TestNATSClient_Close(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockJS := NewMockJetStream(ctrl) + mockLogger := logging.NewMockLogger(logging.DEBUG) + mockMetrics := NewMockMetrics(ctrl) + mockConn := NewMockConnInterface(ctrl) + + client := &Client{ + Conn: mockConn, + JetStream: mockJS, + Logger: mockLogger, + Metrics: mockMetrics, + Config: &Config{ + Stream: StreamConfig{ + Stream: "test-stream", + }, + }, + Subscriptions: map[string]*subscription{ + "test-subject": { + cancel: func() {}, + }, + }, + } + + mockConn.EXPECT().Close() + + err := client.Close() + require.NoError(t, err) + assert.Empty(t, client.Subscriptions) +} + +func TestNew(t *testing.T) { + config := &Config{ + Server: NatsServer, + Stream: StreamConfig{ + Stream: "test-stream", + Subjects: []string{"test-subject"}, + }, + Consumer: "test-consumer", + MaxWait: 5 * time.Second, + BatchSize: 100, + } + + natsClient := New(config) + assert.NotNil(t, natsClient) + + // Check PubSubWrapper struct + assert.NotNil(t, natsClient) + assert.NotNil(t, natsClient.Client) + + // Check Client struct + assert.Equal(t, config, natsClient.Client.Config) + assert.NotNil(t, natsClient.Client.Subscriptions) + assert.NotNil(t, natsClient.Client.topicBuffers) + assert.Equal(t, config.BatchSize, natsClient.Client.bufferSize) + + // Check methods + assert.NotNil(t, natsClient.DeleteTopic) + assert.NotNil(t, natsClient.CreateTopic) + assert.NotNil(t, natsClient.Subscribe) + assert.NotNil(t, natsClient.Publish) + assert.NotNil(t, natsClient.Close) + + // Check new methods + assert.NotNil(t, natsClient.UseLogger) + assert.NotNil(t, natsClient.UseMetrics) + assert.NotNil(t, natsClient.UseTracer) + assert.NotNil(t, natsClient.Connect) + + // Check that Connect hasn't been called yet + assert.Nil(t, natsClient.Client.Conn) + assert.Nil(t, natsClient.Client.JetStream) +} + +func TestNew_Error(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + testCases := []struct { + name string + config *Config + expectedErr error + }{ + { + name: "Invalid Config", + config: &Config{ + Server: "", // Invalid: empty server + }, + expectedErr: errServerNotProvided, + }, + // Add more test cases for other error scenarios + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + client := New(tc.config) + assert.NotNil(t, client, "Client should not be nil even with invalid config") + }) + } +} + +func TestNatsClient_DeleteStream(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockJS := NewMockJetStream(ctrl) + client := &Client{JetStream: mockJS} + + ctx := context.Background() + streamName := "test-stream" + + mockJS.EXPECT().DeleteStream(ctx, streamName).Return(nil) + + err := client.DeleteStream(ctx, streamName) + assert.NoError(t, err) +} + +func TestNatsClient_CreateStream(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockJS := NewMockJetStream(ctrl) + mockLogger := logging.NewMockLogger(logging.DEBUG) + + client := &Client{ + JetStream: mockJS, + Logger: mockLogger, + Config: &Config{ + Stream: StreamConfig{ + Stream: "test-stream", + }, + }, + } + + ctx := context.Background() + + mockJS.EXPECT(). + CreateStream(ctx, gomock.Any()). + Return(nil, nil) + + // setup test config + client.Config.Stream.Stream = "test-stream" + + logs := testutil.StdoutOutputForFunc(func() { + client.Logger = logging.NewMockLogger(logging.DEBUG) + err := client.CreateStream(ctx, client.Config.Stream) + require.NoError(t, err) + }) + + assert.Contains(t, logs, "creating stream") + assert.Contains(t, logs, "test-stream") +} + +func TestNATSClient_CreateOrUpdateStream(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockJS := NewMockJetStream(ctrl) + mockLogger := logging.NewMockLogger(logging.DEBUG) + mockMetrics := NewMockMetrics(ctrl) + mockStream := NewMockStream(ctrl) + + client := &Client{ + JetStream: mockJS, + Logger: mockLogger, + Metrics: mockMetrics, + Config: &Config{ + Stream: StreamConfig{ + Stream: "test-stream", + }, + }, + } + + ctx := context.Background() + cfg := &jetstream.StreamConfig{ + Name: "test-stream", + Subjects: []string{"test.subject"}, + } + + // Expect the CreateOrUpdateStream call + mockJS.EXPECT(). + CreateOrUpdateStream(ctx, *cfg). + Return(mockStream, nil) + + // Capture log output + logs := testutil.StdoutOutputForFunc(func() { + client.Logger = logging.NewMockLogger(logging.DEBUG) + stream, err := client.CreateOrUpdateStream(ctx, cfg) + + // Assert the results + require.NoError(t, err) + assert.Equal(t, mockStream, stream) + }) + + // Check the logs + assert.Contains(t, logs, "creating or updating stream test-stream") +} + +func TestNATSClient_CreateTopic(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockJS := NewMockJetStream(ctrl) + mockLogger := logging.NewMockLogger(logging.DEBUG) + client := &Client{ + JetStream: mockJS, + Logger: mockLogger, + Config: &Config{}, + } + + ctx := context.Background() + + mockJS.EXPECT(). + CreateStream(ctx, gomock.Any()). + Return(nil, nil) + + err := client.CreateTopic(ctx, "test-topic") + require.NoError(t, err) +} + +func TestNATSClient_DeleteTopic(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockJS := NewMockJetStream(ctrl) + mockLogger := logging.NewMockLogger(logging.DEBUG) + client := &Client{ + JetStream: mockJS, + Logger: mockLogger, + Config: &Config{}, + } + + ctx := context.Background() + + mockJS.EXPECT().DeleteStream(ctx, "test-topic").Return(nil) + + err := client.DeleteTopic(ctx, "test-topic") + require.NoError(t, err) +} + +func TestNATSClient_NakMessage(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockMsg := NewMockMsg(ctrl) + mockLogger := logging.NewMockLogger(logging.DEBUG) + client := &Client{ + Logger: mockLogger, + } + + // Successful Nak + mockMsg.EXPECT().Nak().Return(nil) + err := client.NakMessage(mockMsg) + require.NoError(t, err) + + // Failed Nak + mockMsg.EXPECT().Nak().Return(assert.AnError) + err = client.NakMessage(mockMsg) + assert.Error(t, err) +} + +func TestNATSClient_HandleFetchError(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockLogger := logging.NewMockLogger(logging.DEBUG) + client := &Client{ + Logger: mockLogger, + } + + stdoutLogs := testutil.StdoutOutputForFunc(func() { + client.Logger = logging.NewMockLogger(logging.DEBUG) + client.HandleFetchError(assert.AnError) + }) + + stderrLogs := testutil.StderrOutputForFunc(func() { + client.Logger = logging.NewMockLogger(logging.DEBUG) + client.HandleFetchError(assert.AnError) + }) + + allLogs := stdoutLogs + stderrLogs + + assert.Contains(t, allLogs, "failed to fetch messages: assert.AnError", "Expected log not found") +} + +func TestNATSClient_DeleteTopic_Error(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockJS := NewMockJetStream(ctrl) + mockLogger := logging.NewMockLogger(logging.DEBUG) + client := &Client{ + JetStream: mockJS, + Logger: mockLogger, + Config: &Config{}, + } + + ctx := context.Background() + + expectedErr := errFailedToDeleteStream + mockJS.EXPECT().DeleteStream(ctx, "test-topic").Return(expectedErr) + + err := client.DeleteTopic(ctx, "test-topic") + require.Error(t, err) + assert.Equal(t, expectedErr, err) +} + +func TestNATSClient_Publish_Error(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockJS := NewMockJetStream(ctrl) + mockLogger := logging.NewMockLogger(logging.DEBUG) + mockMetrics := NewMockMetrics(ctrl) + mockConn := NewMockConnInterface(ctrl) + + client := &Client{ + Conn: mockConn, + JetStream: mockJS, + Logger: mockLogger, + Metrics: mockMetrics, + Config: &Config{}, + } + + ctx := context.Background() + subject := "test-subject" + message := []byte("test-message") + + expectedErr := errPublishError + + mockMetrics.EXPECT().IncrementCounter(gomock.Any(), "app_pubsub_publish_total_count", "subject", subject) + mockJS.EXPECT().Publish(gomock.Any(), subject, message).Return(nil, expectedErr) + + err := client.Publish(ctx, subject, message) + require.Error(t, err) + assert.Equal(t, expectedErr, err) +} + +func TestNATSClient_SubscribeCreateConsumerError(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockJS := NewMockJetStream(ctrl) + mockMetrics := NewMockMetrics(ctrl) + + client := &Client{ + JetStream: mockJS, + Metrics: mockMetrics, + Config: &Config{ + Stream: StreamConfig{ + Stream: "test-stream", + Subjects: []string{"test-subject"}, + }, + Consumer: "test-consumer", + }, + Subscriptions: make(map[string]*subscription), + messageBuffer: make(chan *pubsub.Message, 1), + } + + ctx := context.Background() + expectedErr := errFailedToCreateConsumer + + mockMetrics.EXPECT().IncrementCounter(gomock.Any(), "app_pubsub_subscribe_total_count", "topic", "test-subject") + mockJS.EXPECT().CreateOrUpdateConsumer(gomock.Any(), client.Config.Stream.Stream, gomock.Any()).Return(nil, expectedErr) + + logs := testutil.StderrOutputForFunc(func() { + client.Logger = logging.NewMockLogger(logging.DEBUG) + msg, err := client.Subscribe(ctx, "test-subject") + + require.Error(t, err) + assert.Nil(t, msg) + assert.Equal(t, expectedErr, err) + }) + + assert.Contains(t, logs, "failed to create or update consumer") +} + +func TestNATSClient_HandleMessageError(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockMsg := NewMockMsg(ctrl) + logger := logging.NewMockLogger(logging.DEBUG) + client := &Client{ + Logger: logger, + } + + ctx := context.Background() + + // Set up expectations + mockMsg.EXPECT().Nak().Return(nil) + + handlerErr := errHandlerError + handler := func(_ context.Context, _ jetstream.Msg) error { + return handlerErr + } + + // Capture log output + logs := testutil.StderrOutputForFunc(func() { + client.Logger = logging.NewMockLogger(logging.DEBUG) + err := client.HandleMessage(ctx, mockMsg, handler) + assert.NoError(t, err) + }) + + // Assert on the captured log output + assert.Contains(t, logs, "error handling message: handler error") +} + +func TestNATSClient_DeleteStreamError(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockJS := NewMockJetStream(ctrl) + mockLogger := logging.NewMockLogger(logging.DEBUG) + client := &Client{ + JetStream: mockJS, + Logger: mockLogger, + } + + ctx := context.Background() + streamName := "test-stream" + expectedErr := errFailedToDeleteStream + + mockJS.EXPECT().DeleteStream(ctx, streamName).Return(expectedErr) + + err := client.DeleteStream(ctx, streamName) + require.Error(t, err) + assert.Equal(t, expectedErr, err) +} + +func TestNATSClient_CreateStreamError(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockJS := NewMockJetStream(ctrl) + mockLogger := logging.NewMockLogger(logging.DEBUG) + client := &Client{ + JetStream: mockJS, + Logger: mockLogger, + Config: &Config{ + Stream: StreamConfig{ + Stream: "test-stream", + }, + }, + } + + ctx := context.Background() + expectedErr := errFailedToCreateStream + + mockJS.EXPECT().CreateStream(ctx, gomock.Any()).Return(nil, expectedErr) + + err := client.CreateStream(ctx, client.Config.Stream) + require.Error(t, err) + assert.Equal(t, expectedErr, err) +} + +func TestNATSClient_CreateOrUpdateStreamError(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockJS := NewMockJetStream(ctrl) + mockLogger := logging.NewMockLogger(logging.DEBUG) + client := &Client{ + JetStream: mockJS, + Logger: mockLogger, + } + + ctx := context.Background() + cfg := &jetstream.StreamConfig{ + Name: "test-stream", + Subjects: []string{"test.subject"}, + } + expectedErr := errFailedCreateOrUpdateStream + + mockJS.EXPECT().CreateOrUpdateStream(ctx, *cfg).Return(nil, expectedErr) + + stream, err := client.CreateOrUpdateStream(ctx, cfg) + require.Error(t, err) + assert.Nil(t, stream) + assert.Equal(t, expectedErr, err) +} diff --git a/pkg/gofr/datasource/pubsub/nats/committer.go b/pkg/gofr/datasource/pubsub/nats/committer.go new file mode 100644 index 000000000..e89b2969c --- /dev/null +++ b/pkg/gofr/datasource/pubsub/nats/committer.go @@ -0,0 +1,41 @@ +package nats + +import ( + "log" + + "github.com/nats-io/nats.go/jetstream" +) + +// createTestCommitter is a helper function for tests to create a natsCommitter. +func createTestCommitter(msg jetstream.Msg) *natsCommitter { + return &natsCommitter{msg: msg} +} + +// natsCommitter implements the pubsub.Committer interface for Client messages. +type natsCommitter struct { + msg jetstream.Msg +} + +// Commit commits the message. +func (c *natsCommitter) Commit() { + if err := c.msg.Ack(); err != nil { + log.Println("Error committing message:", err) + + // nak the message + if err := c.msg.Nak(); err != nil { + log.Println("Error naking message:", err) + } + + return + } +} + +// Nak naks the message. +func (c *natsCommitter) Nak() error { + return c.msg.Nak() +} + +// Rollback rolls back the message. +func (c *natsCommitter) Rollback() error { + return c.msg.Nak() +} diff --git a/pkg/gofr/datasource/pubsub/nats/committer_test.go b/pkg/gofr/datasource/pubsub/nats/committer_test.go new file mode 100644 index 000000000..00f051718 --- /dev/null +++ b/pkg/gofr/datasource/pubsub/nats/committer_test.go @@ -0,0 +1,80 @@ +package nats + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" +) + +func TestNatsCommitter_Commit(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockMsg := NewMockMsg(ctrl) + committer := createTestCommitter(mockMsg) + + t.Run("Successful Commit", func(_ *testing.T) { + mockMsg.EXPECT().Ack().Return(nil) + + committer.Commit() + }) + + t.Run("Failed Commit with Successful Nak", func(_ *testing.T) { + mockMsg.EXPECT().Ack().Return(assert.AnError) + mockMsg.EXPECT().Nak().Return(nil) + + committer.Commit() + }) + + t.Run("Failed Commit with Failed Nak", func(_ *testing.T) { + mockMsg.EXPECT().Ack().Return(assert.AnError) + mockMsg.EXPECT().Nak().Return(assert.AnError) + + committer.Commit() + }) +} + +func TestNatsCommitter_Nak(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockMsg := NewMockMsg(ctrl) + committer := createTestCommitter(mockMsg) + + t.Run("Successful Nak", func(t *testing.T) { + mockMsg.EXPECT().Nak().Return(nil) + + err := committer.Nak() + assert.NoError(t, err) + }) + + t.Run("Failed Nak", func(t *testing.T) { + mockMsg.EXPECT().Nak().Return(assert.AnError) + + err := committer.Nak() + assert.Error(t, err) + }) +} + +func TestNatsCommitter_Rollback(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockMsg := NewMockMsg(ctrl) + committer := createTestCommitter(mockMsg) + + t.Run("Successful Rollback", func(t *testing.T) { + mockMsg.EXPECT().Nak().Return(nil) + + err := committer.Rollback() + assert.NoError(t, err) + }) + + t.Run("Failed Rollback", func(t *testing.T) { + mockMsg.EXPECT().Nak().Return(assert.AnError) + + err := committer.Rollback() + assert.Error(t, err) + }) +} diff --git a/pkg/gofr/datasource/pubsub/nats/errors.go b/pkg/gofr/datasource/pubsub/nats/errors.go new file mode 100644 index 000000000..0cdd2e8b5 --- /dev/null +++ b/pkg/gofr/datasource/pubsub/nats/errors.go @@ -0,0 +1,21 @@ +package nats + +import "errors" + +var ( + // Client Errors. + errServerNotProvided = errors.New("client server address not provided") + errSubjectsNotProvided = errors.New("subjects not provided") + errConsumerNotProvided = errors.New("consumer name not provided") + errFailedToCreateStream = errors.New("failed to create stream") + errFailedToDeleteStream = errors.New("failed to delete stream") + errFailedToCreateConsumer = errors.New("failed to create consumer") + errPublishError = errors.New("publish error") + errFailedCreateOrUpdateStream = errors.New("create or update stream error") + errJetStreamNotConfigured = errors.New("JetStream is not configured") + errJetStream = errors.New("JetStream error") + errNATSConnNil = errors.New("NATS connection is nil") + + // Message Errors. + errHandlerError = errors.New("handler error") +) diff --git a/pkg/gofr/datasource/pubsub/nats/go.mod b/pkg/gofr/datasource/pubsub/nats/go.mod new file mode 100644 index 000000000..ad7c1e727 --- /dev/null +++ b/pkg/gofr/datasource/pubsub/nats/go.mod @@ -0,0 +1,31 @@ +module github.com/carverauto/gofr-nats + +go 1.23.1 + +require ( + github.com/nats-io/nats-server/v2 v2.10.21 + github.com/nats-io/nats.go v1.37.0 + github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/otel/trace v1.30.0 + go.uber.org/mock v0.4.0 + gofr.dev v1.22.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/joho/godotenv v1.5.1 // indirect + github.com/klauspost/compress v1.17.10 // indirect + github.com/minio/highwayhash v1.0.3 // indirect + github.com/nats-io/jwt/v2 v2.5.8 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + go.opentelemetry.io/otel v1.30.0 // indirect + golang.org/x/crypto v0.27.0 // indirect + golang.org/x/sys v0.25.0 // indirect + golang.org/x/term v0.24.0 // indirect + golang.org/x/text v0.18.0 // indirect + golang.org/x/time v0.6.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/pkg/gofr/datasource/pubsub/nats/go.sum b/pkg/gofr/datasource/pubsub/nats/go.sum new file mode 100644 index 000000000..8fd29a5b1 --- /dev/null +++ b/pkg/gofr/datasource/pubsub/nats/go.sum @@ -0,0 +1,49 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0= +github.com/klauspost/compress v1.17.10/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= +github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= +github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE= +github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= +github.com/nats-io/nats-server/v2 v2.10.21 h1:gfG6T06wBdI25XyY2IsauarOc2srWoFxxfsOKjrzoRA= +github.com/nats-io/nats-server/v2 v2.10.21/go.mod h1:I1YxSAEWbXCfy0bthwvNb5X43WwIWMz7gx5ZVPDr5Rc= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/otel v1.30.0 h1:F2t8sK4qf1fAmY9ua4ohFS/K+FUuOPemHUIXHtktrts= +go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc= +go.opentelemetry.io/otel/trace v1.30.0 h1:7UBkkYzeg3C7kQX8VAidWh2biiQbtAKjyIML8dQ9wmc= +go.opentelemetry.io/otel/trace v1.30.0/go.mod h1:5EyKqTzzmyqB9bwtCCq6pDLktPK6fmGf/Dph+8VI02o= +go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= +go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= +gofr.dev v1.22.0 h1:bO2BXHqah+RCV6tU+rv1SLRT3nUJj9bWyNYKyVunjP0= +gofr.dev v1.22.0/go.mod h1:jldZJGrUKxD6BUEFwdlODcBCGBSvgkVoMy9q15sJm2Q= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.24.0 h1:Mh5cbb+Zk2hqqXNO7S1iTjEphVL+jb8ZWaqh/g+JWkM= +golang.org/x/term v0.24.0/go.mod h1:lOBK/LVxemqiMij05LGJ0tzNr8xlmwBRJ81PX6wVLH8= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= +golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/gofr/datasource/pubsub/nats/health.go b/pkg/gofr/datasource/pubsub/nats/health.go new file mode 100644 index 000000000..f36d79430 --- /dev/null +++ b/pkg/gofr/datasource/pubsub/nats/health.go @@ -0,0 +1,84 @@ +package nats + +import ( + "context" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "gofr.dev/pkg/gofr/datasource" +) + +const ( + natsBackend = "Client" + jetStreamStatusOK = "OK" + jetStreamStatusError = "Error" + jetStreamConnected = "CONNECTED" + jetStreamConnecting = "CONNECTING" + jetStreamDisconnecting = "DISCONNECTED" + natsHealthCheckTimeout = 5 * time.Second +) + +// Health returns the health status of the Client Client. +func (n *Client) Health() datasource.Health { + h := datasource.Health{ + Status: datasource.StatusUp, + Details: make(map[string]interface{}), + } + + connectionStatus := n.Conn.Status() + + switch connectionStatus { + case nats.CONNECTING: + h.Status = datasource.StatusUp + h.Details["connection_status"] = jetStreamConnecting + + n.Logger.Debug("Client health check: Connecting") + case nats.CONNECTED: + h.Details["connection_status"] = jetStreamConnected + + n.Logger.Debug("Client health check: Connected") + case nats.CLOSED, nats.DISCONNECTED, nats.RECONNECTING, nats.DRAINING_PUBS, nats.DRAINING_SUBS: + h.Status = datasource.StatusDown + h.Details["connection_status"] = jetStreamDisconnecting + + n.Logger.Error("Client health check: Disconnected") + default: + h.Status = datasource.StatusDown + h.Details["connection_status"] = connectionStatus.String() + + n.Logger.Error("Client health check: Unknown status", connectionStatus) + } + + h.Details["host"] = n.Config.Server + h.Details["backend"] = natsBackend + h.Details["jetstream_enabled"] = n.JetStream != nil + + ctx, cancel := context.WithTimeout(context.Background(), natsHealthCheckTimeout) + defer cancel() + + if n.JetStream != nil && connectionStatus == nats.CONNECTED { + status := getJetStreamStatus(ctx, n.JetStream) + + h.Details["jetstream_status"] = status + + if status != jetStreamStatusOK { + n.Logger.Error("Client health check: JetStream error:", status) + } else { + n.Logger.Debug("Client health check: JetStream enabled") + } + } else if n.JetStream == nil { + n.Logger.Debug("Client health check: JetStream not enabled") + } + + return h +} + +func getJetStreamStatus(ctx context.Context, js jetstream.JetStream) string { + _, err := js.AccountInfo(ctx) + if err != nil { + return jetStreamStatusError + ": " + err.Error() + } + + return jetStreamStatusOK +} diff --git a/pkg/gofr/datasource/pubsub/nats/health_test.go b/pkg/gofr/datasource/pubsub/nats/health_test.go new file mode 100644 index 000000000..a7b3a2b91 --- /dev/null +++ b/pkg/gofr/datasource/pubsub/nats/health_test.go @@ -0,0 +1,270 @@ +package nats + +import ( + "context" + "testing" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" + "gofr.dev/pkg/gofr/datasource" + "gofr.dev/pkg/gofr/logging" + "gofr.dev/pkg/gofr/testutil" +) + +const ( + // NatsServer is the address of a local Client server. Used for testing. + NatsServer = "nats://localhost:4222" +) + +// mockConn is a minimal mock implementation of nats.Conn. +type mockConn struct { + status nats.Status +} + +func (m *mockConn) Status() nats.Status { + return m.status +} + +// mockJetStream is a minimal mock implementation of jetstream.JetStream. +type mockJetStream struct { + accountInfoErr error +} + +func (m *mockJetStream) AccountInfo(_ context.Context) (*jetstream.AccountInfo, error) { + if m.accountInfoErr != nil { + return nil, m.accountInfoErr + } + + return &jetstream.AccountInfo{}, nil +} + +// testNATSClient is a test-specific implementation of Client. +type testNATSClient struct { + Client + mockConn *mockConn + mockJetStream *mockJetStream +} + +func (c *testNATSClient) Health() datasource.Health { + h := datasource.Health{ + Details: make(map[string]interface{}), + } + + h.Status = datasource.StatusUp + connectionStatus := c.mockConn.Status() + + switch connectionStatus { + case nats.CONNECTING: + h.Status = datasource.StatusUp + h.Details["connection_status"] = jetStreamConnecting + case nats.CONNECTED: + h.Details["connection_status"] = jetStreamConnected + case nats.CLOSED, nats.DISCONNECTED, nats.RECONNECTING, nats.DRAINING_PUBS, nats.DRAINING_SUBS: + h.Status = datasource.StatusDown + h.Details["connection_status"] = jetStreamDisconnecting + default: + h.Status = datasource.StatusDown + h.Details["connection_status"] = connectionStatus.String() + } + + h.Details["host"] = c.Config.Server + h.Details["backend"] = natsBackend + h.Details["jetstream_enabled"] = c.mockJetStream != nil + + if c.mockJetStream != nil { + _, err := c.mockJetStream.AccountInfo(context.Background()) + if err != nil { + h.Details["jetstream_status"] = jetStreamStatusError + ": " + err.Error() + } else { + h.Details["jetstream_status"] = jetStreamStatusOK + } + } + + return h +} + +func TestNATSClient_HealthStatusUP(t *testing.T) { + client := &testNATSClient{ + Client: Client{ + Config: &Config{Server: NatsServer}, + Logger: logging.NewMockLogger(logging.DEBUG), + }, + mockConn: &mockConn{status: nats.CONNECTED}, + mockJetStream: &mockJetStream{}, + } + + h := client.Health() + + assert.Equal(t, datasource.StatusUp, h.Status) + assert.Equal(t, NatsServer, h.Details["host"]) + assert.Equal(t, natsBackend, h.Details["backend"]) + assert.Equal(t, jetStreamConnected, h.Details["connection_status"]) + assert.Equal(t, true, h.Details["jetstream_enabled"]) + assert.Equal(t, jetStreamStatusOK, h.Details["jetstream_status"]) +} + +func TestNATSClient_HealthStatusDown(t *testing.T) { + client := &testNATSClient{ + Client: Client{ + Config: &Config{Server: NatsServer}, + Logger: logging.NewMockLogger(logging.DEBUG), + }, + mockConn: &mockConn{status: nats.CLOSED}, + } + + h := client.Health() + + assert.Equal(t, datasource.StatusDown, h.Status) + assert.Equal(t, NatsServer, h.Details["host"]) + assert.Equal(t, natsBackend, h.Details["backend"]) + assert.Equal(t, jetStreamDisconnecting, h.Details["connection_status"]) + assert.Equal(t, false, h.Details["jetstream_enabled"]) +} + +func TestNATSClient_HealthJetStreamError(t *testing.T) { + client := &testNATSClient{ + Client: Client{ + Config: &Config{Server: NatsServer}, + Logger: logging.NewMockLogger(logging.DEBUG), + }, + mockConn: &mockConn{status: nats.CONNECTED}, + mockJetStream: &mockJetStream{accountInfoErr: errJetStream}, + } + + h := client.Health() + + assert.Equal(t, datasource.StatusUp, h.Status) + assert.Equal(t, NatsServer, h.Details["host"]) + assert.Equal(t, natsBackend, h.Details["backend"]) + assert.Equal(t, jetStreamConnected, h.Details["connection_status"]) + assert.Equal(t, true, h.Details["jetstream_enabled"]) + assert.Equal(t, jetStreamStatusError+": "+errJetStream.Error(), h.Details["jetstream_status"]) +} + +func TestNATSClient_Health(t *testing.T) { + testCases := defineHealthTestCases() + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + runHealthTestCase(t, tc) + }) + } +} + +func defineHealthTestCases() []healthTestCase { + return []healthTestCase{ + { + name: "HealthyConnection", + setupMocks: func(mockConn *MockConnInterface, mockJS *MockJetStream) { + mockConn.EXPECT().Status().Return(nats.CONNECTED).Times(2) + mockJS.EXPECT().AccountInfo(gomock.Any()).Return(&jetstream.AccountInfo{}, nil).Times(2) + }, + expectedStatus: datasource.StatusUp, + expectedDetails: map[string]interface{}{ + "host": NatsServer, + "backend": natsBackend, + "connection_status": jetStreamConnected, + "jetstream_enabled": true, + "jetstream_status": jetStreamStatusOK, + }, + expectedLogs: []string{"Client health check: Connected", "Client health check: JetStream enabled"}, + }, + { + name: "DisconnectedStatus", + setupMocks: func(mockConn *MockConnInterface, _ *MockJetStream) { + mockConn.EXPECT().Status().Return(nats.DISCONNECTED).Times(2) + }, + expectedStatus: datasource.StatusDown, + expectedDetails: map[string]interface{}{ + "host": NatsServer, + "backend": natsBackend, + "connection_status": jetStreamDisconnecting, + "jetstream_enabled": true, + }, + expectedLogs: []string{"Client health check: Disconnected"}, + }, + { + name: "JetStreamError", + setupMocks: func(mockConn *MockConnInterface, mockJS *MockJetStream) { + mockConn.EXPECT().Status().Return(nats.CONNECTED).Times(2) + mockJS.EXPECT().AccountInfo(gomock.Any()).Return(nil, errJetStream).Times(2) + }, + expectedStatus: datasource.StatusUp, + expectedDetails: map[string]interface{}{ + "host": NatsServer, + "backend": natsBackend, + "connection_status": jetStreamConnected, + "jetstream_enabled": true, + "jetstream_status": jetStreamStatusError + ": " + errJetStream.Error(), + }, + expectedLogs: []string{"Client health check: Connected", "Client health check: JetStream error"}, + }, + { + name: "NoJetStream", + setupMocks: func(mockConn *MockConnInterface, _ *MockJetStream) { + mockConn.EXPECT().Status().Return(nats.CONNECTED).Times(2) + }, + expectedStatus: datasource.StatusUp, + expectedDetails: map[string]interface{}{ + "host": NatsServer, + "backend": natsBackend, + "connection_status": jetStreamConnected, + "jetstream_enabled": false, + }, + expectedLogs: []string{"Client health check: Connected", "Client health check: JetStream not enabled"}, + }, + } +} + +func runHealthTestCase(t *testing.T, tc healthTestCase) { + t.Helper() + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockConn := NewMockConnInterface(ctrl) + mockJS := NewMockJetStream(ctrl) + + tc.setupMocks(mockConn, mockJS) + + client := &Client{ + Conn: mockConn, + JetStream: mockJS, + Config: &Config{Server: NatsServer}, + } + + if tc.name == "NoJetStream" { + client.JetStream = nil + } + + var h datasource.Health + + combinedLogs := getCombinedLogs(func() { + client.Logger = logging.NewMockLogger(logging.DEBUG) + h = client.Health() + }) + + assert.Equal(t, tc.expectedStatus, h.Status) + assert.Equal(t, tc.expectedDetails, h.Details) + + for _, expectedLog := range tc.expectedLogs { + assert.Contains(t, combinedLogs, expectedLog, "Expected log message not found: %s", expectedLog) + } +} + +func getCombinedLogs(f func()) string { + stdoutLogs := testutil.StdoutOutputForFunc(f) + stderrLogs := testutil.StderrOutputForFunc(f) + + return stdoutLogs + stderrLogs +} + +type healthTestCase struct { + name string + setupMocks func(*MockConnInterface, *MockJetStream) + expectedStatus string + expectedDetails map[string]interface{} + expectedLogs []string +} diff --git a/pkg/gofr/datasource/pubsub/nats/interfaces.go b/pkg/gofr/datasource/pubsub/nats/interfaces.go new file mode 100644 index 000000000..162479687 --- /dev/null +++ b/pkg/gofr/datasource/pubsub/nats/interfaces.go @@ -0,0 +1,39 @@ +package nats + +import ( + "context" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "gofr.dev/pkg/gofr/datasource" +) + +//go:generate mockgen -destination=mock_client.go -package=nats -source=./interfaces.go Client,Subscription,ConnInterface + +// ConnInterface represents the main Client connection. +type ConnInterface interface { + Status() nats.Status + Close() + NatsConn() *nats.Conn +} + +// NATSConnector represents the main Client connection. +type NATSConnector interface { + Connect(string, ...nats.Option) (ConnInterface, error) +} + +// JetStreamCreator represents the main Client JetStream Client. +type JetStreamCreator interface { + New(*nats.Conn) (jetstream.JetStream, error) +} + +// JetStreamClient represents the main Client JetStream Client. +type JetStreamClient interface { + Publish(ctx context.Context, subject string, message []byte) error + Subscribe(ctx context.Context, subject string, handler messageHandler) error + Close(ctx context.Context) error + DeleteStream(ctx context.Context, name string) error + CreateStream(ctx context.Context, cfg StreamConfig) error + CreateOrUpdateStream(ctx context.Context, cfg jetstream.StreamConfig) (jetstream.Stream, error) + Health() datasource.Health +} diff --git a/pkg/gofr/datasource/pubsub/nats/message.go b/pkg/gofr/datasource/pubsub/nats/message.go new file mode 100644 index 000000000..fd7b6ae5d --- /dev/null +++ b/pkg/gofr/datasource/pubsub/nats/message.go @@ -0,0 +1,24 @@ +package nats + +import ( + "github.com/nats-io/nats.go/jetstream" + "gofr.dev/pkg/gofr/datasource/pubsub" +) + +type natsMessage struct { + msg jetstream.Msg + logger pubsub.Logger +} + +func newNATSMessage(msg jetstream.Msg, logger pubsub.Logger) *natsMessage { + return &natsMessage{ + msg: msg, + logger: logger, + } +} + +func (nmsg *natsMessage) Commit() { + if err := nmsg.msg.Ack(); err != nil { + nmsg.logger.Errorf("unable to acknowledge message on Client JetStream: %v", err) + } +} diff --git a/pkg/gofr/datasource/pubsub/nats/message_test.go b/pkg/gofr/datasource/pubsub/nats/message_test.go new file mode 100644 index 000000000..0ea7b43bb --- /dev/null +++ b/pkg/gofr/datasource/pubsub/nats/message_test.go @@ -0,0 +1,55 @@ +package nats + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" + + "gofr.dev/pkg/gofr/logging" + "gofr.dev/pkg/gofr/testutil" +) + +func TestNewNATSMessage(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockMsg := NewMockMsg(ctrl) + logger := logging.NewMockLogger(logging.ERROR) + n := newNATSMessage(mockMsg, logger) + + assert.NotNil(t, n) + assert.Equal(t, mockMsg, n.msg) + assert.Equal(t, logger, n.logger) +} + +func TestNATSMessage_Commit(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockMsg := NewMockMsg(ctrl) + logger := logging.NewMockLogger(logging.ERROR) + n := newNATSMessage(mockMsg, logger) + + mockMsg.EXPECT().Ack().Return(nil) + + n.Commit() +} + +func TestNATSMessage_CommitError(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockMsg := NewMockMsg(ctrl) + + out := testutil.StderrOutputForFunc(func() { + logger := logging.NewMockLogger(logging.ERROR) + n := newNATSMessage(mockMsg, logger) + + mockMsg.EXPECT().Ack().Return(testutil.CustomError{ErrorMessage: "ack error"}) + + n.Commit() + }) + + assert.Contains(t, out, "unable to acknowledge message on Client JetStream") +} diff --git a/pkg/gofr/datasource/pubsub/nats/metrics.go b/pkg/gofr/datasource/pubsub/nats/metrics.go new file mode 100644 index 000000000..a5342fbb0 --- /dev/null +++ b/pkg/gofr/datasource/pubsub/nats/metrics.go @@ -0,0 +1,10 @@ +package nats + +import "context" + +//go:generate mockgen -destination=mock_metrics.go -package=nats -source=./metrics.go + +// Metrics represents the metrics interface. +type Metrics interface { + IncrementCounter(ctx context.Context, name string, labels ...string) +} diff --git a/pkg/gofr/datasource/pubsub/nats/mock_client.go b/pkg/gofr/datasource/pubsub/nats/mock_client.go new file mode 100644 index 000000000..2345ff7af --- /dev/null +++ b/pkg/gofr/datasource/pubsub/nats/mock_client.go @@ -0,0 +1,286 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./interfaces.go +// +// Generated by this command: +// +// mockgen -destination=mock_client.go -package=nats -source=./interfaces.go Client,Subscription,ConnInterface +// + +// Package nats is a generated GoMock package. +package nats + +import ( + context "context" + reflect "reflect" + + nats "github.com/nats-io/nats.go" + jetstream "github.com/nats-io/nats.go/jetstream" + gomock "go.uber.org/mock/gomock" + datasource "gofr.dev/pkg/gofr/datasource" +) + +// MockConnInterface is a mock of ConnInterface interface. +type MockConnInterface struct { + ctrl *gomock.Controller + recorder *MockConnInterfaceMockRecorder +} + +// MockConnInterfaceMockRecorder is the mock recorder for MockConnInterface. +type MockConnInterfaceMockRecorder struct { + mock *MockConnInterface +} + +// NewMockConnInterface creates a new mock instance. +func NewMockConnInterface(ctrl *gomock.Controller) *MockConnInterface { + mock := &MockConnInterface{ctrl: ctrl} + mock.recorder = &MockConnInterfaceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockConnInterface) EXPECT() *MockConnInterfaceMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockConnInterface) Close() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Close") +} + +// Close indicates an expected call of Close. +func (mr *MockConnInterfaceMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockConnInterface)(nil).Close)) +} + +// NatsConn mocks base method. +func (m *MockConnInterface) NatsConn() *nats.Conn { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NatsConn") + ret0, _ := ret[0].(*nats.Conn) + return ret0 +} + +// NatsConn indicates an expected call of NatsConn. +func (mr *MockConnInterfaceMockRecorder) NatsConn() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NatsConn", reflect.TypeOf((*MockConnInterface)(nil).NatsConn)) +} + +// Status mocks base method. +func (m *MockConnInterface) Status() nats.Status { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Status") + ret0, _ := ret[0].(nats.Status) + return ret0 +} + +// Status indicates an expected call of Status. +func (mr *MockConnInterfaceMockRecorder) Status() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Status", reflect.TypeOf((*MockConnInterface)(nil).Status)) +} + +// MockNATSConnector is a mock of NATSConnector interface. +type MockNATSConnector struct { + ctrl *gomock.Controller + recorder *MockNATSConnectorMockRecorder +} + +// MockNATSConnectorMockRecorder is the mock recorder for MockNATSConnector. +type MockNATSConnectorMockRecorder struct { + mock *MockNATSConnector +} + +// NewMockNATSConnector creates a new mock instance. +func NewMockNATSConnector(ctrl *gomock.Controller) *MockNATSConnector { + mock := &MockNATSConnector{ctrl: ctrl} + mock.recorder = &MockNATSConnectorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockNATSConnector) EXPECT() *MockNATSConnectorMockRecorder { + return m.recorder +} + +// Connect mocks base method. +func (m *MockNATSConnector) Connect(arg0 string, arg1 ...nats.Option) (ConnInterface, error) { + m.ctrl.T.Helper() + varargs := []any{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Connect", varargs...) + ret0, _ := ret[0].(ConnInterface) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Connect indicates an expected call of Connect. +func (mr *MockNATSConnectorMockRecorder) Connect(arg0 any, arg1 ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Connect", reflect.TypeOf((*MockNATSConnector)(nil).Connect), varargs...) +} + +// MockJetStreamCreator is a mock of JetStreamCreator interface. +type MockJetStreamCreator struct { + ctrl *gomock.Controller + recorder *MockJetStreamCreatorMockRecorder +} + +// MockJetStreamCreatorMockRecorder is the mock recorder for MockJetStreamCreator. +type MockJetStreamCreatorMockRecorder struct { + mock *MockJetStreamCreator +} + +// NewMockJetStreamCreator creates a new mock instance. +func NewMockJetStreamCreator(ctrl *gomock.Controller) *MockJetStreamCreator { + mock := &MockJetStreamCreator{ctrl: ctrl} + mock.recorder = &MockJetStreamCreatorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockJetStreamCreator) EXPECT() *MockJetStreamCreatorMockRecorder { + return m.recorder +} + +// New mocks base method. +func (m *MockJetStreamCreator) New(arg0 *nats.Conn) (jetstream.JetStream, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "New", arg0) + ret0, _ := ret[0].(jetstream.JetStream) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// New indicates an expected call of New. +func (mr *MockJetStreamCreatorMockRecorder) New(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "New", reflect.TypeOf((*MockJetStreamCreator)(nil).New), arg0) +} + +// MockJetStreamClient is a mock of JetStreamClient interface. +type MockJetStreamClient struct { + ctrl *gomock.Controller + recorder *MockJetStreamClientMockRecorder +} + +// MockJetStreamClientMockRecorder is the mock recorder for MockJetStreamClient. +type MockJetStreamClientMockRecorder struct { + mock *MockJetStreamClient +} + +// NewMockJetStreamClient creates a new mock instance. +func NewMockJetStreamClient(ctrl *gomock.Controller) *MockJetStreamClient { + mock := &MockJetStreamClient{ctrl: ctrl} + mock.recorder = &MockJetStreamClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockJetStreamClient) EXPECT() *MockJetStreamClientMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockJetStreamClient) Close(ctx context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close", ctx) + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockJetStreamClientMockRecorder) Close(ctx any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockJetStreamClient)(nil).Close), ctx) +} + +// CreateOrUpdateStream mocks base method. +func (m *MockJetStreamClient) CreateOrUpdateStream(ctx context.Context, cfg jetstream.StreamConfig) (jetstream.Stream, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateOrUpdateStream", ctx, cfg) + ret0, _ := ret[0].(jetstream.Stream) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateOrUpdateStream indicates an expected call of CreateOrUpdateStream. +func (mr *MockJetStreamClientMockRecorder) CreateOrUpdateStream(ctx, cfg any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateOrUpdateStream", reflect.TypeOf((*MockJetStreamClient)(nil).CreateOrUpdateStream), ctx, cfg) +} + +// CreateStream mocks base method. +func (m *MockJetStreamClient) CreateStream(ctx context.Context, cfg StreamConfig) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateStream", ctx, cfg) + ret0, _ := ret[0].(error) + return ret0 +} + +// CreateStream indicates an expected call of CreateStream. +func (mr *MockJetStreamClientMockRecorder) CreateStream(ctx, cfg any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateStream", reflect.TypeOf((*MockJetStreamClient)(nil).CreateStream), ctx, cfg) +} + +// DeleteStream mocks base method. +func (m *MockJetStreamClient) DeleteStream(ctx context.Context, name string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteStream", ctx, name) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteStream indicates an expected call of DeleteStream. +func (mr *MockJetStreamClientMockRecorder) DeleteStream(ctx, name any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteStream", reflect.TypeOf((*MockJetStreamClient)(nil).DeleteStream), ctx, name) +} + +// Health mocks base method. +func (m *MockJetStreamClient) Health() datasource.Health { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Health") + ret0, _ := ret[0].(datasource.Health) + return ret0 +} + +// Health indicates an expected call of Health. +func (mr *MockJetStreamClientMockRecorder) Health() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Health", reflect.TypeOf((*MockJetStreamClient)(nil).Health)) +} + +// Publish mocks base method. +func (m *MockJetStreamClient) Publish(ctx context.Context, subject string, message []byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Publish", ctx, subject, message) + ret0, _ := ret[0].(error) + return ret0 +} + +// Publish indicates an expected call of Publish. +func (mr *MockJetStreamClientMockRecorder) Publish(ctx, subject, message any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Publish", reflect.TypeOf((*MockJetStreamClient)(nil).Publish), ctx, subject, message) +} + +// Subscribe mocks base method. +func (m *MockJetStreamClient) Subscribe(ctx context.Context, subject string, handler messageHandler) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Subscribe", ctx, subject, handler) + ret0, _ := ret[0].(error) + return ret0 +} + +// Subscribe indicates an expected call of Subscribe. +func (mr *MockJetStreamClientMockRecorder) Subscribe(ctx, subject, handler any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockJetStreamClient)(nil).Subscribe), ctx, subject, handler) +} diff --git a/pkg/gofr/datasource/pubsub/nats/mock_jetstream.go b/pkg/gofr/datasource/pubsub/nats/mock_jetstream.go new file mode 100644 index 000000000..22b124267 --- /dev/null +++ b/pkg/gofr/datasource/pubsub/nats/mock_jetstream.go @@ -0,0 +1,1262 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/nats-io/nats.go/jetstream (interfaces: JetStream,Stream,Consumer,Msg,MessageBatch) +// +// Generated by this command: +// +// mockgen -destination=mock_jetstream.go -package=nats github.com/nats-io/nats.go/jetstream JetStream,Stream,Consumer,Msg,MessageBatch +// + +// Package nats is a generated GoMock package. +package nats + +import ( + context "context" + reflect "reflect" + time "time" + + nats "github.com/nats-io/nats.go" + jetstream "github.com/nats-io/nats.go/jetstream" + gomock "go.uber.org/mock/gomock" +) + +// MockJetStream is a mock of JetStream interface. +type MockJetStream struct { + ctrl *gomock.Controller + recorder *MockJetStreamMockRecorder +} + +// MockJetStreamMockRecorder is the mock recorder for MockJetStream. +type MockJetStreamMockRecorder struct { + mock *MockJetStream +} + +// NewMockJetStream creates a new mock instance. +func NewMockJetStream(ctrl *gomock.Controller) *MockJetStream { + mock := &MockJetStream{ctrl: ctrl} + mock.recorder = &MockJetStreamMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockJetStream) EXPECT() *MockJetStreamMockRecorder { + return m.recorder +} + +// AccountInfo mocks base method. +func (m *MockJetStream) AccountInfo(arg0 context.Context) (*jetstream.AccountInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AccountInfo", arg0) + ret0, _ := ret[0].(*jetstream.AccountInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AccountInfo indicates an expected call of AccountInfo. +func (mr *MockJetStreamMockRecorder) AccountInfo(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AccountInfo", reflect.TypeOf((*MockJetStream)(nil).AccountInfo), arg0) +} + +// CleanupPublisher mocks base method. +func (m *MockJetStream) CleanupPublisher() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "CleanupPublisher") +} + +// CleanupPublisher indicates an expected call of CleanupPublisher. +func (mr *MockJetStreamMockRecorder) CleanupPublisher() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CleanupPublisher", reflect.TypeOf((*MockJetStream)(nil).CleanupPublisher)) +} + +// Consumer mocks base method. +func (m *MockJetStream) Consumer(arg0 context.Context, arg1, arg2 string) (jetstream.Consumer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Consumer", arg0, arg1, arg2) + ret0, _ := ret[0].(jetstream.Consumer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Consumer indicates an expected call of Consumer. +func (mr *MockJetStreamMockRecorder) Consumer(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Consumer", reflect.TypeOf((*MockJetStream)(nil).Consumer), arg0, arg1, arg2) +} + +// CreateConsumer mocks base method. +func (m *MockJetStream) CreateConsumer(arg0 context.Context, arg1 string, arg2 jetstream.ConsumerConfig) (jetstream.Consumer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateConsumer", arg0, arg1, arg2) + ret0, _ := ret[0].(jetstream.Consumer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateConsumer indicates an expected call of CreateConsumer. +func (mr *MockJetStreamMockRecorder) CreateConsumer(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateConsumer", reflect.TypeOf((*MockJetStream)(nil).CreateConsumer), arg0, arg1, arg2) +} + +// CreateKeyValue mocks base method. +func (m *MockJetStream) CreateKeyValue(arg0 context.Context, arg1 jetstream.KeyValueConfig) (jetstream.KeyValue, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateKeyValue", arg0, arg1) + ret0, _ := ret[0].(jetstream.KeyValue) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateKeyValue indicates an expected call of CreateKeyValue. +func (mr *MockJetStreamMockRecorder) CreateKeyValue(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateKeyValue", reflect.TypeOf((*MockJetStream)(nil).CreateKeyValue), arg0, arg1) +} + +// CreateObjectStore mocks base method. +func (m *MockJetStream) CreateObjectStore(arg0 context.Context, arg1 jetstream.ObjectStoreConfig) (jetstream.ObjectStore, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateObjectStore", arg0, arg1) + ret0, _ := ret[0].(jetstream.ObjectStore) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateObjectStore indicates an expected call of CreateObjectStore. +func (mr *MockJetStreamMockRecorder) CreateObjectStore(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateObjectStore", reflect.TypeOf((*MockJetStream)(nil).CreateObjectStore), arg0, arg1) +} + +// CreateOrUpdateConsumer mocks base method. +func (m *MockJetStream) CreateOrUpdateConsumer(arg0 context.Context, arg1 string, arg2 jetstream.ConsumerConfig) (jetstream.Consumer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateOrUpdateConsumer", arg0, arg1, arg2) + ret0, _ := ret[0].(jetstream.Consumer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateOrUpdateConsumer indicates an expected call of CreateOrUpdateConsumer. +func (mr *MockJetStreamMockRecorder) CreateOrUpdateConsumer(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateOrUpdateConsumer", reflect.TypeOf((*MockJetStream)(nil).CreateOrUpdateConsumer), arg0, arg1, arg2) +} + +// CreateOrUpdateKeyValue mocks base method. +func (m *MockJetStream) CreateOrUpdateKeyValue(arg0 context.Context, arg1 jetstream.KeyValueConfig) (jetstream.KeyValue, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateOrUpdateKeyValue", arg0, arg1) + ret0, _ := ret[0].(jetstream.KeyValue) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateOrUpdateKeyValue indicates an expected call of CreateOrUpdateKeyValue. +func (mr *MockJetStreamMockRecorder) CreateOrUpdateKeyValue(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateOrUpdateKeyValue", reflect.TypeOf((*MockJetStream)(nil).CreateOrUpdateKeyValue), arg0, arg1) +} + +// CreateOrUpdateObjectStore mocks base method. +func (m *MockJetStream) CreateOrUpdateObjectStore(arg0 context.Context, arg1 jetstream.ObjectStoreConfig) (jetstream.ObjectStore, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateOrUpdateObjectStore", arg0, arg1) + ret0, _ := ret[0].(jetstream.ObjectStore) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateOrUpdateObjectStore indicates an expected call of CreateOrUpdateObjectStore. +func (mr *MockJetStreamMockRecorder) CreateOrUpdateObjectStore(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateOrUpdateObjectStore", reflect.TypeOf((*MockJetStream)(nil).CreateOrUpdateObjectStore), arg0, arg1) +} + +// CreateOrUpdateStream mocks base method. +func (m *MockJetStream) CreateOrUpdateStream(arg0 context.Context, arg1 jetstream.StreamConfig) (jetstream.Stream, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateOrUpdateStream", arg0, arg1) + ret0, _ := ret[0].(jetstream.Stream) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateOrUpdateStream indicates an expected call of CreateOrUpdateStream. +func (mr *MockJetStreamMockRecorder) CreateOrUpdateStream(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateOrUpdateStream", reflect.TypeOf((*MockJetStream)(nil).CreateOrUpdateStream), arg0, arg1) +} + +// CreateStream mocks base method. +func (m *MockJetStream) CreateStream(arg0 context.Context, arg1 jetstream.StreamConfig) (jetstream.Stream, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateStream", arg0, arg1) + ret0, _ := ret[0].(jetstream.Stream) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateStream indicates an expected call of CreateStream. +func (mr *MockJetStreamMockRecorder) CreateStream(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateStream", reflect.TypeOf((*MockJetStream)(nil).CreateStream), arg0, arg1) +} + +// DeleteConsumer mocks base method. +func (m *MockJetStream) DeleteConsumer(arg0 context.Context, arg1, arg2 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteConsumer", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteConsumer indicates an expected call of DeleteConsumer. +func (mr *MockJetStreamMockRecorder) DeleteConsumer(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteConsumer", reflect.TypeOf((*MockJetStream)(nil).DeleteConsumer), arg0, arg1, arg2) +} + +// DeleteKeyValue mocks base method. +func (m *MockJetStream) DeleteKeyValue(arg0 context.Context, arg1 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteKeyValue", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteKeyValue indicates an expected call of DeleteKeyValue. +func (mr *MockJetStreamMockRecorder) DeleteKeyValue(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteKeyValue", reflect.TypeOf((*MockJetStream)(nil).DeleteKeyValue), arg0, arg1) +} + +// DeleteObjectStore mocks base method. +func (m *MockJetStream) DeleteObjectStore(arg0 context.Context, arg1 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteObjectStore", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteObjectStore indicates an expected call of DeleteObjectStore. +func (mr *MockJetStreamMockRecorder) DeleteObjectStore(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObjectStore", reflect.TypeOf((*MockJetStream)(nil).DeleteObjectStore), arg0, arg1) +} + +// DeleteStream mocks base method. +func (m *MockJetStream) DeleteStream(arg0 context.Context, arg1 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteStream", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteStream indicates an expected call of DeleteStream. +func (mr *MockJetStreamMockRecorder) DeleteStream(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteStream", reflect.TypeOf((*MockJetStream)(nil).DeleteStream), arg0, arg1) +} + +// KeyValue mocks base method. +func (m *MockJetStream) KeyValue(arg0 context.Context, arg1 string) (jetstream.KeyValue, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "KeyValue", arg0, arg1) + ret0, _ := ret[0].(jetstream.KeyValue) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// KeyValue indicates an expected call of KeyValue. +func (mr *MockJetStreamMockRecorder) KeyValue(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "KeyValue", reflect.TypeOf((*MockJetStream)(nil).KeyValue), arg0, arg1) +} + +// KeyValueStoreNames mocks base method. +func (m *MockJetStream) KeyValueStoreNames(arg0 context.Context) jetstream.KeyValueNamesLister { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "KeyValueStoreNames", arg0) + ret0, _ := ret[0].(jetstream.KeyValueNamesLister) + return ret0 +} + +// KeyValueStoreNames indicates an expected call of KeyValueStoreNames. +func (mr *MockJetStreamMockRecorder) KeyValueStoreNames(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "KeyValueStoreNames", reflect.TypeOf((*MockJetStream)(nil).KeyValueStoreNames), arg0) +} + +// KeyValueStores mocks base method. +func (m *MockJetStream) KeyValueStores(arg0 context.Context) jetstream.KeyValueLister { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "KeyValueStores", arg0) + ret0, _ := ret[0].(jetstream.KeyValueLister) + return ret0 +} + +// KeyValueStores indicates an expected call of KeyValueStores. +func (mr *MockJetStreamMockRecorder) KeyValueStores(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "KeyValueStores", reflect.TypeOf((*MockJetStream)(nil).KeyValueStores), arg0) +} + +// ListStreams mocks base method. +func (m *MockJetStream) ListStreams(arg0 context.Context, arg1 ...jetstream.StreamListOpt) jetstream.StreamInfoLister { + m.ctrl.T.Helper() + varargs := []any{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ListStreams", varargs...) + ret0, _ := ret[0].(jetstream.StreamInfoLister) + return ret0 +} + +// ListStreams indicates an expected call of ListStreams. +func (mr *MockJetStreamMockRecorder) ListStreams(arg0 any, arg1 ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListStreams", reflect.TypeOf((*MockJetStream)(nil).ListStreams), varargs...) +} + +// ObjectStore mocks base method. +func (m *MockJetStream) ObjectStore(arg0 context.Context, arg1 string) (jetstream.ObjectStore, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ObjectStore", arg0, arg1) + ret0, _ := ret[0].(jetstream.ObjectStore) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ObjectStore indicates an expected call of ObjectStore. +func (mr *MockJetStreamMockRecorder) ObjectStore(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ObjectStore", reflect.TypeOf((*MockJetStream)(nil).ObjectStore), arg0, arg1) +} + +// ObjectStoreNames mocks base method. +func (m *MockJetStream) ObjectStoreNames(arg0 context.Context) jetstream.ObjectStoreNamesLister { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ObjectStoreNames", arg0) + ret0, _ := ret[0].(jetstream.ObjectStoreNamesLister) + return ret0 +} + +// ObjectStoreNames indicates an expected call of ObjectStoreNames. +func (mr *MockJetStreamMockRecorder) ObjectStoreNames(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ObjectStoreNames", reflect.TypeOf((*MockJetStream)(nil).ObjectStoreNames), arg0) +} + +// ObjectStores mocks base method. +func (m *MockJetStream) ObjectStores(arg0 context.Context) jetstream.ObjectStoresLister { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ObjectStores", arg0) + ret0, _ := ret[0].(jetstream.ObjectStoresLister) + return ret0 +} + +// ObjectStores indicates an expected call of ObjectStores. +func (mr *MockJetStreamMockRecorder) ObjectStores(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ObjectStores", reflect.TypeOf((*MockJetStream)(nil).ObjectStores), arg0) +} + +// OrderedConsumer mocks base method. +func (m *MockJetStream) OrderedConsumer(arg0 context.Context, arg1 string, arg2 jetstream.OrderedConsumerConfig) (jetstream.Consumer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "OrderedConsumer", arg0, arg1, arg2) + ret0, _ := ret[0].(jetstream.Consumer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// OrderedConsumer indicates an expected call of OrderedConsumer. +func (mr *MockJetStreamMockRecorder) OrderedConsumer(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OrderedConsumer", reflect.TypeOf((*MockJetStream)(nil).OrderedConsumer), arg0, arg1, arg2) +} + +// Publish mocks base method. +func (m *MockJetStream) Publish(arg0 context.Context, arg1 string, arg2 []byte, arg3 ...jetstream.PublishOpt) (*jetstream.PubAck, error) { + m.ctrl.T.Helper() + varargs := []any{arg0, arg1, arg2} + for _, a := range arg3 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Publish", varargs...) + ret0, _ := ret[0].(*jetstream.PubAck) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Publish indicates an expected call of Publish. +func (mr *MockJetStreamMockRecorder) Publish(arg0, arg1, arg2 any, arg3 ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{arg0, arg1, arg2}, arg3...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Publish", reflect.TypeOf((*MockJetStream)(nil).Publish), varargs...) +} + +// PublishAsync mocks base method. +func (m *MockJetStream) PublishAsync(arg0 string, arg1 []byte, arg2 ...jetstream.PublishOpt) (jetstream.PubAckFuture, error) { + m.ctrl.T.Helper() + varargs := []any{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "PublishAsync", varargs...) + ret0, _ := ret[0].(jetstream.PubAckFuture) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PublishAsync indicates an expected call of PublishAsync. +func (mr *MockJetStreamMockRecorder) PublishAsync(arg0, arg1 any, arg2 ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PublishAsync", reflect.TypeOf((*MockJetStream)(nil).PublishAsync), varargs...) +} + +// PublishAsyncComplete mocks base method. +func (m *MockJetStream) PublishAsyncComplete() <-chan struct{} { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PublishAsyncComplete") + ret0, _ := ret[0].(<-chan struct{}) + return ret0 +} + +// PublishAsyncComplete indicates an expected call of PublishAsyncComplete. +func (mr *MockJetStreamMockRecorder) PublishAsyncComplete() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PublishAsyncComplete", reflect.TypeOf((*MockJetStream)(nil).PublishAsyncComplete)) +} + +// PublishAsyncPending mocks base method. +func (m *MockJetStream) PublishAsyncPending() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PublishAsyncPending") + ret0, _ := ret[0].(int) + return ret0 +} + +// PublishAsyncPending indicates an expected call of PublishAsyncPending. +func (mr *MockJetStreamMockRecorder) PublishAsyncPending() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PublishAsyncPending", reflect.TypeOf((*MockJetStream)(nil).PublishAsyncPending)) +} + +// PublishMsg mocks base method. +func (m *MockJetStream) PublishMsg(arg0 context.Context, arg1 *nats.Msg, arg2 ...jetstream.PublishOpt) (*jetstream.PubAck, error) { + m.ctrl.T.Helper() + varargs := []any{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "PublishMsg", varargs...) + ret0, _ := ret[0].(*jetstream.PubAck) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PublishMsg indicates an expected call of PublishMsg. +func (mr *MockJetStreamMockRecorder) PublishMsg(arg0, arg1 any, arg2 ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PublishMsg", reflect.TypeOf((*MockJetStream)(nil).PublishMsg), varargs...) +} + +// PublishMsgAsync mocks base method. +func (m *MockJetStream) PublishMsgAsync(arg0 *nats.Msg, arg1 ...jetstream.PublishOpt) (jetstream.PubAckFuture, error) { + m.ctrl.T.Helper() + varargs := []any{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "PublishMsgAsync", varargs...) + ret0, _ := ret[0].(jetstream.PubAckFuture) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PublishMsgAsync indicates an expected call of PublishMsgAsync. +func (mr *MockJetStreamMockRecorder) PublishMsgAsync(arg0 any, arg1 ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PublishMsgAsync", reflect.TypeOf((*MockJetStream)(nil).PublishMsgAsync), varargs...) +} + +// Stream mocks base method. +func (m *MockJetStream) Stream(arg0 context.Context, arg1 string) (jetstream.Stream, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Stream", arg0, arg1) + ret0, _ := ret[0].(jetstream.Stream) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Stream indicates an expected call of Stream. +func (mr *MockJetStreamMockRecorder) Stream(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stream", reflect.TypeOf((*MockJetStream)(nil).Stream), arg0, arg1) +} + +// StreamNameBySubject mocks base method. +func (m *MockJetStream) StreamNameBySubject(arg0 context.Context, arg1 string) (string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StreamNameBySubject", arg0, arg1) + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StreamNameBySubject indicates an expected call of StreamNameBySubject. +func (mr *MockJetStreamMockRecorder) StreamNameBySubject(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StreamNameBySubject", reflect.TypeOf((*MockJetStream)(nil).StreamNameBySubject), arg0, arg1) +} + +// StreamNames mocks base method. +func (m *MockJetStream) StreamNames(arg0 context.Context, arg1 ...jetstream.StreamListOpt) jetstream.StreamNameLister { + m.ctrl.T.Helper() + varargs := []any{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "StreamNames", varargs...) + ret0, _ := ret[0].(jetstream.StreamNameLister) + return ret0 +} + +// StreamNames indicates an expected call of StreamNames. +func (mr *MockJetStreamMockRecorder) StreamNames(arg0 any, arg1 ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StreamNames", reflect.TypeOf((*MockJetStream)(nil).StreamNames), varargs...) +} + +// UpdateConsumer mocks base method. +func (m *MockJetStream) UpdateConsumer(arg0 context.Context, arg1 string, arg2 jetstream.ConsumerConfig) (jetstream.Consumer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateConsumer", arg0, arg1, arg2) + ret0, _ := ret[0].(jetstream.Consumer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UpdateConsumer indicates an expected call of UpdateConsumer. +func (mr *MockJetStreamMockRecorder) UpdateConsumer(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateConsumer", reflect.TypeOf((*MockJetStream)(nil).UpdateConsumer), arg0, arg1, arg2) +} + +// UpdateKeyValue mocks base method. +func (m *MockJetStream) UpdateKeyValue(arg0 context.Context, arg1 jetstream.KeyValueConfig) (jetstream.KeyValue, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateKeyValue", arg0, arg1) + ret0, _ := ret[0].(jetstream.KeyValue) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UpdateKeyValue indicates an expected call of UpdateKeyValue. +func (mr *MockJetStreamMockRecorder) UpdateKeyValue(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateKeyValue", reflect.TypeOf((*MockJetStream)(nil).UpdateKeyValue), arg0, arg1) +} + +// UpdateObjectStore mocks base method. +func (m *MockJetStream) UpdateObjectStore(arg0 context.Context, arg1 jetstream.ObjectStoreConfig) (jetstream.ObjectStore, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateObjectStore", arg0, arg1) + ret0, _ := ret[0].(jetstream.ObjectStore) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UpdateObjectStore indicates an expected call of UpdateObjectStore. +func (mr *MockJetStreamMockRecorder) UpdateObjectStore(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateObjectStore", reflect.TypeOf((*MockJetStream)(nil).UpdateObjectStore), arg0, arg1) +} + +// UpdateStream mocks base method. +func (m *MockJetStream) UpdateStream(arg0 context.Context, arg1 jetstream.StreamConfig) (jetstream.Stream, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateStream", arg0, arg1) + ret0, _ := ret[0].(jetstream.Stream) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UpdateStream indicates an expected call of UpdateStream. +func (mr *MockJetStreamMockRecorder) UpdateStream(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateStream", reflect.TypeOf((*MockJetStream)(nil).UpdateStream), arg0, arg1) +} + +// MockStream is a mock of Stream interface. +type MockStream struct { + ctrl *gomock.Controller + recorder *MockStreamMockRecorder +} + +// MockStreamMockRecorder is the mock recorder for MockStream. +type MockStreamMockRecorder struct { + mock *MockStream +} + +// NewMockStream creates a new mock instance. +func NewMockStream(ctrl *gomock.Controller) *MockStream { + mock := &MockStream{ctrl: ctrl} + mock.recorder = &MockStreamMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStream) EXPECT() *MockStreamMockRecorder { + return m.recorder +} + +// CachedInfo mocks base method. +func (m *MockStream) CachedInfo() *jetstream.StreamInfo { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CachedInfo") + ret0, _ := ret[0].(*jetstream.StreamInfo) + return ret0 +} + +// CachedInfo indicates an expected call of CachedInfo. +func (mr *MockStreamMockRecorder) CachedInfo() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CachedInfo", reflect.TypeOf((*MockStream)(nil).CachedInfo)) +} + +// Consumer mocks base method. +func (m *MockStream) Consumer(arg0 context.Context, arg1 string) (jetstream.Consumer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Consumer", arg0, arg1) + ret0, _ := ret[0].(jetstream.Consumer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Consumer indicates an expected call of Consumer. +func (mr *MockStreamMockRecorder) Consumer(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Consumer", reflect.TypeOf((*MockStream)(nil).Consumer), arg0, arg1) +} + +// ConsumerNames mocks base method. +func (m *MockStream) ConsumerNames(arg0 context.Context) jetstream.ConsumerNameLister { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ConsumerNames", arg0) + ret0, _ := ret[0].(jetstream.ConsumerNameLister) + return ret0 +} + +// ConsumerNames indicates an expected call of ConsumerNames. +func (mr *MockStreamMockRecorder) ConsumerNames(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConsumerNames", reflect.TypeOf((*MockStream)(nil).ConsumerNames), arg0) +} + +// CreateConsumer mocks base method. +func (m *MockStream) CreateConsumer(arg0 context.Context, arg1 jetstream.ConsumerConfig) (jetstream.Consumer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateConsumer", arg0, arg1) + ret0, _ := ret[0].(jetstream.Consumer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateConsumer indicates an expected call of CreateConsumer. +func (mr *MockStreamMockRecorder) CreateConsumer(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateConsumer", reflect.TypeOf((*MockStream)(nil).CreateConsumer), arg0, arg1) +} + +// CreateOrUpdateConsumer mocks base method. +func (m *MockStream) CreateOrUpdateConsumer(arg0 context.Context, arg1 jetstream.ConsumerConfig) (jetstream.Consumer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateOrUpdateConsumer", arg0, arg1) + ret0, _ := ret[0].(jetstream.Consumer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateOrUpdateConsumer indicates an expected call of CreateOrUpdateConsumer. +func (mr *MockStreamMockRecorder) CreateOrUpdateConsumer(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateOrUpdateConsumer", reflect.TypeOf((*MockStream)(nil).CreateOrUpdateConsumer), arg0, arg1) +} + +// DeleteConsumer mocks base method. +func (m *MockStream) DeleteConsumer(arg0 context.Context, arg1 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteConsumer", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteConsumer indicates an expected call of DeleteConsumer. +func (mr *MockStreamMockRecorder) DeleteConsumer(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteConsumer", reflect.TypeOf((*MockStream)(nil).DeleteConsumer), arg0, arg1) +} + +// DeleteMsg mocks base method. +func (m *MockStream) DeleteMsg(arg0 context.Context, arg1 uint64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteMsg", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteMsg indicates an expected call of DeleteMsg. +func (mr *MockStreamMockRecorder) DeleteMsg(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteMsg", reflect.TypeOf((*MockStream)(nil).DeleteMsg), arg0, arg1) +} + +// GetLastMsgForSubject mocks base method. +func (m *MockStream) GetLastMsgForSubject(arg0 context.Context, arg1 string) (*jetstream.RawStreamMsg, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetLastMsgForSubject", arg0, arg1) + ret0, _ := ret[0].(*jetstream.RawStreamMsg) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetLastMsgForSubject indicates an expected call of GetLastMsgForSubject. +func (mr *MockStreamMockRecorder) GetLastMsgForSubject(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLastMsgForSubject", reflect.TypeOf((*MockStream)(nil).GetLastMsgForSubject), arg0, arg1) +} + +// GetMsg mocks base method. +func (m *MockStream) GetMsg(arg0 context.Context, arg1 uint64, arg2 ...jetstream.GetMsgOpt) (*jetstream.RawStreamMsg, error) { + m.ctrl.T.Helper() + varargs := []any{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "GetMsg", varargs...) + ret0, _ := ret[0].(*jetstream.RawStreamMsg) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetMsg indicates an expected call of GetMsg. +func (mr *MockStreamMockRecorder) GetMsg(arg0, arg1 any, arg2 ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMsg", reflect.TypeOf((*MockStream)(nil).GetMsg), varargs...) +} + +// Info mocks base method. +func (m *MockStream) Info(arg0 context.Context, arg1 ...jetstream.StreamInfoOpt) (*jetstream.StreamInfo, error) { + m.ctrl.T.Helper() + varargs := []any{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Info", varargs...) + ret0, _ := ret[0].(*jetstream.StreamInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Info indicates an expected call of Info. +func (mr *MockStreamMockRecorder) Info(arg0 any, arg1 ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Info", reflect.TypeOf((*MockStream)(nil).Info), varargs...) +} + +// ListConsumers mocks base method. +func (m *MockStream) ListConsumers(arg0 context.Context) jetstream.ConsumerInfoLister { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListConsumers", arg0) + ret0, _ := ret[0].(jetstream.ConsumerInfoLister) + return ret0 +} + +// ListConsumers indicates an expected call of ListConsumers. +func (mr *MockStreamMockRecorder) ListConsumers(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListConsumers", reflect.TypeOf((*MockStream)(nil).ListConsumers), arg0) +} + +// OrderedConsumer mocks base method. +func (m *MockStream) OrderedConsumer(arg0 context.Context, arg1 jetstream.OrderedConsumerConfig) (jetstream.Consumer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "OrderedConsumer", arg0, arg1) + ret0, _ := ret[0].(jetstream.Consumer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// OrderedConsumer indicates an expected call of OrderedConsumer. +func (mr *MockStreamMockRecorder) OrderedConsumer(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OrderedConsumer", reflect.TypeOf((*MockStream)(nil).OrderedConsumer), arg0, arg1) +} + +// Purge mocks base method. +func (m *MockStream) Purge(arg0 context.Context, arg1 ...jetstream.StreamPurgeOpt) error { + m.ctrl.T.Helper() + varargs := []any{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Purge", varargs...) + ret0, _ := ret[0].(error) + return ret0 +} + +// Purge indicates an expected call of Purge. +func (mr *MockStreamMockRecorder) Purge(arg0 any, arg1 ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Purge", reflect.TypeOf((*MockStream)(nil).Purge), varargs...) +} + +// SecureDeleteMsg mocks base method. +func (m *MockStream) SecureDeleteMsg(arg0 context.Context, arg1 uint64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SecureDeleteMsg", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// SecureDeleteMsg indicates an expected call of SecureDeleteMsg. +func (mr *MockStreamMockRecorder) SecureDeleteMsg(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SecureDeleteMsg", reflect.TypeOf((*MockStream)(nil).SecureDeleteMsg), arg0, arg1) +} + +// UpdateConsumer mocks base method. +func (m *MockStream) UpdateConsumer(arg0 context.Context, arg1 jetstream.ConsumerConfig) (jetstream.Consumer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateConsumer", arg0, arg1) + ret0, _ := ret[0].(jetstream.Consumer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UpdateConsumer indicates an expected call of UpdateConsumer. +func (mr *MockStreamMockRecorder) UpdateConsumer(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateConsumer", reflect.TypeOf((*MockStream)(nil).UpdateConsumer), arg0, arg1) +} + +// MockConsumer is a mock of Consumer interface. +type MockConsumer struct { + ctrl *gomock.Controller + recorder *MockConsumerMockRecorder +} + +// MockConsumerMockRecorder is the mock recorder for MockConsumer. +type MockConsumerMockRecorder struct { + mock *MockConsumer +} + +// NewMockConsumer creates a new mock instance. +func NewMockConsumer(ctrl *gomock.Controller) *MockConsumer { + mock := &MockConsumer{ctrl: ctrl} + mock.recorder = &MockConsumerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockConsumer) EXPECT() *MockConsumerMockRecorder { + return m.recorder +} + +// CachedInfo mocks base method. +func (m *MockConsumer) CachedInfo() *jetstream.ConsumerInfo { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CachedInfo") + ret0, _ := ret[0].(*jetstream.ConsumerInfo) + return ret0 +} + +// CachedInfo indicates an expected call of CachedInfo. +func (mr *MockConsumerMockRecorder) CachedInfo() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CachedInfo", reflect.TypeOf((*MockConsumer)(nil).CachedInfo)) +} + +// Consume mocks base method. +func (m *MockConsumer) Consume(arg0 jetstream.MessageHandler, arg1 ...jetstream.PullConsumeOpt) (jetstream.ConsumeContext, error) { + m.ctrl.T.Helper() + varargs := []any{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Consume", varargs...) + ret0, _ := ret[0].(jetstream.ConsumeContext) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Consume indicates an expected call of Consume. +func (mr *MockConsumerMockRecorder) Consume(arg0 any, arg1 ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Consume", reflect.TypeOf((*MockConsumer)(nil).Consume), varargs...) +} + +// Fetch mocks base method. +func (m *MockConsumer) Fetch(arg0 int, arg1 ...jetstream.FetchOpt) (jetstream.MessageBatch, error) { + m.ctrl.T.Helper() + varargs := []any{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Fetch", varargs...) + ret0, _ := ret[0].(jetstream.MessageBatch) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Fetch indicates an expected call of Fetch. +func (mr *MockConsumerMockRecorder) Fetch(arg0 any, arg1 ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Fetch", reflect.TypeOf((*MockConsumer)(nil).Fetch), varargs...) +} + +// FetchBytes mocks base method. +func (m *MockConsumer) FetchBytes(arg0 int, arg1 ...jetstream.FetchOpt) (jetstream.MessageBatch, error) { + m.ctrl.T.Helper() + varargs := []any{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "FetchBytes", varargs...) + ret0, _ := ret[0].(jetstream.MessageBatch) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchBytes indicates an expected call of FetchBytes. +func (mr *MockConsumerMockRecorder) FetchBytes(arg0 any, arg1 ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchBytes", reflect.TypeOf((*MockConsumer)(nil).FetchBytes), varargs...) +} + +// FetchNoWait mocks base method. +func (m *MockConsumer) FetchNoWait(arg0 int) (jetstream.MessageBatch, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchNoWait", arg0) + ret0, _ := ret[0].(jetstream.MessageBatch) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchNoWait indicates an expected call of FetchNoWait. +func (mr *MockConsumerMockRecorder) FetchNoWait(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchNoWait", reflect.TypeOf((*MockConsumer)(nil).FetchNoWait), arg0) +} + +// Info mocks base method. +func (m *MockConsumer) Info(arg0 context.Context) (*jetstream.ConsumerInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Info", arg0) + ret0, _ := ret[0].(*jetstream.ConsumerInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Info indicates an expected call of Info. +func (mr *MockConsumerMockRecorder) Info(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Info", reflect.TypeOf((*MockConsumer)(nil).Info), arg0) +} + +// Messages mocks base method. +func (m *MockConsumer) Messages(arg0 ...jetstream.PullMessagesOpt) (jetstream.MessagesContext, error) { + m.ctrl.T.Helper() + varargs := []any{} + for _, a := range arg0 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Messages", varargs...) + ret0, _ := ret[0].(jetstream.MessagesContext) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Messages indicates an expected call of Messages. +func (mr *MockConsumerMockRecorder) Messages(arg0 ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Messages", reflect.TypeOf((*MockConsumer)(nil).Messages), arg0...) +} + +// Next mocks base method. +func (m *MockConsumer) Next(arg0 ...jetstream.FetchOpt) (jetstream.Msg, error) { + m.ctrl.T.Helper() + varargs := []any{} + for _, a := range arg0 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Next", varargs...) + ret0, _ := ret[0].(jetstream.Msg) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Next indicates an expected call of Next. +func (mr *MockConsumerMockRecorder) Next(arg0 ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Next", reflect.TypeOf((*MockConsumer)(nil).Next), arg0...) +} + +// MockMsg is a mock of Msg interface. +type MockMsg struct { + ctrl *gomock.Controller + recorder *MockMsgMockRecorder +} + +// MockMsgMockRecorder is the mock recorder for MockMsg. +type MockMsgMockRecorder struct { + mock *MockMsg +} + +// NewMockMsg creates a new mock instance. +func NewMockMsg(ctrl *gomock.Controller) *MockMsg { + mock := &MockMsg{ctrl: ctrl} + mock.recorder = &MockMsgMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockMsg) EXPECT() *MockMsgMockRecorder { + return m.recorder +} + +// Ack mocks base method. +func (m *MockMsg) Ack() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Ack") + ret0, _ := ret[0].(error) + return ret0 +} + +// Ack indicates an expected call of Ack. +func (mr *MockMsgMockRecorder) Ack() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ack", reflect.TypeOf((*MockMsg)(nil).Ack)) +} + +// Data mocks base method. +func (m *MockMsg) Data() []byte { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Data") + ret0, _ := ret[0].([]byte) + return ret0 +} + +// Data indicates an expected call of Data. +func (mr *MockMsgMockRecorder) Data() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Data", reflect.TypeOf((*MockMsg)(nil).Data)) +} + +// DoubleAck mocks base method. +func (m *MockMsg) DoubleAck(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DoubleAck", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// DoubleAck indicates an expected call of DoubleAck. +func (mr *MockMsgMockRecorder) DoubleAck(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoubleAck", reflect.TypeOf((*MockMsg)(nil).DoubleAck), arg0) +} + +// Headers mocks base method. +func (m *MockMsg) Headers() nats.Header { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Headers") + ret0, _ := ret[0].(nats.Header) + return ret0 +} + +// Headers indicates an expected call of Headers. +func (mr *MockMsgMockRecorder) Headers() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Headers", reflect.TypeOf((*MockMsg)(nil).Headers)) +} + +// InProgress mocks base method. +func (m *MockMsg) InProgress() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InProgress") + ret0, _ := ret[0].(error) + return ret0 +} + +// InProgress indicates an expected call of InProgress. +func (mr *MockMsgMockRecorder) InProgress() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InProgress", reflect.TypeOf((*MockMsg)(nil).InProgress)) +} + +// Metadata mocks base method. +func (m *MockMsg) Metadata() (*jetstream.MsgMetadata, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Metadata") + ret0, _ := ret[0].(*jetstream.MsgMetadata) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Metadata indicates an expected call of Metadata. +func (mr *MockMsgMockRecorder) Metadata() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Metadata", reflect.TypeOf((*MockMsg)(nil).Metadata)) +} + +// Nak mocks base method. +func (m *MockMsg) Nak() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Nak") + ret0, _ := ret[0].(error) + return ret0 +} + +// Nak indicates an expected call of Nak. +func (mr *MockMsgMockRecorder) Nak() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Nak", reflect.TypeOf((*MockMsg)(nil).Nak)) +} + +// NakWithDelay mocks base method. +func (m *MockMsg) NakWithDelay(arg0 time.Duration) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NakWithDelay", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// NakWithDelay indicates an expected call of NakWithDelay. +func (mr *MockMsgMockRecorder) NakWithDelay(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NakWithDelay", reflect.TypeOf((*MockMsg)(nil).NakWithDelay), arg0) +} + +// Reply mocks base method. +func (m *MockMsg) Reply() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Reply") + ret0, _ := ret[0].(string) + return ret0 +} + +// Reply indicates an expected call of Reply. +func (mr *MockMsgMockRecorder) Reply() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reply", reflect.TypeOf((*MockMsg)(nil).Reply)) +} + +// Subject mocks base method. +func (m *MockMsg) Subject() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Subject") + ret0, _ := ret[0].(string) + return ret0 +} + +// Subject indicates an expected call of Subject. +func (mr *MockMsgMockRecorder) Subject() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subject", reflect.TypeOf((*MockMsg)(nil).Subject)) +} + +// Term mocks base method. +func (m *MockMsg) Term() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Term") + ret0, _ := ret[0].(error) + return ret0 +} + +// Term indicates an expected call of Term. +func (mr *MockMsgMockRecorder) Term() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Term", reflect.TypeOf((*MockMsg)(nil).Term)) +} + +// TermWithReason mocks base method. +func (m *MockMsg) TermWithReason(arg0 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TermWithReason", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// TermWithReason indicates an expected call of TermWithReason. +func (mr *MockMsgMockRecorder) TermWithReason(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TermWithReason", reflect.TypeOf((*MockMsg)(nil).TermWithReason), arg0) +} + +// MockMessageBatch is a mock of MessageBatch interface. +type MockMessageBatch struct { + ctrl *gomock.Controller + recorder *MockMessageBatchMockRecorder +} + +// MockMessageBatchMockRecorder is the mock recorder for MockMessageBatch. +type MockMessageBatchMockRecorder struct { + mock *MockMessageBatch +} + +// NewMockMessageBatch creates a new mock instance. +func NewMockMessageBatch(ctrl *gomock.Controller) *MockMessageBatch { + mock := &MockMessageBatch{ctrl: ctrl} + mock.recorder = &MockMessageBatchMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockMessageBatch) EXPECT() *MockMessageBatchMockRecorder { + return m.recorder +} + +// Error mocks base method. +func (m *MockMessageBatch) Error() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Error") + ret0, _ := ret[0].(error) + return ret0 +} + +// Error indicates an expected call of Error. +func (mr *MockMessageBatchMockRecorder) Error() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Error", reflect.TypeOf((*MockMessageBatch)(nil).Error)) +} + +// Messages mocks base method. +func (m *MockMessageBatch) Messages() <-chan jetstream.Msg { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Messages") + ret0, _ := ret[0].(<-chan jetstream.Msg) + return ret0 +} + +// Messages indicates an expected call of Messages. +func (mr *MockMessageBatchMockRecorder) Messages() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Messages", reflect.TypeOf((*MockMessageBatch)(nil).Messages)) +} diff --git a/pkg/gofr/datasource/pubsub/nats/mock_metrics.go b/pkg/gofr/datasource/pubsub/nats/mock_metrics.go new file mode 100644 index 000000000..072cb18cf --- /dev/null +++ b/pkg/gofr/datasource/pubsub/nats/mock_metrics.go @@ -0,0 +1,57 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./metrics.go +// +// Generated by this command: +// +// mockgen -destination=mock_metrics.go -package=nats -source=./metrics.go +// + +// Package nats is a generated GoMock package. +package nats + +import ( + context "context" + reflect "reflect" + + gomock "go.uber.org/mock/gomock" +) + +// MockMetrics is a mock of Metrics interface. +type MockMetrics struct { + ctrl *gomock.Controller + recorder *MockMetricsMockRecorder +} + +// MockMetricsMockRecorder is the mock recorder for MockMetrics. +type MockMetricsMockRecorder struct { + mock *MockMetrics +} + +// NewMockMetrics creates a new mock instance. +func NewMockMetrics(ctrl *gomock.Controller) *MockMetrics { + mock := &MockMetrics{ctrl: ctrl} + mock.recorder = &MockMetricsMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockMetrics) EXPECT() *MockMetricsMockRecorder { + return m.recorder +} + +// IncrementCounter mocks base method. +func (m *MockMetrics) IncrementCounter(ctx context.Context, name string, labels ...string) { + m.ctrl.T.Helper() + varargs := []any{ctx, name} + for _, a := range labels { + varargs = append(varargs, a) + } + m.ctrl.Call(m, "IncrementCounter", varargs...) +} + +// IncrementCounter indicates an expected call of IncrementCounter. +func (mr *MockMetricsMockRecorder) IncrementCounter(ctx, name any, labels ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, name}, labels...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IncrementCounter", reflect.TypeOf((*MockMetrics)(nil).IncrementCounter), varargs...) +} diff --git a/pkg/gofr/datasource/pubsub/nats/pubsub_wrapper.go b/pkg/gofr/datasource/pubsub/nats/pubsub_wrapper.go new file mode 100644 index 000000000..d90cc0b3f --- /dev/null +++ b/pkg/gofr/datasource/pubsub/nats/pubsub_wrapper.go @@ -0,0 +1,74 @@ +package nats + +import ( + "context" + + "github.com/nats-io/nats.go" + "gofr.dev/pkg/gofr/datasource" + "gofr.dev/pkg/gofr/datasource/pubsub" +) + +// PubSubWrapper adapts Client to pubsub.JetStreamClient. +type PubSubWrapper struct { + Client *Client +} + +// Publish publishes a message to a topic. +func (w *PubSubWrapper) Publish(ctx context.Context, topic string, message []byte) error { + return w.Client.Publish(ctx, topic, message) +} + +// Subscribe subscribes to a topic and returns a single message. +func (w *PubSubWrapper) Subscribe(ctx context.Context, topic string) (*pubsub.Message, error) { + return w.Client.Subscribe(ctx, topic) +} + +// CreateTopic creates a new topic (stream) in NATS JetStream. +func (w *PubSubWrapper) CreateTopic(ctx context.Context, name string) error { + return w.Client.CreateTopic(ctx, name) +} + +// DeleteTopic deletes a topic (stream) in NATS JetStream. +func (w *PubSubWrapper) DeleteTopic(ctx context.Context, name string) error { + return w.Client.DeleteTopic(ctx, name) +} + +// Close closes the Client. +func (w *PubSubWrapper) Close() error { + return w.Client.Close() +} + +// Health returns the health status of the Client. +func (w *PubSubWrapper) Health() datasource.Health { + status := datasource.StatusUp + if w.Client.Conn == nil || w.Client.Conn.Status() != nats.CONNECTED { + status = datasource.StatusDown + } + + return datasource.Health{ + Status: status, + Details: map[string]interface{}{ + "server": w.Client.Config.Server, + }, + } +} + +// Connect establishes a connection to NATS. +func (w *PubSubWrapper) Connect() { + w.Client.Connect() +} + +// UseLogger sets the logger for the NATS client. +func (w *PubSubWrapper) UseLogger(logger any) { + w.Client.UseLogger(logger) +} + +// UseMetrics sets the metrics for the NATS client. +func (w *PubSubWrapper) UseMetrics(metrics any) { + w.Client.UseMetrics(metrics) +} + +// UseTracer sets the tracer for the NATS client. +func (w *PubSubWrapper) UseTracer(tracer any) { + w.Client.UseTracer(tracer) +}