Skip to content

Commit

Permalink
Fix stream, consumer error handling
Browse files Browse the repository at this point in the history
Signed-off-by: Masudur Rahman <masud@appscode.com>
  • Loading branch information
masudur-rahman committed Oct 5, 2023
1 parent 1054527 commit 135daef
Showing 1 changed file with 4 additions and 3 deletions.
7 changes: 4 additions & 3 deletions manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,14 @@ func New(nc *nats.Conn, opts Options) *TaskManager {
}
}

func (mgr *TaskManager) Start(ctx context.Context, jsmOpts ...nats.JSOpt) error {
func (mgr *TaskManager) Start(ctx context.Context) error {
jsm, err := jetstream.New(mgr.nc)
if err != nil {
return err
}

stream, err := jsm.Stream(ctx, mgr.stream)
if stream == nil || err != nil && err.Error() == "nats: stream not found" {
if stream == nil || err == jetstream.ErrStreamNotFound {
_, err = jsm.CreateStream(ctx, jetstream.StreamConfig{
Name: mgr.stream,
Subjects: []string{mgr.stream + ".queue.*"},
Expand All @@ -148,7 +148,8 @@ func (mgr *TaskManager) Start(ctx context.Context, jsmOpts ...nats.JSOpt) error

// create nats consumer
consumerName := "workers"
consumer, err := jsm.CreateConsumer(ctx, mgr.stream, jetstream.ConsumerConfig{
consumer, err := jsm.CreateOrUpdateConsumer(ctx, mgr.stream, jetstream.ConsumerConfig{
Name: consumerName,
Durable: consumerName,
AckPolicy: jetstream.AckExplicitPolicy,
AckWait: mgr.ackWait, // TODO: max for any task type
Expand Down

0 comments on commit 135daef

Please sign in to comment.