-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathqueue.go
133 lines (110 loc) · 3.06 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package main
import (
"encoding/json"
"errors"
"strings"
"sync"
log "github.com/sirupsen/logrus"
)
// Queue is an encapsulation for processing an SQS queue and enqueueing the
// results in sidekiq.
type Queue interface {
// Poll gets the next batch of messages from SQS and processes them.
// When it's finished, it downs the semaphore.
Poll()
// Semaphore returns the lock used to ensure that all the work is
// done before terminating the queue.
Semaphore() *sync.WaitGroup
}
// queue is the actual implementation of Queue; NewQueue returns a pointer
// to one.
type queue struct {
// WorkerClient is the client jobs are pushed to (worker class plus the
// message payload as its argument).
WorkerClient WorkerClient
// SQSClient is used to fetch messages and to delete them once handled.
SQSClient SQSClient
// Topics maps a topic name (the last ":"-separated segment of a message's
// TopicArn) to the worker class to enqueue for that topic.
Topics map[string]string
// Sem is the WaitGroup handed out by Semaphore. Poll calls Done on it
// when it returns, provided it is non-nil.
Sem *sync.WaitGroup
}
// NewQueue creates a new Queue from the given Config. Returns an error if
// something about the config is invalid.
//
// The config is validated before any client is constructed, so a nil config
// or a config without topics fails without an SQS or Redis client having
// been created and then abandoned. Errors from NewAWSSQSClient and
// NewRedisWorkerClient are returned unchanged.
func NewQueue(config *Config) (Queue, error) {
	if config == nil {
		return nil, errors.New("config is nil")
	}

	// Cheap validation first: without topics no message could ever be
	// routed to a worker, so there is no point in building the clients.
	topics := config.Queue.Topics
	if len(topics) == 0 {
		return nil, errors.New("No topics defined")
	}

	sqsClient, err := NewAWSSQSClient(config.AWS, config.Queue.Name, config.SQS)
	if err != nil {
		return nil, err
	}

	workerClient, err := NewRedisWorkerClient(config.Redis)
	if err != nil {
		return nil, err
	}

	return &queue{
		WorkerClient: workerClient,
		SQSClient:    sqsClient,
		Topics:       topics,
		Sem:          new(sync.WaitGroup),
	}, nil
}
// Semaphore returns the WaitGroup stored in q.Sem. Poll calls Done on that
// WaitGroup when it finishes.
func (q *queue) Semaphore() *sync.WaitGroup {
return q.Sem
}
// Poll fetches the next batch of messages from SQS and processes each one:
// the message is handed to enqueueMessage and, when that reports the message
// as deletable, removed from SQS via deleteMessage.
//
// If q.Sem is set, Done is called on it when Poll returns, whichever path is
// taken. A fetch failure is logged and ends the poll without processing
// anything.
func (q *queue) Poll() {
	if q.Sem != nil {
		defer q.Sem.Done()
	}

	messages, err := q.SQSClient.Fetch()
	if err != nil {
		log.Error("Error fetching messages: ", err.Error())
		// The result accompanying a non-nil error is not meaningful, so
		// stop here instead of falling through to the processing loop.
		return
	}

	for _, msg := range messages {
		ctx := log.WithField("MessageID", msg.MessageID)
		ctx.Info("Processing message")

		// Only delete what enqueueMessage says is finished with; a message
		// whose push failed is left in SQS.
		if q.enqueueMessage(msg, ctx) {
			q.deleteMessage(msg, ctx)
		}
	}
}
// deleteMessage removes a single message from SQS and records the outcome
// on ctx: an error entry when the delete fails, an info entry otherwise.
func (q *queue) deleteMessage(msg Message, ctx log.FieldLogger) {
	if deleteErr := q.SQSClient.Delete(msg); deleteErr != nil {
		ctx.Error("Couldn't delete message: ", deleteErr.Error())
		return
	}
	ctx.Info("Deleted message")
}
// enqueueMessage pushes a single message from SQS into redis.
//
// The message body is decoded as a JSON object. Its "TopicArn" field selects
// the worker class through q.Topics (keyed by the last ":"-separated segment
// of the ARN) and its "Message" field, a JSON string, becomes the job
// argument passed to q.WorkerClient.Push.
//
// The return value tells the caller whether the message may be deleted from
// SQS:
//   - true when the job was enqueued, or when the message can never be
//     processed (unparseable body, unparseable TopicArn or Message field, or
//     no worker configured for the topic);
//   - false when pushing the job failed, so the message is kept.
func (q *queue) enqueueMessage(msg Message, ctx log.FieldLogger) bool {
	body := make(map[string]json.RawMessage)
	if err := json.Unmarshal([]byte(msg.Body), &body); err != nil {
		ctx.Warn("Message body could not be parsed: ", err.Error())
		return true
	}

	var topicARN string
	if err := json.Unmarshal(body["TopicArn"], &topicARN); err != nil {
		ctx.Warn("Topic ARN could not be parsed: ", err.Error())
		return true
	}

	// Resolve the topic name once; it is needed for both the lookup and the
	// warning.
	topic := topicName(topicARN)
	workerClass, ok := q.Topics[topic]
	if !ok {
		ctx.Warn("No worker for topic: ", topic)
		return true
	}

	var bodyMessage string
	if err := json.Unmarshal(body["Message"], &bodyMessage); err != nil {
		ctx.Warn("'Message' field could not be parsed: ", err.Error())
		// Treat this like every other parse failure: discard the message
		// instead of enqueueing a job with an empty argument.
		return true
	}

	jid, err := q.WorkerClient.Push(workerClass, bodyMessage)
	if err != nil {
		ctx.WithField("Class", workerClass).Error("Couldn't enqueue worker: ", err.Error())
		return false
	}

	ctx.WithField("Args", bodyMessage).Info("Enqueued job: ", jid)
	return true
}
// topicName returns the part of topicARN that follows its last ":". When
// the ARN contains no ":" the whole string is returned unchanged.
func topicName(topicARN string) string {
	// LastIndex yields -1 when there is no colon, which makes the slice
	// start at 0 and so cover the entire input.
	lastColon := strings.LastIndex(topicARN, ":")
	return topicARN[lastColon+1:]
}