diff --git a/aether-producer/producer/__init__.py b/aether-producer/producer/__init__.py index 56410e525..22f8eb999 100644 --- a/aether-producer/producer/__init__.py +++ b/aether-producer/producer/__init__.py @@ -496,10 +496,12 @@ def create_topic(self): self.logger.debug(f'Trying to create topic {self.topic_name}') kadmin = self.context.kafka_admin_client topic_config = self.context.settings.get('kafka_settings', {}).get('default.topic.config') + partitions = int(self.context.settings.get('kafka_default_topic_partitions', 1)) + replicas = int(self.context.settings.get('kafka_default_topic_replicas', 1)) topic = NewTopic( self.topic_name, - num_partitions=1, - replication_factor=1, + num_partitions=partitions, + replication_factor=replicas, config=topic_config ) fs = kadmin.create_topics([topic]) diff --git a/aether-producer/producer/settings.json b/aether-producer/producer/settings.json index 0252dfe8a..5e24ab374 100644 --- a/aether-producer/producer/settings.json +++ b/aether-producer/producer/settings.json @@ -16,6 +16,8 @@ "kernel_admin_password": "", "kafka_failure_wait_time": 10, "kafka_url": "kafka:29092", + "kafka_default_topic_partitions": 1, + "kafka_default_topic_replicas": 3, "kafka_settings": { "default.topic.config": { "retention.ms": -1 diff --git a/aether-producer/tests/conf/producer.json b/aether-producer/tests/conf/producer.json index 9a4977e4c..c67e8d866 100644 --- a/aether-producer/tests/conf/producer.json +++ b/aether-producer/tests/conf/producer.json @@ -16,6 +16,8 @@ "kernel_admin_password": "", "kafka_failure_wait_time": 4, "kafka_url": "kafka-test:29092", + "kafka_default_topic_partitions": 1, + "kafka_default_topic_replicas": 1, "kafka_settings": { "default.topic.config": { "retention.ms": -1