-
Notifications
You must be signed in to change notification settings - Fork 0
/
amqp_utils.go
121 lines (106 loc) · 3.69 KB
/
amqp_utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package artifacts
import (
"fmt"
"log"
"github.com/getsentry/sentry-go"
amqp "github.com/rabbitmq/amqp091-go"
)
var ConsumerChannelActive = true
// SetupRMQConnection sets up the RabbitMQ connection and channels. It also spawns a goroutine that listens for any
// "Close" events from the broker.
func SetupRMQConnection(retryFunc func() error, brokerUrl, exchangeName, exchangeType string) (*amqp.Connection, *amqp.Channel, error) {
// Establish connection with RabbitMQ
rmqConn, err := dialRMQ(brokerUrl)
if err != nil {
return nil, nil, err
}
// Spawn a goroutine to handle any RabbitMQ related errors.
go handleRabbitMQErrors(rmqConn, retryFunc)
// Create Channel
rmqChannel, err := createChannel(rmqConn)
if err != nil {
return nil, nil, err
}
return rmqConn, rmqChannel, declareRealtimeExchange(rmqChannel, exchangeName, exchangeType)
}
// createChannel opens channel with janus virtualhost connection.
func createChannel(rmqConn *amqp.Connection) (*amqp.Channel, error) {
rmqChan, err := rmqConn.Channel()
return rmqChan, err
}
// Declare exchange `janus-realtime-notifications` to listen for realtime notification pushes from asgard
func declareRealtimeExchange(rmqChan *amqp.Channel, exchangeName, exchangeType string) (err error) {
if err = rmqChan.ExchangeDeclare(
exchangeName, // name of the exchange
exchangeType, // type
true, // durable
false, // delete when complete
false, // internal
false, // noWait
nil, // arguments
); err != nil {
return fmt.Errorf("Error declaring exchange: janus-realtime-notifications on vhost: janus. Error: %s", err)
}
return nil
}
// handleRabbitMQErrors looks out for any RabbitMQ errors/closure and re-establishes connection
// and initializes the realtime consumer.
func handleRabbitMQErrors(rmqConn *amqp.Connection, consumerInitFn func() error) {
rmqError := <-rmqConn.NotifyClose(make(chan *amqp.Error))
rmqCloseErr := fmt.Errorf("RabbitMQ connection closed")
// RabbitMQ connection closure triggered. Log and send a sentry alert.
if rmqError != nil {
rmqCloseErr = fmt.Errorf("RMQ Connection closed. Code: %d: Reason: %s", rmqError.Code, rmqError.Reason)
}
log.Println(rmqCloseErr)
sentry.CaptureException(rmqCloseErr)
// Check if the error is a "ConnectionError" or a "ChannelError".
// Other error codes don't trigger retrial.
if !ConsumerChannelActive || rmqError != nil && isBrokerError(rmqError.Code) {
for {
if retryError := retryBrokerConnections(consumerInitFn); retryError == nil {
break
}
}
}
}
// retryBrokerConnections retries broker connection function once the broker error/closure is detected.
func retryBrokerConnections(consumerSetupFn func() error) error {
// Setup consumers
if reloadErr := consumerSetupFn(); reloadErr != nil {
log.Println("Failed to restore RMQ consumers: ", reloadErr)
// raven.CaptureErrorAndWait(fmt.Errorf("Failed to restore RMQ consumers: %v", reloadErr), nil)
return reloadErr
}
return nil
}
// isBrokerError checks if the error code is a signifact broker error to re-establish connection and channel.
func isBrokerError(code int) bool {
switch code {
case
// Channel Errors
311, // amqp.ContentTooLarge
313, // amqp.NoConsumers
403, // amqp.AccessRefused
404, // amqp.NotFound
405, // amqp.ResourceLocked
406: // amqp.PreconditionFailed
return true
case
// Connection Errors
320, // amqp.ConnectionForced
402, // amqp.InvalidPath
501, // amqp.FrameError
502, // amqp.SyntaxError
503, // amqp.CommandInvalid
504, // amqp.ChannelError
505, // amqp.UnexpectedFrame
506, // amqp.ResourceError
530, // amqp.NotAllowed
540, // amqp.NotImplemented
541: // amqp.InternalError
return true
default:
return false
}
}