-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer.go
148 lines (133 loc) · 3.29 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package rabbitmq
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
amqp "github.com/rabbitmq/amqp091-go"
)
// MessageHandler handles the body of a single message. It receives a context
// so that middleware can obtain extra information through it, such as the
// execution time.
type MessageHandler func(ctx context.Context, msg []byte) error
// Subscription names the exchange, the queue and the binding keys that tie
// the two together. AMQPTopology reads it through the nil-safe getters below.
type Subscription struct {
	// Exchange is the exchange name.
	Exchange string
	// Queue is the queue name.
	Queue string
	// Key lists the binding keys; the queue is bound once per entry.
	Key []string
}

// GetExchange returns the exchange name, or "" when the receiver is nil.
func (s *Subscription) GetExchange() string {
	if s == nil {
		return ""
	}
	return s.Exchange
}

// GetQueue returns the queue name, or "" when the receiver is nil.
func (s *Subscription) GetQueue() string {
	if s == nil {
		return ""
	}
	return s.Queue
}

// GetKey returns the binding keys, or nil when the receiver is nil.
func (s *Subscription) GetKey() []string {
	if s == nil {
		return nil
	}
	return s.Key
}
// AMQPTopology declares, on channel, the exchange, queue and bindings
// described by sub.
//
// It performs three steps in order and stops at the first failure:
//
//  1. declares a durable "topic" exchange named sub.GetExchange();
//  2. declares a durable, non-exclusive queue named sub.GetQueue();
//  3. binds the declared queue to that exchange once for every key in
//     sub.GetKey().
//
// The returned error is prefixed with the failing step and wraps the
// underlying error, so callers can inspect it with errors.Is / errors.As.
// It returns nil when every step succeeds.
func AMQPTopology(channel *amqp.Channel, sub *Subscription) error {
	exchange := sub.GetExchange()

	if err := channel.ExchangeDeclare(
		exchange, // name of the exchange
		"topic",  // type
		true,     // durable
		false,    // delete when complete
		false,    // internal
		false,    // noWait
		nil,      // arguments
	); err != nil {
		return fmt.Errorf("Exchange Declare: %w", err)
	}

	queue, err := channel.QueueDeclare(
		sub.GetQueue(), // name of the queue
		true,           // durable
		false,          // delete when unused
		false,          // exclusive
		false,          // noWait
		nil,            // arguments
	)
	if err != nil {
		return fmt.Errorf("Queue Declare: %w", err)
	}

	// Bind using the name reported back by the declaration rather than the
	// requested one.
	for _, key := range sub.GetKey() {
		if err := channel.QueueBind(
			queue.Name, // name of the queue
			key,        // bindingKey
			exchange,   // sourceExchange
			false,      // noWait
			nil,        // arguments
		); err != nil {
			return fmt.Errorf("Queue Bind: %w", err)
		}
	}
	return nil
}
// AMQPConsumer holds everything Run needs to consume messages from a queue.
type AMQPConsumer struct {
// RabbitMQ topology.
//
// Bind is passed to Borrow to obtain the session Run consumes on.
// NOTE(review): its exact format (URL, alias, ...) is defined by Borrow,
// which is not visible here.
Bind string
// Subscription supplies the name of the queue Run consumes from, and is the
// topology the default setup hook declares.
Subscription *Subscription
// MessageHandler is invoked with the body of every delivery.
MessageHandler MessageHandler
// Setup hooks.
//
// SetupHooks run in order against the session's channel before consuming
// starts; the first error aborts Run. When nil, Run installs a single
// default hook that calls AMQPTopology with Subscription.
SetupHooks []func(*AMQPConsumer, *amqp.Channel) error
}
// Run borrows a session for c.Bind, runs the setup hooks, starts consuming
// from the queue named by c.Subscription and then blocks.
//
// Every delivery body is passed to c.MessageHandler together with ctx. A
// delivery is acknowledged when the handler returns nil and negatively
// acknowledged without requeue when it returns an error, so a failed message
// is no longer reported to the broker as processed.
//
// Run returns:
//   - an error immediately if c.MessageHandler is nil, borrowing the session
//     fails, a setup hook fails, or the consume request fails;
//   - nil when the process receives SIGINT or SIGTERM;
//   - ctx.Err() when ctx is cancelled;
//   - nil when the AMQP channel is closed gracefully, or the wrapped
//     *amqp.Error when it is closed abnormally.
//
// If c.SetupHooks is nil it is replaced by a single default hook that calls
// AMQPTopology with c.Subscription.
func (c *AMQPConsumer) Run(ctx context.Context) error {
	// Fail fast: a nil handler would otherwise panic inside the delivery
	// goroutine when the first message arrives.
	handler := c.MessageHandler
	if handler == nil {
		return fmt.Errorf("rabbitmq: AMQPConsumer.MessageHandler is nil")
	}

	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	// Undo the registration on return so repeated calls do not accumulate
	// signal relays.
	defer signal.Stop(quit)

	sess, returnFn, err := Borrow(ctx, c.Bind)
	if err != nil {
		return err
	}
	defer returnFn(ctx)

	// Init Hook
	if c.SetupHooks == nil {
		// Default AMQPTopology
		c.SetupHooks = []func(*AMQPConsumer, *amqp.Channel) error{
			func(c *AMQPConsumer, ch *amqp.Channel) error {
				return AMQPTopology(ch, c.Subscription)
			},
		}
	}
	for _, hook := range c.SetupHooks {
		if err := hook(c, sess.Channel); err != nil {
			return err
		}
	}

	deliveries, err := sess.Channel.Consume(
		c.Subscription.GetQueue(), // name
		"",                        // consumerTag,
		false,                     // noAck
		false,                     // exclusive
		false,                     // noLocal
		false,                     // noWait
		nil,                       // arguments
	)
	if err != nil {
		return fmt.Errorf("Queue Consume: %w", err)
	}

	// Stop On Channel Close
	// TODO: Reconnect instead of close
	//
	// The notification channel is buffered and read directly by the select
	// below, so no helper goroutine is left behind when Run returns for
	// another reason, and the library's send cannot block.
	closed := sess.Channel.NotifyClose(make(chan *amqp.Error, 1))

	// The loop ends when the deliveries channel is closed.
	go func(ctx context.Context, deliveries <-chan amqp.Delivery) {
		for delivery := range deliveries {
			if err := handler(ctx, delivery.Body); err != nil {
				// Do not confirm a message that was not processed. It is
				// not requeued, to avoid a tight redelivery loop on a
				// message that keeps failing.
				// The Nack error is ignored: there is nothing to recover
				// here, and a dead channel is reported through closed.
				_ = delivery.Nack(false, false)
				continue
			}
			// Ack error ignored for the same reason as Nack above.
			_ = delivery.Ack(false)
		}
	}(ctx, deliveries)

	select {
	case <-quit:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case amqpErr := <-closed:
		// A graceful close only closes the notification channel, which
		// yields a nil *amqp.Error. Return a literal nil in that case so
		// the caller never sees a non-nil error holding a nil pointer.
		if amqpErr != nil {
			return fmt.Errorf("Channel Closed: %w", amqpErr)
		}
		return nil
	}
}