-
Notifications
You must be signed in to change notification settings - Fork 1
/
MysqlAsSink.py
34 lines (26 loc) · 1.04 KB
/
MysqlAsSink.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from kafka import KafkaConsumer
import json
import msgpack
import pymysql.cursors
# Kafka -> MySQL sink: read '|'-delimited records from the 'test' topic and
# insert each one as a row of the `emp1` table in the `datasource` database.

INSERT_SQL = "insert into emp1 values(%s, %s, %s)"


def _to_row(fields):
    """Convert the split message fields into the parameter tuple for the insert.

    Double quotes are stripped from the first and third fields; the first
    field is converted to ``int``. Fields beyond the third are ignored.

    Raises:
        IndexError: fewer than three fields were supplied.
        ValueError: the first field is not an integer literal.
    """
    return (
        int(fields[0].replace('"', "")),
        fields[1],
        fields[2].replace('"', ""),
    )


# Consume latest messages; offsets are auto-committed by the consumer.
# No consumer_timeout_ms is set, so iteration blocks until the process is
# interrupted. To stop once the topic goes idle, pass consumer_timeout_ms=...
# to THIS constructor (a separate KafkaConsumer instance has no effect here).
consumer = KafkaConsumer('test',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
try:
    connection = pymysql.connect(host='127.0.0.1',
                                 user='root',
                                 password='root',
                                 db='datasource',
                                 charset='utf8mb4',
                                 cursorclass=pymysql.cursors.DictCursor)
    try:
        with connection.cursor() as cursor:
            for message in consumer:
                try:
                    # Decode once at the I/O boundary and reuse the text.
                    text = message.value.decode('utf-8')
                    fields = text.split('|')
                    print(text)
                    print(fields)
                    row = _to_row(fields)
                except (IndexError, ValueError) as exc:
                    # UnicodeDecodeError is a ValueError subclass, so bad
                    # bytes land here too. Skip the bad record instead of
                    # letting one malformed message take the whole sink down.
                    print('skipping malformed message %r: %s'
                          % (message.value, exc))
                    continue
                cursor.execute(INSERT_SQL, row)
                # Commit per message so every consumed record is durable.
                connection.commit()
    finally:
        # Always release the DB connection, including on Ctrl-C or DB errors.
        connection.close()
finally:
    consumer.close()