Skip to content

Commit

Permalink
additional minor refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
dmytro-landiak committed Dec 30, 2024
1 parent 693db65 commit a0542d7
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.thingsboard.mqtt.broker.queue.provider;

import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
Expand All @@ -27,6 +28,7 @@
import org.thingsboard.mqtt.broker.queue.kafka.settings.ApplicationRemovedEventKafkaSettings;
import org.thingsboard.mqtt.broker.queue.util.QueueUtil;

import java.util.Map;
import java.util.Properties;

@Slf4j
Expand All @@ -36,13 +38,20 @@ public class KafkaApplicationRemovedEventQueueFactory extends AbstractQueueFacto

private final ApplicationRemovedEventKafkaSettings kafkaSettings;

private Map<String, String> topicConfigs;

@PostConstruct
public void init() {
this.topicConfigs = QueueUtil.getConfigs(kafkaSettings.getTopicProperties());
}

@Override
public TbQueueProducer<TbProtoQueueMsg<QueueProtos.ApplicationRemovedEventProto>> createEventProducer(String serviceId) {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<QueueProtos.ApplicationRemovedEventProto>> producerBuilder = TbKafkaProducerTemplate.builder();
producerBuilder.properties(producerSettings.toProps(kafkaSettings.getAdditionalProducerConfig()));
producerBuilder.clientId(kafkaPrefix + "application-removed-event-producer-" + serviceId);
producerBuilder.defaultTopic(kafkaSettings.getKafkaTopic());
producerBuilder.topicConfigs(QueueUtil.getConfigs(kafkaSettings.getTopicProperties()));
producerBuilder.topicConfigs(topicConfigs);
producerBuilder.admin(queueAdmin);
return producerBuilder.build();
}
Expand All @@ -54,7 +63,7 @@ public TbQueueControlledOffsetConsumer<TbProtoQueueMsg<QueueProtos.ApplicationRe
QueueUtil.overrideProperties("ApplicationRemovedEventQueue-" + serviceId, props, requiredConsumerProperties);
consumerBuilder.properties(props);
consumerBuilder.topic(kafkaSettings.getKafkaTopic());
consumerBuilder.topicConfigs(QueueUtil.getConfigs(kafkaSettings.getTopicProperties()));
consumerBuilder.topicConfigs(topicConfigs);
consumerBuilder.clientId(kafkaPrefix + "application-removed-event-consumer-" + serviceId);
consumerBuilder.groupId(kafkaPrefix + "application-removed-event-consumer-group");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), QueueProtos.ApplicationRemovedEventProto.parseFrom(msg.getData()), msg.getHeaders(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.thingsboard.mqtt.broker.queue.provider;

import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
Expand All @@ -28,6 +29,8 @@
import org.thingsboard.mqtt.broker.queue.kafka.settings.ClientSessionEventResponseKafkaSettings;
import org.thingsboard.mqtt.broker.queue.util.QueueUtil;

import java.util.Map;

@Slf4j
@Component
@RequiredArgsConstructor
Expand All @@ -36,13 +39,20 @@ public class KafkaClientSessionEventQueueFactory extends AbstractQueueFactory im
private final ClientSessionEventKafkaSettings clientSessionEventSettings;
private final ClientSessionEventResponseKafkaSettings clientSessionEventResponseSettings;

private Map<String, String> clientSessionEventTopicConfigs;

@PostConstruct
public void init() {
this.clientSessionEventTopicConfigs = QueueUtil.getConfigs(clientSessionEventSettings.getTopicProperties());
}

@Override
public TbQueueProducer<TbProtoQueueMsg<QueueProtos.ClientSessionEventProto>> createEventProducer(String serviceId) {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<QueueProtos.ClientSessionEventProto>> producerBuilder = TbKafkaProducerTemplate.builder();
producerBuilder.properties(producerSettings.toProps(clientSessionEventSettings.getAdditionalProducerConfig()));
producerBuilder.clientId(kafkaPrefix + "client-session-event-producer-" + serviceId);
producerBuilder.defaultTopic(clientSessionEventSettings.getKafkaTopic());
producerBuilder.topicConfigs(QueueUtil.getConfigs(clientSessionEventSettings.getTopicProperties()));
producerBuilder.topicConfigs(clientSessionEventTopicConfigs);
producerBuilder.admin(queueAdmin);
producerBuilder.statsManager(producerStatsManager);
return producerBuilder.build();
Expand All @@ -53,7 +63,7 @@ public TbQueueControlledOffsetConsumer<TbProtoQueueMsg<QueueProtos.ClientSession
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<QueueProtos.ClientSessionEventProto>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.properties(consumerSettings.toProps(clientSessionEventSettings.getKafkaTopic(), clientSessionEventSettings.getAdditionalConsumerConfig()));
consumerBuilder.topic(clientSessionEventSettings.getKafkaTopic());
consumerBuilder.topicConfigs(QueueUtil.getConfigs(clientSessionEventSettings.getTopicProperties()));
consumerBuilder.topicConfigs(clientSessionEventTopicConfigs);
consumerBuilder.clientId(kafkaPrefix + "client-session-event-consumer-" + consumerName);
consumerBuilder.groupId(kafkaPrefix + "client-session-event-consumer-group");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), QueueProtos.ClientSessionEventProto.parseFrom(msg.getData()), msg.getHeaders(),
Expand Down

0 comments on commit a0542d7

Please sign in to comment.