-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathevent_bridge.go
114 lines (88 loc) · 2.79 KB
/
event_bridge.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package bus
import (
"context"
"log"
"math"
"github.com/aws-samples/serverless-go-demo/types"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchevents"
cloudwatchtypes "github.com/aws/aws-sdk-go-v2/service/cloudwatchevents/types"
)
// EventBridgeBus sends events through a CloudWatch Events (EventBridge) API
// client to a single, named event bus.
type EventBridgeBus struct {
// client is the CloudWatch Events API client used to issue PutEvents calls.
client *cloudwatchevents.Client
// busName is the event bus name attached to every entry that is sent.
busName string
}
// Compile-time assertion that *EventBridgeBus satisfies the types.Bus interface.
var _ types.Bus = (*EventBridgeBus)(nil)
// NewEventBridgeBus builds an EventBridgeBus that targets the event bus named
// busName.
//
// The AWS SDK configuration is resolved with config.LoadDefaultConfig using
// ctx. If that fails the error is logged with log.Fatalf, which terminates the
// process, so this function only ever returns a usable bus.
func NewEventBridgeBus(ctx context.Context, busName string) *EventBridgeBus {
	sdkConfig, loadErr := config.LoadDefaultConfig(ctx)
	if loadErr != nil {
		log.Fatalf("unable to load SDK config, %v", loadErr)
	}

	bus := new(EventBridgeBus)
	bus.client = cloudwatchevents.NewFromConfig(sdkConfig)
	bus.busName = busName
	return bus
}
// Put sends events to the configured event bus and reports the ones that the
// service did not accept.
//
// Events are sent in consecutive PutEvents requests of at most ten entries
// each. For every request, response entries without an EventId are treated as
// failures and returned as types.FailedEvent values carrying the original
// event together with the error code and message from the response.
//
// If a PutEvents call itself returns an error, sending stops: the failures
// collected from earlier requests are returned together with that error and
// the remaining events are not attempted.
func (e *EventBridgeBus) Put(ctx context.Context, events []types.Event) ([]types.FailedEvent, error) {
	// Number of entries sent per PutEvents request.
	// NOTE(review): 10 matches the per-request entry limit in the PutEvents
	// API reference; confirm if the service limit changes.
	const maxEntriesPerRequest = 10

	return batchEvents(events, maxEntriesPerRequest, func(batch []types.Event) ([]types.FailedEvent, error) {
		entries := make([]cloudwatchtypes.PutEventsRequestEntry, len(batch))
		for i, event := range batch {
			entries[i] = cloudwatchtypes.PutEventsRequestEntry{
				EventBusName: &e.busName,
				Source:       aws.String(event.Source),
				Detail:       aws.String(event.Detail),
				DetailType:   aws.String(event.DetailType),
				Resources:    event.Resources,
			}
		}

		result, err := e.client.PutEvents(ctx, &cloudwatchevents.PutEventsInput{
			Entries: entries,
		})

		failedEvents := []types.FailedEvent{}
		if err != nil {
			return failedEvents, err
		}

		if result.FailedEntryCount > 0 {
			// Response entries are matched to request entries by index.
			for i, entry := range result.Entries {
				if entry.EventId != nil {
					// An EventId means the entry was accepted.
					continue
				}

				failedEvents = append(failedEvents, types.FailedEvent{
					Event: batch[i],
					// ErrorCode and ErrorMessage are optional pointers;
					// aws.ToString yields "" for nil instead of panicking.
					FailureCode:    aws.ToString(entry.ErrorCode),
					FailureMessage: aws.ToString(entry.ErrorMessage),
				})
			}
		}

		return failedEvents, nil
	})
}
// zeroBatchSizeError is returned by batchEvents when it is asked to split a
// non-empty list of events into batches of size zero.
type zeroBatchSizeError struct{}

// Error implements the error interface.
func (zeroBatchSizeError) Error() string {
	return "batchEvents: maxBatchSize must be greater than zero"
}

// batchEvents splits events into consecutive batches of at most maxBatchSize
// items and calls batchFn once per batch, in order.
//
// The failed events returned by each batchFn call are concatenated and
// returned. If batchFn returns an error, processing stops immediately: the
// failures collected from earlier batches are returned together with that
// error and the remaining batches are not attempted.
//
// An empty events slice results in no calls to batchFn. A maxBatchSize of zero
// with a non-empty events slice returns an error without calling batchFn.
// The returned slice is never nil.
func batchEvents(events []types.Event, maxBatchSize uint, batchFn func([]types.Event) ([]types.FailedEvent, error)) ([]types.FailedEvent, error) {
	batchFailedEvents := []types.FailedEvent{}

	recordsAmount := len(events)
	if recordsAmount == 0 {
		return batchFailedEvents, nil
	}

	// Dividing by a zero batch size would produce +Inf, and converting that to
	// int is implementation-dependent, so reject it explicitly.
	if maxBatchSize == 0 {
		return batchFailedEvents, zeroBatchSizeError{}
	}

	// Clamp the batch size to the number of events so the uint-to-int
	// conversion cannot overflow into a negative slice bound.
	batchSize := recordsAmount
	if maxBatchSize < uint(recordsAmount) {
		batchSize = int(maxBatchSize)
	}

	batchAmount := int(math.Ceil(float64(recordsAmount) / float64(batchSize)))
	for i := 0; i < batchAmount; i++ {
		lowerBound := i * batchSize
		upperBound := lowerBound + batchSize
		if upperBound > recordsAmount {
			// The last batch may be shorter than batchSize.
			upperBound = recordsAmount
		}

		failedEvents, err := batchFn(events[lowerBound:upperBound])
		if err != nil {
			return batchFailedEvents, err
		}
		batchFailedEvents = append(batchFailedEvents, failedEvents...)
	}

	return batchFailedEvents, nil
}