From 135daefea76fed143cebacea77ca6c1855e4c3fa Mon Sep 17 00:00:00 2001 From: Masudur Rahman Date: Thu, 5 Oct 2023 13:30:20 +0600 Subject: [PATCH] Fix stream, consumer error handling Signed-off-by: Masudur Rahman --- manager/manager.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/manager/manager.go b/manager/manager.go index cbb7231..10b1795 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -119,14 +119,14 @@ func New(nc *nats.Conn, opts Options) *TaskManager { } } -func (mgr *TaskManager) Start(ctx context.Context, jsmOpts ...nats.JSOpt) error { +func (mgr *TaskManager) Start(ctx context.Context) error { jsm, err := jetstream.New(mgr.nc) if err != nil { return err } stream, err := jsm.Stream(ctx, mgr.stream) - if stream == nil || err != nil && err.Error() == "nats: stream not found" { + if stream == nil || err == jetstream.ErrStreamNotFound { _, err = jsm.CreateStream(ctx, jetstream.StreamConfig{ Name: mgr.stream, Subjects: []string{mgr.stream + ".queue.*"}, @@ -148,7 +148,8 @@ func (mgr *TaskManager) Start(ctx context.Context, jsmOpts ...nats.JSOpt) error // create nats consumer consumerName := "workers" - consumer, err := jsm.CreateConsumer(ctx, mgr.stream, jetstream.ConsumerConfig{ + consumer, err := jsm.CreateOrUpdateConsumer(ctx, mgr.stream, jetstream.ConsumerConfig{ + Name: consumerName, Durable: consumerName, AckPolicy: jetstream.AckExplicitPolicy, AckWait: mgr.ackWait, // TODO: max for any task type