Skip to content

Commit

Permalink
kafka settings refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
dmytro-landiak committed Dec 31, 2024
1 parent a6522b6 commit ca78090
Show file tree
Hide file tree
Showing 16 changed files with 75 additions and 46 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.mqtt.broker.queue.kafka.settings;

import org.springframework.beans.factory.annotation.Value;

public abstract class AbstractKafkaSettings implements KafkaSettings {

@Value("${queue.kafka.kafka-prefix:}")
protected String kafkaPrefix;

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
@Data
@Component
@ConfigurationProperties(prefix = "queue.kafka.application-persisted-msg")
public class ApplicationPersistenceMsgKafkaSettings {
public class ApplicationPersistenceMsgKafkaSettings extends AbstractKafkaSettings {

private String topicProperties;
private String additionalProducerConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,20 @@
package org.thingsboard.mqtt.broker.queue.kafka.settings;

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "queue.kafka.application-removed-event")
public class ApplicationRemovedEventKafkaSettings {
public class ApplicationRemovedEventKafkaSettings extends AbstractKafkaSettings {

private String topic;
private String topicProperties;
private String additionalProducerConfig;
private String additionalConsumerConfig;

@Value("${queue.kafka.kafka-prefix:}")
private String kafkaPrefix;

@Override
public String getKafkaTopic() {
return kafkaPrefix + topic;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
@Data
@Component
@ConfigurationProperties(prefix = "queue.kafka.application-persisted-msg.shared-topic")
public class ApplicationSharedTopicMsgKafkaSettings {
public class ApplicationSharedTopicMsgKafkaSettings extends AbstractKafkaSettings {

private String topicProperties;
private String additionalConsumerConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
@Data
@Component
@ConfigurationProperties(prefix = "queue.kafka.basic-downlink-msg")
public class BasicDownLinkPublishMsgKafkaSettings {
public class BasicDownLinkPublishMsgKafkaSettings extends AbstractKafkaSettings {

private String topicPrefix;
private String topicProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,20 @@
package org.thingsboard.mqtt.broker.queue.kafka.settings;

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "queue.kafka.client-session-event")
public class ClientSessionEventKafkaSettings {
public class ClientSessionEventKafkaSettings extends AbstractKafkaSettings {

private String topic;
private String topicProperties;
private String additionalProducerConfig;
private String additionalConsumerConfig;

@Value("${queue.kafka.kafka-prefix:}")
private String kafkaPrefix;

@Override
public String getKafkaTopic() {
return kafkaPrefix + topic;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,20 @@
package org.thingsboard.mqtt.broker.queue.kafka.settings;

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "queue.kafka.client-session-event-response")
public class ClientSessionEventResponseKafkaSettings {
public class ClientSessionEventResponseKafkaSettings extends AbstractKafkaSettings {

private String topicPrefix;
private String topicProperties;
private String additionalProducerConfig;
private String additionalConsumerConfig;

@Value("${queue.kafka.kafka-prefix:}")
private String kafkaPrefix;

@Override
public String getKafkaTopicPrefix() {
return kafkaPrefix + topicPrefix;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,20 @@
package org.thingsboard.mqtt.broker.queue.kafka.settings;

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "queue.kafka.client-session")
public class ClientSessionKafkaSettings {
public class ClientSessionKafkaSettings extends AbstractKafkaSettings {

private String topic;
private String topicProperties;
private String additionalProducerConfig;
private String additionalConsumerConfig;

@Value("${queue.kafka.kafka-prefix:}")
private String kafkaPrefix;

@Override
public String getKafkaTopic() {
return kafkaPrefix + topic;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,20 @@
package org.thingsboard.mqtt.broker.queue.kafka.settings;

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "queue.kafka.client-subscriptions")
public class ClientSubscriptionsKafkaSettings {
public class ClientSubscriptionsKafkaSettings extends AbstractKafkaSettings {

private String topic;
private String topicProperties;
private String additionalProducerConfig;
private String additionalConsumerConfig;

@Value("${queue.kafka.kafka-prefix:}")
private String kafkaPrefix;

@Override
public String getKafkaTopic() {
return kafkaPrefix + topic;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,20 @@
package org.thingsboard.mqtt.broker.queue.kafka.settings;

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "queue.kafka.device-persisted-msg")
public class DevicePersistenceMsgKafkaSettings {
public class DevicePersistenceMsgKafkaSettings extends AbstractKafkaSettings {

private String topic;
private String topicProperties;
private String additionalProducerConfig;
private String additionalConsumerConfig;

@Value("${queue.kafka.kafka-prefix:}")
private String kafkaPrefix;

@Override
public String getKafkaTopic() {
return kafkaPrefix + topic;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
@Data
@Component
@ConfigurationProperties(prefix = "queue.kafka.disconnect-client-command")
public class DisconnectClientCommandKafkaSettings {
public class DisconnectClientCommandKafkaSettings extends AbstractKafkaSettings {

private String topicPrefix;
private String topicProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
@Data
@Component
@ConfigurationProperties(prefix = "queue.kafka.historical-data-total")
public class HistoricalDataTotalKafkaSettings {
public class HistoricalDataTotalKafkaSettings extends AbstractKafkaSettings {

private String topic;
private String topicProperties;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.mqtt.broker.queue.kafka.settings;

public interface KafkaSettings {

default String getKafkaTopic() {
return null;
}

default String getKafkaTopicPrefix() {
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
@Data
@Component
@ConfigurationProperties(prefix = "queue.kafka.persisted-downlink-msg")
public class PersistentDownLinkPublishMsgKafkaSettings {
public class PersistentDownLinkPublishMsgKafkaSettings extends AbstractKafkaSettings {

private String topicPrefix;
private String topicProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,20 @@
package org.thingsboard.mqtt.broker.queue.kafka.settings;

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "queue.kafka.msg-all")
public class PublishMsgKafkaSettings {
public class PublishMsgKafkaSettings extends AbstractKafkaSettings {

private String topic;
private String topicProperties;
private String additionalProducerConfig;
private String additionalConsumerConfig;

@Value("${queue.kafka.kafka-prefix:}")
private String kafkaPrefix;

@Override
public String getKafkaTopic() {
return kafkaPrefix + topic;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,20 @@
package org.thingsboard.mqtt.broker.queue.kafka.settings;

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "queue.kafka.retained-msg")
public class RetainedMsgKafkaSettings {
public class RetainedMsgKafkaSettings extends AbstractKafkaSettings {

private String topic;
private String topicProperties;
private String additionalProducerConfig;
private String additionalConsumerConfig;

@Value("${queue.kafka.kafka-prefix:}")
private String kafkaPrefix;

@Override
public String getKafkaTopic() {
return kafkaPrefix + topic;
}
Expand Down

0 comments on commit ca78090

Please sign in to comment.