extract checking topic availability from BrokerMessageProducer (#1832)
piotrrzysko authored Mar 25, 2024
1 parent a8829b2 commit d861cf9
Showing 36 changed files with 803 additions and 724 deletions.
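
In short, this commit extracts topic-availability checks from BrokerMessageProducer into a new BrokerTopicAvailabilityChecker interface (shown in full at the end of the diff), and BrokerMessageProducer now extends that interface. Components that only need the checks, such as readiness checking and backup-message loading, can depend on the narrower contract. A minimal sketch of such a consumer (the class below is illustrative, not part of the commit):

package pl.allegro.tech.hermes.frontend.producer;

import pl.allegro.tech.hermes.frontend.metric.CachedTopic;

// Illustrative consumer, not part of the commit: it only asks about topic
// availability, so it can depend on the extracted BrokerTopicAvailabilityChecker
// instead of the full BrokerMessageProducer.
class TopicAvailabilityReporter {

    private final BrokerTopicAvailabilityChecker availabilityChecker;

    TopicAvailabilityReporter(BrokerTopicAvailabilityChecker availabilityChecker) {
        this.availabilityChecker = availabilityChecker;
    }

    boolean report(CachedTopic cachedTopic) {
        boolean available = availabilityChecker.isTopicAvailable(cachedTopic);
        System.out.println(cachedTopic.getTopic().getQualifiedName()
                + (available ? " is available" : " is not available"));
        return available;
    }
}
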
@@ -76,8 +76,6 @@ static HermesServer provideHermesServer() throws IOException {
new NoOpMessagePreviewPersister(),
throughputLimiter,
null,
false,
null,
null);
}

@@ -13,7 +13,12 @@ public void send(Message message, CachedTopic topic, PublishingCallback callback
}

@Override
public boolean isTopicAvailable(CachedTopic topic) {
public boolean areAllTopicsAvailable() {
return true;
}

@Override
public boolean isTopicAvailable(CachedTopic cachedTopic) {
return true;
}
}

BackupMessagesLoader.java
@@ -11,6 +11,7 @@
import pl.allegro.tech.hermes.frontend.listeners.BrokerListeners;
import pl.allegro.tech.hermes.frontend.metric.CachedTopic;
import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer;
import pl.allegro.tech.hermes.frontend.producer.BrokerTopicAvailabilityChecker;
import pl.allegro.tech.hermes.frontend.publishing.PublishingCallback;
import pl.allegro.tech.hermes.frontend.publishing.avro.AvroMessage;
import pl.allegro.tech.hermes.frontend.publishing.message.JsonMessage;
@@ -44,6 +45,7 @@ public class BackupMessagesLoader {

private static final Logger logger = LoggerFactory.getLogger(BackupMessagesLoader.class);

private final BrokerTopicAvailabilityChecker brokerTopicAvailabilityChecker;
private final BrokerMessageProducer brokerMessageProducer;
private final BrokerListeners brokerListeners;
private final TopicsCache topicsCache;
@@ -59,7 +61,8 @@ public class BackupMessagesLoader {
private final Set<Topic> topicsAvailabilityCache = new HashSet<>();
private final AtomicReference<ConcurrentLinkedQueue<Pair<Message, CachedTopic>>> toResend = new AtomicReference<>();

public BackupMessagesLoader(BrokerMessageProducer brokerMessageProducer,
public BackupMessagesLoader(BrokerTopicAvailabilityChecker brokerTopicAvailabilityChecker,
BrokerMessageProducer brokerMessageProducer,
BrokerListeners brokerListeners,
TopicsCache topicsCache,
SchemaRepository schemaRepository,
@@ -68,6 +71,7 @@ public BackupMessagesLoader(BrokerMessageProducer brokerMessageProducer,
BackupMessagesLoaderParameters backupMessagesLoaderParameters,
String datacenter
) {
this.brokerTopicAvailabilityChecker = brokerTopicAvailabilityChecker;
this.brokerMessageProducer = brokerMessageProducer;
this.brokerListeners = brokerListeners;
this.topicsCache = topicsCache;
@@ -235,7 +239,7 @@ private boolean isBrokerTopicAvailable(CachedTopic cachedTopic) {
return true;
}

if (brokerMessageProducer.isTopicAvailable(cachedTopic)) {
if (brokerTopicAvailabilityChecker.isTopicAvailable(cachedTopic)) {
topicsAvailabilityCache.add(cachedTopic.getTopic());
logger.info("Broker topic {} is available.", cachedTopic.getTopic().getQualifiedName());
return true;
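
For context, isBrokerTopicAvailable above caches topics already confirmed as available (topicsAvailabilityCache), so the checker is queried at most once per topic while backup messages are resent. A stripped-down sketch of that caching pattern, with illustrative names rather than the exact Hermes fields:

package pl.allegro.tech.hermes.frontend.producer;

import java.util.HashSet;
import java.util.Set;

import pl.allegro.tech.hermes.frontend.metric.CachedTopic;

// Sketch only: once a topic has been reported available, it is not checked
// again for the rest of the backup load.
class CachedTopicAvailability {

    private final BrokerTopicAvailabilityChecker checker;
    private final Set<String> confirmedAvailable = new HashSet<>();

    CachedTopicAvailability(BrokerTopicAvailabilityChecker checker) {
        this.checker = checker;
    }

    boolean isAvailable(CachedTopic cachedTopic) {
        String qualifiedName = String.valueOf(cachedTopic.getTopic().getQualifiedName());
        if (confirmedAvailable.contains(qualifiedName)) {
            return true;
        }
        if (checker.isTopicAvailable(cachedTopic)) {
            confirmedAvailable.add(qualifiedName);
            return true;
        }
        return false;
    }
}
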
@@ -55,10 +55,18 @@ public BackupMessagesLoader backupMessagesLoader(BrokerMessageProducer brokerMes
SchemaRepository schemaRepository,
Trackers trackers,
LocalMessageStorageProperties localMessageStorageProperties,
DatacenterNameProvider datacenterNameProvider
) {
return new BackupMessagesLoader(brokerMessageProducer, brokerListeners, topicsCache, schemaRepository,
new SchemaExistenceEnsurer(schemaRepository), trackers, localMessageStorageProperties, datacenterNameProvider.getDatacenterName());
DatacenterNameProvider datacenterNameProvider) {
return new BackupMessagesLoader(
brokerMessageProducer,
brokerMessageProducer,
brokerListeners,
topicsCache,
schemaRepository,
new SchemaExistenceEnsurer(schemaRepository),
trackers,
localMessageStorageProperties,
datacenterNameProvider.getDatacenterName()
);
}

@Bean(initMethod = "extend")
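
Note that brokerMessageProducer is intentionally passed twice above: the first argument fills the new BrokerTopicAvailabilityChecker constructor parameter and the second the BrokerMessageProducer parameter. One bean satisfies both because BrokerMessageProducer now extends BrokerTopicAvailabilityChecker. A minimal illustration (the helper class and method are hypothetical):

package pl.allegro.tech.hermes.frontend.config;

import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer;
import pl.allegro.tech.hermes.frontend.producer.BrokerTopicAvailabilityChecker;

class AvailabilityCheckerWiringSketch {

    // Hypothetical helper: the same producer instance can be handed to any
    // parameter typed as the narrower BrokerTopicAvailabilityChecker contract.
    static BrokerTopicAvailabilityChecker asChecker(BrokerMessageProducer producer) {
        return producer;  // compiles: BrokerMessageProducer extends BrokerTopicAvailabilityChecker
    }
}
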
FrontendProducerConfiguration.java
@@ -1,19 +1,32 @@
package pl.allegro.tech.hermes.frontend.config;

import org.apache.kafka.clients.admin.AdminClient;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import pl.allegro.tech.hermes.common.kafka.KafkaParameters;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.frontend.cache.topic.TopicsCache;
import pl.allegro.tech.hermes.frontend.producer.BrokerLatencyReporter;
import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer;
import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaBrokerMessageProducer;
import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaHeaderFactory;
import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaMessageProducerFactory;
import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaTopicMetadataFetcher;
import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaTopicMetadataFetcherFactory;
import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaMessageSenders;
import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaMessageSendersFactory;
import pl.allegro.tech.hermes.frontend.producer.kafka.MessageToKafkaProducerRecordConverter;
import pl.allegro.tech.hermes.frontend.producer.kafka.Producers;
import pl.allegro.tech.hermes.frontend.producer.kafka.ProducerMetadataLoadingJob;
import pl.allegro.tech.hermes.infrastructure.dc.DatacenterNameProvider;

import java.util.List;
import java.util.Properties;

import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
import static org.apache.kafka.clients.CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;

@Configuration
@EnableConfigurationProperties({
LocalMessageStorageProperties.class,
@@ -26,11 +39,10 @@
public class FrontendProducerConfiguration {

@Bean
public BrokerMessageProducer kafkaBrokerMessageProducer(Producers producers,
KafkaTopicMetadataFetcher kafkaTopicMetadataFetcher,
public BrokerMessageProducer kafkaBrokerMessageProducer(KafkaMessageSenders kafkaMessageSenders,
MetricsFacade metricsFacade,
MessageToKafkaProducerRecordConverter messageConverter) {
return new KafkaBrokerMessageProducer(producers, kafkaTopicMetadataFetcher, metricsFacade, messageConverter);
return new KafkaBrokerMessageProducer(kafkaMessageSenders, metricsFacade, messageConverter);
}

@Bean
@@ -40,22 +52,54 @@ public KafkaHeaderFactory kafkaHeaderFactory(KafkaHeaderNameProperties kafkaHead
}

@Bean(destroyMethod = "close")
public Producers kafkaMessageProducer(KafkaClustersProperties kafkaClustersProperties,
KafkaProducerProperties kafkaProducerProperties,
LocalMessageStorageProperties localMessageStorageProperties,
DatacenterNameProvider datacenterNameProvider) {
KafkaProperties kafkaProperties = kafkaClustersProperties.toKafkaProperties(datacenterNameProvider);
return new KafkaMessageProducerFactory(kafkaProperties, kafkaProducerProperties,
localMessageStorageProperties.getBufferedSizeBytes()).provide();
public KafkaMessageSenders kafkaMessageSenders(KafkaProducerProperties kafkaProducerProperties,
KafkaMessageSendersFactory kafkaMessageSendersFactory) {
return kafkaMessageSendersFactory.provide(kafkaProducerProperties);
}

@Bean(destroyMethod = "close")
public KafkaTopicMetadataFetcher kafkaTopicMetadataFetcher(KafkaProducerProperties kafkaProducerProperties,
KafkaClustersProperties kafkaClustersProperties,
DatacenterNameProvider datacenterNameProvider) {
public KafkaMessageSendersFactory kafkaMessageSendersFactory(KafkaClustersProperties kafkaClustersProperties,
KafkaProducerProperties kafkaProducerProperties,
TopicLoadingProperties topicLoadingProperties,
TopicsCache topicsCache,
LocalMessageStorageProperties localMessageStorageProperties,
DatacenterNameProvider datacenterNameProvider) {
KafkaProperties kafkaProperties = kafkaClustersProperties.toKafkaProperties(datacenterNameProvider);
return new KafkaTopicMetadataFetcherFactory(kafkaProperties, kafkaProducerProperties.getMetadataMaxAge(),
(int) kafkaProperties.getAdminRequestTimeout().toMillis()).provide();
List<KafkaParameters> remoteKafkaProperties = kafkaClustersProperties.toRemoteKafkaProperties(datacenterNameProvider);
return new KafkaMessageSendersFactory(
kafkaProperties,
remoteKafkaProperties,
createAdminClient(kafkaProperties),
topicsCache,
topicLoadingProperties.getMetadata().getRetryCount(),
topicLoadingProperties.getMetadata().getRetryInterval(),
topicLoadingProperties.getMetadata().getThreadPoolSize(),
localMessageStorageProperties.getBufferedSizeBytes(),
kafkaProducerProperties.getMetadataMaxAge()
);
}

private static AdminClient createAdminClient(KafkaProperties kafkaProperties) {
Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBrokerList());
props.put(SECURITY_PROTOCOL_CONFIG, DEFAULT_SECURITY_PROTOCOL);
props.put(REQUEST_TIMEOUT_MS_CONFIG, (int) kafkaProperties.getAdminRequestTimeout().toMillis());
if (kafkaProperties.isAuthenticationEnabled()) {
props.put(SASL_MECHANISM, kafkaProperties.getAuthenticationMechanism());
props.put(SECURITY_PROTOCOL_CONFIG, kafkaProperties.getAuthenticationProtocol());
props.put(SASL_JAAS_CONFIG, kafkaProperties.getJaasConfig());
}
return AdminClient.create(props);
}

@Bean(initMethod = "start", destroyMethod = "stop")
public ProducerMetadataLoadingJob producerMetadataLoadingJob(KafkaMessageSenders kafkaMessageSenders,
TopicLoadingProperties topicLoadingProperties) {
return new ProducerMetadataLoadingJob(
kafkaMessageSenders,
topicLoadingProperties.getMetadataRefreshJob().isEnabled(),
topicLoadingProperties.getMetadataRefreshJob().getInterval()
);
}

@Bean
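
The @Bean(initMethod = "start", destroyMethod = "stop") declaration means Spring starts the producer metadata refresh job when the context comes up and stops it on shutdown. A rough manual-wiring equivalent, assuming the refresh interval is a Duration (the real values come from TopicLoadingProperties.getMetadataRefreshJob()):

package pl.allegro.tech.hermes.frontend.config;

import java.time.Duration;

import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaMessageSenders;
import pl.allegro.tech.hermes.frontend.producer.kafka.ProducerMetadataLoadingJob;

// Sketch only: what the Spring lifecycle wiring above amounts to at runtime.
// The enabled flag and interval are assumed placeholders; they normally come
// from TopicLoadingProperties.
class MetadataRefreshSketch {

    static ProducerMetadataLoadingJob startMetadataRefresh(KafkaMessageSenders kafkaMessageSenders) {
        ProducerMetadataLoadingJob job = new ProducerMetadataLoadingJob(
                kafkaMessageSenders,
                true,                      // refresh job enabled
                Duration.ofSeconds(30));   // assumed interval
        job.start();                       // invoked by Spring via initMethod = "start"
        return job;                        // Spring calls job.stop() via destroyMethod = "stop"
    }
}
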
@@ -8,16 +8,12 @@
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.common.ssl.SslContextFactory;
import pl.allegro.tech.hermes.frontend.cache.topic.TopicsCache;
import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer;
import pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter;
import pl.allegro.tech.hermes.frontend.publishing.preview.DefaultMessagePreviewPersister;
import pl.allegro.tech.hermes.frontend.readiness.HealthCheckService;
import pl.allegro.tech.hermes.frontend.readiness.ReadinessChecker;
import pl.allegro.tech.hermes.frontend.server.HermesServer;
import pl.allegro.tech.hermes.frontend.server.SslContextFactoryProvider;
import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingJob;
import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingRunner;
import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingStartupHook;
import pl.allegro.tech.hermes.frontend.server.TopicSchemaLoadingStartupHook;
import pl.allegro.tech.hermes.schema.SchemaRepository;

@@ -41,9 +37,7 @@ public HermesServer hermesServer(HermesServerProperties hermesServerProperties,
ReadinessChecker readinessChecker,
DefaultMessagePreviewPersister defaultMessagePreviewPersister,
ThroughputLimiter throughputLimiter,
TopicMetadataLoadingJob topicMetadataLoadingJob,
SslContextFactoryProvider sslContextFactoryProvider,
TopicLoadingProperties topicLoadingProperties,
PrometheusMeterRegistry prometheusMeterRegistry) {
return new HermesServer(
sslProperties,
@@ -54,10 +48,9 @@ public HermesServer hermesServer(HermesServerProperties hermesServerProperties,
readinessChecker,
defaultMessagePreviewPersister,
throughputLimiter,
topicMetadataLoadingJob,
topicLoadingProperties.getMetadataRefreshJob().isEnabled(),
sslContextFactoryProvider,
prometheusMeterRegistry);
prometheusMeterRegistry
);
}

@Bean
@@ -66,28 +59,6 @@ public SslContextFactoryProvider sslContextFactoryProvider(Optional<SslContextFa
return new SslContextFactoryProvider(sslContextFactory.orElse(null), sslProperties);
}

@Bean
public TopicMetadataLoadingJob topicMetadataLoadingJob(TopicMetadataLoadingRunner topicMetadataLoadingRunner,
TopicLoadingProperties topicLoadingProperties) {
return new TopicMetadataLoadingJob(topicMetadataLoadingRunner, topicLoadingProperties.getMetadataRefreshJob().getInterval());
}

@Bean
public TopicMetadataLoadingRunner topicMetadataLoadingRunner(BrokerMessageProducer brokerMessageProducer,
TopicsCache topicsCache,
TopicLoadingProperties topicLoadingProperties) {
return new TopicMetadataLoadingRunner(brokerMessageProducer, topicsCache,
topicLoadingProperties.getMetadata().getRetryCount(),
topicLoadingProperties.getMetadata().getRetryInterval(),
topicLoadingProperties.getMetadata().getThreadPoolSize());
}

@Bean(initMethod = "run")
public TopicMetadataLoadingStartupHook topicMetadataLoadingStartupHook(TopicMetadataLoadingRunner topicMetadataLoadingRunner,
TopicLoadingProperties topicLoadingProperties) {
return new TopicMetadataLoadingStartupHook(topicMetadataLoadingRunner, topicLoadingProperties.getMetadata().isEnabled());
}

@Bean(initMethod = "run")
public TopicSchemaLoadingStartupHook topicSchemaLoadingStartupHook(TopicsCache topicsCache,
SchemaRepository schemaRepository,

KafkaClustersProperties.java
@@ -1,10 +1,12 @@
package pl.allegro.tech.hermes.frontend.config;

import org.springframework.boot.context.properties.ConfigurationProperties;
import pl.allegro.tech.hermes.common.kafka.KafkaParameters;
import pl.allegro.tech.hermes.infrastructure.dc.DatacenterNameProvider;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

@ConfigurationProperties(prefix = "frontend.kafka")
public class KafkaClustersProperties {
@@ -47,5 +49,12 @@ public KafkaProperties toKafkaProperties(DatacenterNameProvider datacenterNamePr
.orElseThrow(() -> new IllegalArgumentException(
"No properties for datacenter: " + datacenterNameProvider.getDatacenterName() + " defined."));
}

public List<KafkaParameters> toRemoteKafkaProperties(DatacenterNameProvider datacenterNameProvider) {
return this.clusters
.stream()
.filter(cluster -> !cluster.getDatacenter().equals(datacenterNameProvider.getDatacenterName()))
.collect(Collectors.toList());
}
}

ReadinessConfiguration.java
@@ -5,10 +5,10 @@
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import pl.allegro.tech.hermes.frontend.producer.BrokerTopicAvailabilityChecker;
import pl.allegro.tech.hermes.frontend.readiness.AdminReadinessService;
import pl.allegro.tech.hermes.frontend.readiness.DefaultReadinessChecker;
import pl.allegro.tech.hermes.frontend.readiness.HealthCheckService;
import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingRunner;
import pl.allegro.tech.hermes.infrastructure.dc.DatacenterNameProvider;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;

@@ -18,10 +18,10 @@ public class ReadinessConfiguration {

@Bean
public DefaultReadinessChecker readinessChecker(ReadinessCheckProperties readinessCheckProperties,
TopicMetadataLoadingRunner topicMetadataLoadingRunner,
BrokerTopicAvailabilityChecker brokerTopicAvailabilityChecker,
AdminReadinessService adminReadinessService) {
return new DefaultReadinessChecker(
topicMetadataLoadingRunner,
brokerTopicAvailabilityChecker,
adminReadinessService,
readinessCheckProperties.isEnabled(),
readinessCheckProperties.isKafkaCheckEnabled(),
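
DefaultReadinessChecker now receives the BrokerTopicAvailabilityChecker instead of the removed TopicMetadataLoadingRunner. A hypothetical illustration of why the narrower dependency is enough for a readiness probe (this is not the actual DefaultReadinessChecker implementation):

package pl.allegro.tech.hermes.frontend.readiness;

import pl.allegro.tech.hermes.frontend.producer.BrokerTopicAvailabilityChecker;

// Hypothetical probe, for illustration only: readiness only needs to know
// whether all broker topics are reachable, which areAllTopicsAvailable() answers.
class KafkaReadinessProbe {

    private final BrokerTopicAvailabilityChecker availabilityChecker;

    KafkaReadinessProbe(BrokerTopicAvailabilityChecker availabilityChecker) {
        this.availabilityChecker = availabilityChecker;
    }

    boolean isReady() {
        return availabilityChecker.areAllTopicsAvailable();
    }
}
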
TopicLoadingProperties.java
@@ -15,22 +15,12 @@ public class TopicLoadingProperties {

public static class MetadataLoadingProperties {

private boolean enabled = false;

private Duration retryInterval = Duration.ofSeconds(1);

private int retryCount = 5;

private int threadPoolSize = 16;

public boolean isEnabled() {
return enabled;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public Duration getRetryInterval() {
return retryInterval;
}

BrokerMessageProducer.java
@@ -4,9 +4,7 @@
import pl.allegro.tech.hermes.frontend.publishing.PublishingCallback;
import pl.allegro.tech.hermes.frontend.publishing.message.Message;

public interface BrokerMessageProducer {
public interface BrokerMessageProducer extends BrokerTopicAvailabilityChecker {

void send(Message message, CachedTopic topic, PublishingCallback callback);

boolean isTopicAvailable(CachedTopic topic);
}

BrokerTopicAvailabilityChecker.java (new file)
@@ -0,0 +1,10 @@
package pl.allegro.tech.hermes.frontend.producer;

import pl.allegro.tech.hermes.frontend.metric.CachedTopic;

public interface BrokerTopicAvailabilityChecker {

boolean areAllTopicsAvailable();

boolean isTopicAvailable(CachedTopic cachedTopic);
}
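
Any BrokerMessageProducer implementation now has to provide both availability methods in addition to send(), as the in-memory test producer near the top of this diff does. A minimal sketch (class name and no-op bodies are illustrative):

package pl.allegro.tech.hermes.frontend.producer;

import pl.allegro.tech.hermes.frontend.metric.CachedTopic;
import pl.allegro.tech.hermes.frontend.publishing.PublishingCallback;
import pl.allegro.tech.hermes.frontend.publishing.message.Message;

// Minimal illustrative implementation: send() comes from BrokerMessageProducer,
// the two availability methods are inherited from BrokerTopicAvailabilityChecker.
class AlwaysAvailableBrokerMessageProducer implements BrokerMessageProducer {

    @Override
    public void send(Message message, CachedTopic topic, PublishingCallback callback) {
        // no-op, like the in-memory test producer shown earlier in this diff
    }

    @Override
    public boolean areAllTopicsAvailable() {
        return true;
    }

    @Override
    public boolean isTopicAvailable(CachedTopic cachedTopic) {
        return true;
    }
}
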