-
Notifications
You must be signed in to change notification settings - Fork 1
/
sqs_handler.go
130 lines (108 loc) · 3.35 KB
/
sqs_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package g8
import (
"context"
"encoding/json"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/google/uuid"
newrelic "github.com/newrelic/go-agent"
"github.com/newrelic/go-agent/_integrations/nrlambda"
"github.com/rs/zerolog"
)
// SQSContext carries the per-message state that SQSHandler builds for each
// SQS record and passes to the SQSHandlerFunc.
type SQSContext struct {
// Context is the context the Lambda handler was invoked with.
Context context.Context
// Message is the SQS record being processed. When the original body was
// wrapped in an envelope, Body holds only the inner "data" JSON.
Message events.SQSMessage
// Logger is pre-populated with correlation_id, sqs_event_source and
// sqs_message_id fields.
Logger zerolog.Logger
// NewRelicTx is the transaction obtained via newrelic.FromContext; it may be nil,
// in which case AddNewRelicAttribute is a no-op.
NewRelicTx newrelic.Transaction
// CorrelationID comes from the envelope meta when present and non-empty,
// otherwise it is a freshly generated UUID.
CorrelationID string
}
// SQSHandlerFunc processes a single SQS message described by c. When it
// returns a non-nil error, SQSHandler logs it and returns it without
// processing any further records of the event.
type SQSHandlerFunc func(c *SQSContext) error
// SQSMessageEnvelope is the optional wrapper format for SQS message bodies:
// the application payload lives under "data" and delivery metadata under
// "meta". A body is only treated as enveloped when "meta" is present.
type SQSMessageEnvelope struct {
// Data is the inner application payload.
Data interface{} `json:"data"`
// Meta holds the envelope metadata; nil means the body was not enveloped.
Meta *SQSMessageMeta `json:"meta"`
}
// SQSMessageMeta is the metadata section of an SQSMessageEnvelope.
type SQSMessageMeta struct {
// CorrelationID is the correlation identifier supplied by the sender; when empty,
// a new UUID is generated for the message instead.
CorrelationID string `json:"correlation_id"`
}
// SQSHandler adapts h into a function suitable for handling SQS events in
// AWS Lambda.
//
// Records are processed one at a time in the order they appear in the event.
// For each record the body is unwrapped from its envelope (when one is
// present), a logger and an SQSContext are prepared, a fixed set of New Relic
// attributes is attached, and h is invoked. The first error returned by h is
// logged and returned immediately; later records are not processed.
func SQSHandler(h SQSHandlerFunc, conf HandlerConfig) func(context.Context, events.SQSEvent) error {
	return func(ctx context.Context, e events.SQSEvent) error {
		for i := range e.Records {
			// Work on a copy so that rewriting Body never touches the event itself.
			msg := e.Records[i]

			// Strip the envelope (if any) so that Bind later sees only the
			// inner payload; meta is nil for non-enveloped bodies.
			meta, payload := parseRawMessage([]byte(msg.Body))
			msg.Body = string(payload)

			corrID := getCorrelationIDSQS(meta)

			sc := &SQSContext{
				Context: ctx,
				Message: msg,
				Logger: configureLogger(conf).
					Str("correlation_id", corrID).
					Str("sqs_event_source", msg.EventSource).
					Str("sqs_message_id", msg.MessageId).
					Logger(),
				NewRelicTx:    newrelic.FromContext(ctx),
				CorrelationID: corrID,
			}

			attrs := []struct {
				key string
				val interface{}
			}{
				{"functionName", conf.FunctionName},
				{"sqsEventSource", msg.EventSource},
				{"sqsMessageID", msg.MessageId},
				{"correlationID", corrID},
				{"buildVersion", conf.BuildVersion},
			}
			for _, a := range attrs {
				sc.AddNewRelicAttribute(a.key, a.val)
			}

			err := h(sc)
			if err == nil {
				continue
			}
			logUnhandledError(sc.Logger, err)
			return err
		}
		return nil
	}
}
// SQSHandlerWithNewRelic builds the SQS handler for h and wraps it with
// nrlambda.Wrap using the New Relic application from conf.NewRelicApp.
func SQSHandlerWithNewRelic(h SQSHandlerFunc, conf HandlerConfig) lambda.Handler {
	inner := SQSHandler(h, conf)
	return nrlambda.Wrap(inner, conf.NewRelicApp)
}
// AddNewRelicAttribute attaches the key/val pair to the context's New Relic
// transaction. It does nothing when there is no transaction, and a failure to
// add the attribute is only logged, never returned.
func (c *SQSContext) AddNewRelicAttribute(key string, val interface{}) {
	tx := c.NewRelicTx
	if tx == nil {
		return
	}

	err := tx.AddAttribute(key, val)
	if err == nil {
		return
	}
	c.Logger.Error().Msgf("failed to add attr '%s' to new relic tx: %+v", key, err)
}
// Bind decodes the message body as JSON into v. If v implements Validatable,
// its Validate method is then called and any error it reports is returned.
func (c *SQSContext) Bind(v interface{}) error {
	raw := []byte(c.Message.Body)
	if err := json.Unmarshal(raw, v); err != nil {
		return err
	}

	validatable, ok := v.(Validatable)
	if !ok {
		// Nothing to validate for types that do not opt in.
		return nil
	}
	if err := validatable.Validate(); err != nil {
		return err
	}
	return nil
}
// parseRawMessage splits an SQS message body into its envelope metadata and
// inner payload.
//
// If body is not valid JSON for the envelope shape, or it carries no "meta"
// section, the message is treated as non-enveloped: the returned meta is nil
// and the body is returned unchanged. Otherwise the envelope's meta is
// returned together with the raw JSON of the "data" member (JSON null when
// "data" is absent).
func parseRawMessage(body []byte) (*SQSMessageMeta, []byte) {
	// Data is captured as json.RawMessage so the payload reaches the
	// application byte-for-byte. Decoding it into interface{} and marshalling
	// it again would convert every JSON number to float64 and silently
	// corrupt integers above 2^53 (e.g. 64-bit identifiers).
	var envelope struct {
		Data json.RawMessage `json:"data"`
		Meta *SQSMessageMeta `json:"meta"`
	}

	if err := json.Unmarshal(body, &envelope); err != nil {
		// Errors are swallowed here because the message body isn't guaranteed
		// to be in the envelope format - if not, return the entire body.
		return nil, body
	}

	if envelope.Meta == nil {
		// A missing meta section means the data wasn't enveloped.
		return nil, body
	}

	if len(envelope.Data) == 0 {
		// Enveloped message without a "data" member: the payload is JSON null.
		return envelope.Meta, []byte("null")
	}

	return envelope.Meta, envelope.Data
}
// getCorrelationIDSQS returns the correlation ID carried in m, or a newly
// generated UUID string when m is nil or its correlation ID is empty.
func getCorrelationIDSQS(m *SQSMessageMeta) string {
	if m != nil && m.CorrelationID != "" {
		return m.CorrelationID
	}
	return uuid.New().String()
}