-
Notifications
You must be signed in to change notification settings - Fork 0
/
cloudwatch_logs_reader.go
138 lines (119 loc) · 3.29 KB
/
cloudwatch_logs_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package main
import (
"fmt"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
"github.com/hashicorp/golang-lru"
)
const (
	// maxEventsBuffer is the capacity of the event channel returned by Stream.
	maxEventsBuffer = 10000
	// maxEventsCache is the number of event IDs kept in the LRU cache that
	// startStream uses to avoid emitting the same event twice.
	maxEventsCache = 100000
	// watchSleepTime is the pause between two polls when watch is enabled.
	// It is handed directly to time.Sleep, so it must carry a unit: a bare 1
	// is one nanosecond, which turns the watch loop into a busy loop.
	watchSleepTime = 1 * time.Second
)
// watchStart is the (negative) offset added to time.Now() to compute the
// StartTime of each poll in watch mode: every poll looks back three minutes.
var watchStart = -3 * time.Minute
// CloudwatchLogsReader is responsible for fetching logs for a particular log
// group. It is built by NewCloudwatchLogsReader and delivers events through
// the channel returned by Stream.
type CloudwatchLogsReader struct {
// svc is the CloudWatch Logs client used to issue FilterLogEvents requests.
svc *cloudwatchlogs.CloudWatchLogs
// config is the AWS configuration; it is also passed to ListStreams.
config aws.Config
// logGroupName is the log group whose events are read.
logGroupName string
// start is the beginning of the time window of the initial query.
start time.Time
// end is the end of the time window of the initial query.
end time.Time
// filter is sent as the FilterPattern of the query when it is not empty.
filter string
// streamPrefix is passed to ListStreams to select the log streams to read.
streamPrefix string
// eventCache holds the IDs of events already sent, so that an event is not
// emitted twice.
eventCache *lru.Cache
}
// NewCloudwatchLogsReader takes a group and optionally a stream prefix, a
// filter pattern, a start and an end time, and returns a reader for any logs
// that match those parameters.
//
// An error is returned when groupExists reports false for the group (which
// also happens when the lookup request itself fails) or when the event-ID
// cache cannot be created.
func NewCloudwatchLogsReader(config aws.Config,
	group, streamPrefix, filter string,
	start time.Time, end time.Time) (*CloudwatchLogsReader, error) {

	// Validate the group first so nothing is built for a group we cannot read.
	if !groupExists(config, group) {
		return nil, fmt.Errorf("group %s does not exist", group)
	}

	// The cache remembers event IDs so the stream can skip duplicates.
	cache, err := lru.New(maxEventsCache)
	if err != nil {
		return nil, err
	}

	return &CloudwatchLogsReader{
		config:       config,
		svc:          cloudwatchlogs.New(config),
		logGroupName: group,
		start:        start,
		end:          end,
		filter:       filter,
		streamPrefix: streamPrefix,
		eventCache:   cache,
	}, nil
}
// Stream starts reading the log events in a background goroutine and returns
// the buffered channel on which they are delivered. The watch flag is
// forwarded to startStream, which closes the channel once it stops reading.
// The returned error is always nil.
func (reader *CloudwatchLogsReader) Stream(watch bool) (chan Event, error) {
	events := make(chan Event, maxEventsBuffer)

	// The producer runs on its own goroutine; the caller only consumes.
	go reader.startStream(events, watch)

	return events, nil
}
// startStream lists the log streams of the group, queries their events with
// FilterLogEvents and sends every event not seen before on stream.
//
// When watch is false the query runs once over [reader.start, reader.end].
// When watch is true the function keeps polling: after each pass it sleeps
// for watchSleepTime and queries again over the window
// [now+watchStart, now]. Events are de-duplicated through reader.eventCache,
// which is what makes the overlapping watch windows safe.
//
// Errors are printed to standard output. stream is closed when the function
// returns; in watch mode it does not return after the stream listing has
// succeeded.
func (reader *CloudwatchLogsReader) startStream(stream chan Event, watch bool) {
	// FilterLogEventsInput can not use more than 100 streams.
	const maxFilterStreams = 100

	// Single close for every exit path, so consumers always see the end.
	defer close(stream)

	ss, err := ListStreams(reader.config,
		reader.logGroupName, reader.streamPrefix,
		reader.start, reader.end)
	if err != nil {
		fmt.Println(err)
		return
	}

	// Keep the first maxFilterStreams streams (the old ss[0:99] kept only 99).
	if len(ss) > maxFilterStreams {
		ss = ss[:maxFilterStreams]
	}

	params := &cloudwatchlogs.FilterLogEventsInput{
		StartTime:      aws.Int64(aws.TimeUnixMilli(reader.start)),
		EndTime:        aws.Int64(aws.TimeUnixMilli(reader.end)),
		LogGroupName:   aws.String(reader.logGroupName),
		LogStreamNames: ss,
		Interleaved:    aws.Bool(true),
	}
	if reader.filter != "" {
		params.FilterPattern = aws.String(reader.filter)
	}

	for {
		req := reader.svc.FilterLogEventsRequest(params)
		p := req.Paginate()
		for p.Next() {
			page := p.CurrentPage()
			for _, e := range page.Events {
				// Peek does not refresh the entry; an event is sent only the
				// first time its ID is seen, then recorded.
				if _, ok := reader.eventCache.Peek(*e.EventId); !ok {
					stream <- fromFilteredLogEvent(reader.logGroupName, e)
					reader.eventCache.Add(*e.EventId, nil)
				}
			}
		}
		// A failed pass is reported but does not end a watch.
		if err := p.Err(); err != nil {
			fmt.Println(err)
		}

		if !watch {
			return
		}

		time.Sleep(watchSleepTime)

		// Read the clock once so both ends of the window are consistent.
		now := time.Now()
		params.StartTime = aws.Int64(aws.TimeUnixMilli(now.Add(watchStart)))
		params.EndTime = aws.Int64(aws.TimeUnixMilli(now))
	}
}
// groupExists reports whether a log group named exactly group is returned by
// a DescribeLogGroups request.
//
// The request filters by name prefix, so the returned groups are compared
// against group: a group that merely starts with the same characters does
// not count. Only the groups of the first response are inspected.
//
// It returns false when the request fails, so a false result does not
// distinguish a missing group from a failed lookup.
func groupExists(config aws.Config, group string) bool {
	svc := cloudwatchlogs.New(config)
	req := svc.DescribeLogGroupsRequest(&cloudwatchlogs.DescribeLogGroupsInput{
		LogGroupNamePrefix: aws.String(group),
	})

	resp, err := req.Send()
	if err != nil {
		return false
	}

	for _, g := range resp.LogGroups {
		if g.LogGroupName != nil && *g.LogGroupName == group {
			return true
		}
	}
	return false
}