This repository has been archived by the owner on Apr 24, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
cloudwatch_logger.go
288 lines (236 loc) · 7.56 KB
/
cloudwatch_logger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
package log
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
"github.com/gofor-little/ts"
)
// CloudWatchLogger is a structured logger that logs to CloudWatch and is thread safe.
type CloudWatchLogger struct {
currentDay int
cloudWatchLogs *cloudwatchlogs.Client
logEventsList *ts.LinkedList
logGroupName *string
nextSequenceToken *string
globalFields Fields
mutex sync.RWMutex
}
// NewCloudWatchLogger initializes a new CloudWatchLogger object and returns it.
// The profile and region parameters are optional if authentication with CloudWatch
// can be provided in other ways, such as IAM roles. logGroupName is the name of
// the log group in CloudWatch. globalFields are the fields that are written in every log message.
func NewCloudWatchLogger(ctx context.Context, profile string, region string, logGroupName string, globalFields Fields) (*CloudWatchLogger, error) {
var cfg aws.Config
var err error
if profile != "" && region != "" {
cfg, err = config.LoadDefaultConfig(ctx, config.WithSharedConfigProfile(profile), config.WithRegion(region))
} else {
cfg, err = config.LoadDefaultConfig(ctx)
}
if err != nil {
return nil, fmt.Errorf("failed to load default config: %w", err)
}
logger := &CloudWatchLogger{
currentDay: time.Now().Day(),
cloudWatchLogs: cloudwatchlogs.NewFromConfig(cfg),
logEventsList: &ts.LinkedList{},
logGroupName: aws.String(logGroupName),
globalFields: globalFields,
}
if err := logger.checkLogGroup(ctx); err != nil {
return nil, err
}
go func() {
ticker := time.NewTicker(time.Second / 5)
for {
<-ticker.C
if err := logger.putLogs(ctx); err != nil {
log.Fatalf("failed to send logs to CloudWatch: %v", err)
}
}
}()
return logger, nil
}
// Info writes a log message at an info level.
func (c *CloudWatchLogger) Info(fields Fields) error {
return c.queueLog("info", fields)
}
// Error writes a log message at an error level.
func (c *CloudWatchLogger) Error(fields Fields) error {
return c.queueLog("error", fields)
}
// Debug writes a log message at a debug level.
func (c *CloudWatchLogger) Debug(fields Fields) error {
return c.queueLog("debug", fields)
}
// createLogGroup creates a log group in CloudWatch.
func (c *CloudWatchLogger) createLogGroup(ctx context.Context) error {
input := &cloudwatchlogs.CreateLogGroupInput{
LogGroupName: c.logGroupName,
}
_, err := c.cloudWatchLogs.CreateLogGroup(ctx, input)
return err
}
// queue combines the globalFields and the passed fields, then
// marshals them to JSON and finally adds it to a thread safe queue.
func (c *CloudWatchLogger) queueLog(level string, fields Fields) error {
for key, value := range c.globalFields {
fields[key] = value
}
fields["level"] = level
data, err := json.Marshal(fields)
if err != nil {
return err
}
messages := [][]byte{}
// Check if the data is larger than the max input log event size.
// If so, split it into a slice so the data can be added over multiple
// events. This may break the JSON structure of very large amounts of
// data as it will be split between multiple log events.
for {
if len(data) <= maxInputLogEventSize {
messages = append(messages, data)
break
}
messages = append(messages, data[:maxBatchInputLogEventSize])
data = data[maxBatchInputLogEventSize:]
}
// Lock the mutex so we can queue our messages.
c.mutex.Lock()
defer c.mutex.Unlock()
// Range over the messages and push them to the event list.
for _, m := range messages {
var tail *CloudWatchLogEventSlice
// Fetch the tail from the event list. If the message can be added to the
// tail add it. Otherwise push to the event list and add to the new tail.
if !c.logEventsList.IsEmpty() && c.logEventsList.GetTail().(*CloudWatchLogEventSlice).canAdd(m) {
tail = c.logEventsList.GetTail().(*CloudWatchLogEventSlice)
} else {
tail = &CloudWatchLogEventSlice{}
c.logEventsList.Push(tail)
}
if err := tail.add(m); err != nil {
return err
}
}
return nil
}
// putLogs pops the oldest CloudWatchLogEventList off the queue, then
// writes it to CloudWatch.
func (c *CloudWatchLogger) putLogs(ctx context.Context) error {
if c.logEventsList.IsEmpty() {
return nil
}
if err := c.checkLogStream(ctx); err != nil {
return err
}
elements := c.logEventsList.Pop().(*CloudWatchLogEventSlice).logEvents.GetElements()
inputLogEvents := make([]types.InputLogEvent, len(elements))
for i, e := range elements {
inputLogEvents[i] = e.(types.InputLogEvent)
}
input := &cloudwatchlogs.PutLogEventsInput{
LogEvents: inputLogEvents,
LogGroupName: c.logGroupName,
LogStreamName: aws.String(time.Now().Format("2006-01-02")),
SequenceToken: c.nextSequenceToken,
}
output, err := c.cloudWatchLogs.PutLogEvents(ctx, input)
if err != nil {
var dataAlreadyAccepted *types.DataAlreadyAcceptedException
var invalidSequenceToken *types.InvalidSequenceTokenException
if errors.As(err, &dataAlreadyAccepted) {
input.SequenceToken = dataAlreadyAccepted.ExpectedSequenceToken
_, err = c.cloudWatchLogs.PutLogEvents(ctx, input)
if err != nil {
return err
}
} else if errors.As(err, &invalidSequenceToken) {
input.SequenceToken = invalidSequenceToken.ExpectedSequenceToken
_, err = c.cloudWatchLogs.PutLogEvents(ctx, input)
if err != nil {
return err
}
}
return err
}
c.nextSequenceToken = output.NextSequenceToken
return nil
}
// createLogStream creates a log stream in CloudWatch.
func (c *CloudWatchLogger) createLogStream(ctx context.Context) error {
input := &cloudwatchlogs.CreateLogStreamInput{
LogGroupName: c.logGroupName,
LogStreamName: aws.String(time.Now().Format("2006-01-02")),
}
_, err := c.cloudWatchLogs.CreateLogStream(ctx, input)
return err
}
// checkLogGroup checks if the log group exists in CloudWatch.
// If it doesn't it will be created.
func (c *CloudWatchLogger) checkLogGroup(ctx context.Context) error {
logGroupExists, err := c.logGroupExists(ctx)
if err != nil {
return err
}
if logGroupExists {
return nil
}
return c.createLogGroup(ctx)
}
// checkLogStream checks if the log stream exists in CloudWatch.
// If it doesn't it will be created.
func (c *CloudWatchLogger) checkLogStream(ctx context.Context) error {
logStreamExists, err := c.logStreamExists(ctx)
if err != nil {
return err
}
if logStreamExists {
return nil
}
return c.createLogStream(ctx)
}
// logGroupExists checks if the log group exists in CloudWatch.
func (c *CloudWatchLogger) logGroupExists(ctx context.Context) (bool, error) {
input := &cloudwatchlogs.DescribeLogGroupsInput{
LogGroupNamePrefix: c.logGroupName,
}
output, err := c.cloudWatchLogs.DescribeLogGroups(ctx, input)
if err != nil {
return false, err
}
if output.LogGroups != nil {
for _, logGroup := range output.LogGroups {
if *logGroup.LogGroupName == *c.logGroupName {
return true, nil
}
}
}
return false, nil
}
// logStreamExists checks if the log stream exists in CloudWatch.
func (c *CloudWatchLogger) logStreamExists(ctx context.Context) (bool, error) {
input := &cloudwatchlogs.DescribeLogStreamsInput{
LogGroupName: c.logGroupName,
}
output, err := c.cloudWatchLogs.DescribeLogStreams(ctx, input)
if err != nil {
return false, nil
}
if output.LogStreams != nil {
for _, logStream := range output.LogStreams {
if *logStream.LogStreamName == time.Now().Format("2006-01-02") {
return true, nil
}
}
}
return false, nil
}