-
Notifications
You must be signed in to change notification settings - Fork 15
/
kinesis.py
55 lines (40 loc) · 1.65 KB
/
kinesis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import json
import boto3
import csv
def lambda_handler(event, context):
    """Convert a CSV file uploaded to S3 into JSON and push it to Kinesis.

    Triggered by an S3 put-event notification. Only the first record of the
    event is processed (one object per invocation).

    Parameters
    ----------
    event : dict
        S3 event notification payload; ``event["Records"][0]`` carries the
        source bucket name and object key of the uploaded CSV.
    context : LambdaContext
        Lambda runtime context (unused).

    Returns
    -------
    None
    """
    if not event:
        return
    # Capture event details: bucket and key of the uploaded CSV object.
    event_record = event["Records"][0]
    sourcebucket = event_record['s3']['bucket']['name']
    csvkey = event_record['s3']['object']['key']

    # Read the CSV object's content as text.
    s3 = boto3.resource('s3')
    csv_text = s3.Object(sourcebucket, csvkey).get()['Body'].read().decode('utf-8')

    # Parse rows with the csv module (handles quoting and embedded commas
    # correctly, unlike a naive split on ','). Expected columns: Id, Team,
    # City. Blank lines are skipped.
    # NOTE: the previous hand-built JSON string was malformed (missing and
    # doubled quotes around field values); json.dumps guarantees valid JSON.
    rows = [
        {'Id': row[0], 'Team': row[1], 'City': row[2]}
        for row in csv.reader(csv_text.splitlines())
        if row
    ]
    json_formatted = json.dumps(rows)

    # Push the converted records to the Kinesis data stream.
    kinesis = boto3.client('kinesis', region_name='us-east-2')
    stream_name = 'my-demo-stream'
    payload = {
        'Data': json_formatted
    }
    kinesis.put_record(StreamName=stream_name,
                       Data=json.dumps(payload),
                       PartitionKey='partition')
    print('Success')
    return
# s3 (source bucket) -> Lambda -> Kinesis (Data Stream) -> Kinesis (Firehose) -> s3 (data lake)