-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkafka_topic_creator.py
34 lines (27 loc) · 1.66 KB
/
kafka_topic_creator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import config
from confluent_kafka.admin import AdminClient, NewTopic
def create_topics():
    """Create the Kafka topics named in ``config`` that do not exist yet.

    Connects to the brokers at ``config.bootstrap_servers``, reads the
    cluster's topic metadata and, for each of
    ``config.links_to_be_processed_topic`` and
    ``config.processed_links_topic`` that is absent from it, requests
    creation with the partition count and replication factor configured
    for that topic in ``config``.

    Progress and the outcome for each topic are printed to stdout. A topic
    whose creation fails is reported and skipped rather than raised.
    Returns None.
    """
    client = AdminClient({'bootstrap.servers': config.bootstrap_servers})
    # NOTE(review): list_topics() is called without a timeout - confirm
    # whether blocking here is acceptable when no broker is reachable.
    existing_topics = client.list_topics().topics

    # One (name, num_partitions, replication_factor) entry per required
    # topic; add a line here to manage another topic.
    required_topics = (
        (
            config.links_to_be_processed_topic,
            config.links_to_be_processed_topic_num_partitions,
            config.links_to_be_processed_topic_replication_factor,
        ),
        (
            config.processed_links_topic,
            config.processed_links_topic_num_partitions,
            config.processed_links_topic_replication_factor,
        ),
    )

    topics_to_be_created_list = []
    for name, num_partitions, replication_factor in required_topics:
        if existing_topics.get(name) is not None:
            continue  # already present in the cluster metadata
        print("creating " + name)
        topics_to_be_created_list.append(
            NewTopic(
                topic=name,
                num_partitions=num_partitions,
                replication_factor=replication_factor,
            )
        )

    if not topics_to_be_created_list:
        return

    # create_topics() is asynchronous: it returns a dict of <topic, future>.
    futures_by_topic = client.create_topics(topics_to_be_created_list)

    # Wait for each operation to finish and report its outcome.
    for topic, future in futures_by_topic.items():
        try:
            future.result()  # The result itself is None
        except Exception as e:
            # Best-effort: report the failure and continue with the rest.
            print("Failed to create topic {}: {}".format(topic, e))
        else:
            print("Topic {} created".format(topic))