-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueueservice.go
90 lines (80 loc) · 2.91 KB
/
queueservice.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package goqueue
import (
"context"
"errors"
"time"
"github.com/bxcodec/goqueue/interfaces"
"github.com/bxcodec/goqueue/internal/consumer"
"github.com/bxcodec/goqueue/internal/publisher"
"github.com/bxcodec/goqueue/options"
"golang.org/x/sync/errgroup"
)
// QueueService represents a service that handles message queuing operations.
type QueueService struct {
consumer consumer.Consumer // The consumer responsible for consuming messages from the queue.
handler interfaces.InboundMessageHandler // The handler responsible for processing incoming messages.
publisher publisher.Publisher // The publisher responsible for publishing messages to the queue.
NumberOfConsumer int // The number of consumers to process messages concurrently.
}
// NewQueueService creates a new instance of QueueService with the provided options.
// It accepts a variadic parameter `opts` which allows configuring the QueueService.
// The options are applied in the order they are provided.
// Returns a pointer to the created QueueService.
func NewQueueService(opts ...options.GoQueueOptionFunc) *QueueService {
opt := options.DefaultGoQueueOption()
for _, o := range opts {
o(opt)
}
return &QueueService{
consumer: opt.Consumer,
handler: opt.MessageHandler,
publisher: opt.Publisher,
NumberOfConsumer: opt.NumberOfConsumer,
}
}
// Start starts the queue service by spawning multiple consumers to process messages.
// It returns an error if the consumer or handler is not defined.
// The method uses the provided context to manage the lifecycle of the consumers.
func (qs *QueueService) Start(ctx context.Context) (err error) {
if qs.consumer == nil {
return errors.New("consumer is not defined")
}
if qs.handler == nil {
return errors.New("handler is not defined")
}
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < qs.NumberOfConsumer; i++ {
meta := map[string]interface{}{
"consumer_id": i,
"started_time": time.Now(),
}
g.Go(func() error {
return qs.consumer.Consume(ctx, qs.handler, meta)
})
}
return g.Wait()
}
// Stop stops the queue service by stopping the consumer and closing the publisher.
// It returns an error if there was an issue stopping the consumer or closing the publisher.
func (qs *QueueService) Stop(ctx context.Context) error {
if qs.consumer == nil {
return errors.New("consumer is not defined")
}
err := qs.consumer.Stop(ctx)
if err != nil {
return err
}
err = qs.publisher.Close(ctx)
if err != nil {
return err
}
return nil
}
// Publish publishes a message to the queue.
// It returns an error if the publisher is not defined or if there was an error while publishing the message.
func (qs *QueueService) Publish(ctx context.Context, m interfaces.Message) (err error) {
if qs.publisher == nil {
return errors.New("publisher is not defined")
}
return qs.publisher.Publish(ctx, m)
}