-
Notifications
You must be signed in to change notification settings - Fork 0
/
common.py
120 lines (84 loc) · 2.68 KB
/
common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""Dynamodb stream processor
Process dynamodb streams filtering for inserts and modify payloads
"""
import json
import math
import time
import boto3
try:
import unzip_requirements
except ImportError:
pass
from dynamodb_json import json_util as ddbjson
# Name of the Firehose delivery stream that firehose_writer sends batches to.
delivery_stream_name = "core-dynamodb-v1"
# Shared Firehose client, created once when the module is imported and pinned
# to the us-west-2 region.
firehose_client = boto3.client("firehose", region_name="us-west-2")
def compose(f, g):
    """Build the composition of two single-argument callables.

    The returned callable applies `g` first and hands its result to `f`,
    so ``compose(f, g)(x)`` evaluates to ``f(g(x))``.
    """
    def composed(x):
        inner_result = g(x)
        return f(inner_result)

    return composed
def newImage(record, event_name):
    """Return the payload of interest as a JSON string

    function does:
    1- picks the item image out of ``record["dynamodb"]``: `NewImage` (a
       dynamodb term for the actual user-data) when present, otherwise
       `OldImage`, otherwise `Keys`.  REMOVE stream records carry no
       `NewImage`, so without the fallback every delete raised ``KeyError``.
    2- adds `timestamp` (current wall-clock time in whole milliseconds) and
       `dynamodbEvent` (the given `event_name`) to a *copy* of that image,
       so the caller's record is left unmodified.
    3- serialises the result with ``json.dumps``.

    Raises:
        KeyError: if ``record`` has no "dynamodb" entry, or that entry holds
            none of `NewImage`, `OldImage` or `Keys`.
    """
    stream_data = record["dynamodb"]
    for image_key in ("NewImage", "OldImage", "Keys"):
        if image_key in stream_data:
            image = stream_data[image_key]
            break
    else:
        # Same exception type and key the plain lookup used to raise.
        raise KeyError("NewImage")

    # Shallow copy: only top-level keys are added, so the input stays intact.
    payload = dict(image)
    payload["timestamp"] = math.floor(time.time() * 1000)
    payload["dynamodbEvent"] = event_name
    return json.dumps(payload)
def generate(event):
    """ record generator
    Lazily walks ``event["Records"]`` and yields each entry after passing it
    through ``ddbjson.loads`` (per the original author's note, this strips
    the dynamodb `type-tags`).
    """
    yield from (ddbjson.loads(raw_record) for raw_record in event["Records"])
def inserted(record):
    """ process dynamo insert events
    Serialises the record through newImage tagged as 'INSERT' and returns
    that string terminated by a newline.
    """
    serialized = newImage(record, "INSERT")
    return "{}\n".format(serialized)
def modified(record):
    """ process dynamo modify events
    Serialises the record through newImage tagged as 'MODIFY' and returns
    that string terminated by a newline.
    """
    serialized = newImage(record, "MODIFY")
    return "{}\n".format(serialized)
def removed(record):
    """ process dynamo remove events
    Serialises the record through newImage tagged as 'REMOVE' and returns
    that string terminated by a newline.

    NOTE(review): REMOVE stream records typically carry no `NewImage`;
    confirm newImage copes with that.
    """
    serialized = newImage(record, "REMOVE")
    return "{}\n".format(serialized)
def event_router(dynamo_event):
    """ event router: maps an event name to its processing function
    'INSERT' -> inserted, 'MODIFY' -> modified, 'REMOVE' -> removed.
    Any other event name raises KeyError.
    """
    handlers = dict(INSERT=inserted, MODIFY=modified, REMOVE=removed)
    handler = handlers[dynamo_event]
    return handler
def as_kcl_record(payload):
    """ string to kinesis record
    Wraps the payload under "Data" and uses the stringified builtin hash of
    the payload as "PartitionKey".

    NOTE(review): Python salts str hashes per process by default, so the
    partition key for a given payload is not stable across processes.
    """
    partition_key = str(hash(payload))
    return dict(Data=payload, PartitionKey=partition_key)
def as_firehose_record(payload):
    """ string to firehose record
    Encodes the payload string to bytes (str.encode default, UTF-8) and
    wraps it under the "Data" key.
    """
    encoded = payload.encode()
    return dict(Data=encoded)
def ddb_to_firehose(record):
    """process dynamodb stream record to firehose record
    Looks up the processing function for ``record["eventName"]`` via
    event_router, runs it on the record and wraps the resulting string
    with as_firehose_record.
    """
    handler = event_router(dynamo_event=record["eventName"])
    serialized = handler(record)
    return as_firehose_record(serialized)
def dispatcher(event):
    """ dispatches the dynamodb events to the corresponding processor
    Returns a list with one ddb_to_firehose result per record produced by
    generate(event), in the same order.
    """
    return [ddb_to_firehose(stream_record) for stream_record in generate(event)]
def firehose_writer(records):
    """ writes to aws firehose

    Sends `records` to the `delivery_stream_name` delivery stream through
    ``firehose_client.put_record_batch``.

    - An empty `records` makes no API call (PutRecordBatch rejects an empty
      batch).
    - Records are sent in chunks of at most 500, the documented per-request
      record limit of PutRecordBatch.
    - PutRecordBatch can partially fail while still returning normally;
      entries reported as failed are re-sent a few times, with a short
      backoff, before giving up.

    NOTE(review): the per-request total size limit of PutRecordBatch is not
    checked here - confirm payload sizes make that unnecessary.

    Raises:
        RuntimeError: if some records are still rejected after all attempts.
    """
    max_batch_size = 500
    max_attempts = 3

    all_records = list(records)
    for start in range(0, len(all_records), max_batch_size):
        pending = all_records[start:start + max_batch_size]
        for attempt in range(max_attempts):
            response = firehose_client.put_record_batch(
                DeliveryStreamName=delivery_stream_name, Records=pending
            )
            if not response.get("FailedPutCount"):
                pending = []
                break
            # RequestResponses is index-aligned with the submitted records;
            # entries that failed carry an ErrorCode.
            outcomes = response.get("RequestResponses", [])
            pending = [
                record
                for record, outcome in zip(pending, outcomes)
                if "ErrorCode" in outcome
            ]
            if not pending:
                break
            if attempt + 1 < max_attempts:
                # Brief exponential backoff before re-sending the failures.
                time.sleep(0.1 * 2 ** attempt)
        if pending:
            raise RuntimeError(
                f"firehose put_record_batch failed for {len(pending)} record(s) "
                f"after {max_attempts} attempts"
            )
def processor():
    """Build the batch-processing pipeline for dynamodb records
    The returned callable takes an event, runs it through dispatcher and
    passes the resulting records to firehose_writer.
    """
    return compose(firehose_writer, dispatcher)