diff --git a/build.gradle b/build.gradle
index bc670c4a4d..6de28b2eae 100644
--- a/build.gradle
+++ b/build.gradle
@@ -52,7 +52,7 @@ allprojects {
             guava             : '23.0',
             jackson           : '2.15.2',
             jersey            : '3.1.2',
-            jetty             : '12.0.5',
+            jetty             : '12.0.7',
             curator           : '5.4.0',
             dropwizard_metrics: '4.1.0',
             micrometer_metrics: '1.11.1',
diff --git a/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/HermesServerFactory.java b/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/HermesServerFactory.java
index 85ad8d4df0..d1e17d92ae 100644
--- a/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/HermesServerFactory.java
+++ b/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/HermesServerFactory.java
@@ -76,8 +76,6 @@ static HermesServer provideHermesServer() throws IOException {
                 new NoOpMessagePreviewPersister(),
                 throughputLimiter,
                 null,
-                false,
-                null,
                 null);
     }
 
diff --git a/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/InMemoryBrokerMessageProducer.java b/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/InMemoryBrokerMessageProducer.java
index a7f3a3d832..3d06b1c021 100644
--- a/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/InMemoryBrokerMessageProducer.java
+++ b/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/InMemoryBrokerMessageProducer.java
@@ -13,7 +13,12 @@ public void send(Message message, CachedTopic topic, PublishingCallback callback
     }
 
     @Override
-    public boolean isTopicAvailable(CachedTopic topic) {
+    public boolean areAllTopicsAvailable() {
+        return true;
+    }
+
+    @Override
+    public boolean isTopicAvailable(CachedTopic cachedTopic) {
         return true;
     }
 }
diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/WorkloadSupervisor.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/WorkloadSupervisor.java
index be74d37319..1e85e5b351 100644
--- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/WorkloadSupervisor.java
+++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/WorkloadSupervisor.java
@@ -170,8 +170,8 @@ public boolean isLeader() {
 
     @Override
     public void onRetransmissionStarts(SubscriptionName subscription) throws Exception {
-        logger.info("Triggering retransmission for subscription {}", subscription);
         if (assignmentCache.isAssignedTo(subscription)) {
+            logger.info("Triggering retransmission for subscription {}", subscription);
             supervisor.retransmit(subscription);
         }
     }
diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/buffer/BackupMessagesLoader.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/buffer/BackupMessagesLoader.java
index 8d6ec417c6..b8aa287fb9 100644
--- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/buffer/BackupMessagesLoader.java
+++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/buffer/BackupMessagesLoader.java
@@ -11,6 +11,7 @@
 import pl.allegro.tech.hermes.frontend.listeners.BrokerListeners;
 import pl.allegro.tech.hermes.frontend.metric.CachedTopic;
 import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer;
+import pl.allegro.tech.hermes.frontend.producer.BrokerTopicAvailabilityChecker;
 import pl.allegro.tech.hermes.frontend.publishing.PublishingCallback;
 import pl.allegro.tech.hermes.frontend.publishing.avro.AvroMessage;
 import pl.allegro.tech.hermes.frontend.publishing.message.JsonMessage;
@@ -44,6 +45,7 @@ public class BackupMessagesLoader {
 
     private static final Logger logger = LoggerFactory.getLogger(BackupMessagesLoader.class);
 
+    private final BrokerTopicAvailabilityChecker brokerTopicAvailabilityChecker;
     private final BrokerMessageProducer brokerMessageProducer;
     private final BrokerListeners brokerListeners;
     private final TopicsCache topicsCache;
@@ -59,7 +61,8 @@ public class BackupMessagesLoader {
     private final Set<Topic> topicsAvailabilityCache = new HashSet<>();
     private final AtomicReference<ConcurrentLinkedQueue<Pair<Message, CachedTopic>>> toResend = new AtomicReference<>();
 
-    public BackupMessagesLoader(BrokerMessageProducer brokerMessageProducer,
+    public BackupMessagesLoader(BrokerTopicAvailabilityChecker brokerTopicAvailabilityChecker,
+                                BrokerMessageProducer brokerMessageProducer,
                                 BrokerListeners brokerListeners,
                                 TopicsCache topicsCache,
                                 SchemaRepository schemaRepository,
@@ -68,6 +71,7 @@ public BackupMessagesLoader(BrokerMessageProducer brokerMessageProducer,
                                 BackupMessagesLoaderParameters backupMessagesLoaderParameters,
                                 String datacenter
     ) {
+        this.brokerTopicAvailabilityChecker = brokerTopicAvailabilityChecker;
         this.brokerMessageProducer = brokerMessageProducer;
         this.brokerListeners = brokerListeners;
         this.topicsCache = topicsCache;
@@ -235,7 +239,7 @@ private boolean isBrokerTopicAvailable(CachedTopic cachedTopic) {
             return true;
         }
 
-        if (brokerMessageProducer.isTopicAvailable(cachedTopic)) {
+        if (brokerTopicAvailabilityChecker.isTopicAvailable(cachedTopic)) {
             topicsAvailabilityCache.add(cachedTopic.getTopic());
             logger.info("Broker topic {} is available.", cachedTopic.getTopic().getQualifiedName());
             return true;
diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendConfiguration.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendConfiguration.java
index 23f72211a7..deffcd8af6 100644
--- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendConfiguration.java
+++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendConfiguration.java
@@ -55,10 +55,18 @@ public BackupMessagesLoader backupMessagesLoader(BrokerMessageProducer brokerMes
                                                      SchemaRepository schemaRepository,
                                                      Trackers trackers,
                                                      LocalMessageStorageProperties localMessageStorageProperties,
-                                                     DatacenterNameProvider datacenterNameProvider
-    ) {
-        return new BackupMessagesLoader(brokerMessageProducer, brokerListeners, topicsCache, schemaRepository,
-                new SchemaExistenceEnsurer(schemaRepository), trackers, localMessageStorageProperties, datacenterNameProvider.getDatacenterName());
+                                                     DatacenterNameProvider datacenterNameProvider) {
+        return new BackupMessagesLoader(
+                brokerMessageProducer,
+                brokerMessageProducer,
+                brokerListeners,
+                topicsCache,
+                schemaRepository,
+                new SchemaExistenceEnsurer(schemaRepository),
+                trackers,
+                localMessageStorageProperties,
+                datacenterNameProvider.getDatacenterName()
+        );
     }
 
     @Bean(initMethod = "extend")
diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendProducerConfiguration.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendProducerConfiguration.java
index 6438546d3d..ca9a27e5c8 100644
--- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendProducerConfiguration.java
+++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendProducerConfiguration.java
@@ -1,19 +1,32 @@
 package pl.allegro.tech.hermes.frontend.config;
 
+import org.apache.kafka.clients.admin.AdminClient;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import pl.allegro.tech.hermes.common.kafka.KafkaParameters;
 import pl.allegro.tech.hermes.common.metric.MetricsFacade;
+import pl.allegro.tech.hermes.frontend.cache.topic.TopicsCache;
+import pl.allegro.tech.hermes.frontend.producer.BrokerLatencyReporter;
 import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer;
 import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaBrokerMessageProducer;
 import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaHeaderFactory;
-import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaMessageProducerFactory;
-import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaTopicMetadataFetcher;
-import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaTopicMetadataFetcherFactory;
+import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaMessageSenders;
+import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaMessageSendersFactory;
 import pl.allegro.tech.hermes.frontend.producer.kafka.MessageToKafkaProducerRecordConverter;
-import pl.allegro.tech.hermes.frontend.producer.kafka.Producers;
+import pl.allegro.tech.hermes.frontend.producer.kafka.ProducerMetadataLoadingJob;
 import pl.allegro.tech.hermes.infrastructure.dc.DatacenterNameProvider;
 
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
+import static org.apache.kafka.clients.CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
+
 @Configuration
 @EnableConfigurationProperties({
         LocalMessageStorageProperties.class,
@@ -26,11 +39,10 @@ public class FrontendProducerConfiguration {
 
     @Bean
-    public BrokerMessageProducer kafkaBrokerMessageProducer(Producers producers,
-                                                            KafkaTopicMetadataFetcher kafkaTopicMetadataFetcher,
+    public BrokerMessageProducer kafkaBrokerMessageProducer(KafkaMessageSenders kafkaMessageSenders,
                                                             MetricsFacade metricsFacade,
                                                             MessageToKafkaProducerRecordConverter messageConverter) {
-        return new KafkaBrokerMessageProducer(producers, kafkaTopicMetadataFetcher, metricsFacade, messageConverter);
+        return new KafkaBrokerMessageProducer(kafkaMessageSenders, metricsFacade, messageConverter);
     }
 
     @Bean
@@ -40,22 +52,54 @@ public KafkaHeaderFactory kafkaHeaderFactory(KafkaHeaderNameProperties kafkaHead
     }
 
     @Bean(destroyMethod = "close")
-    public Producers kafkaMessageProducer(KafkaClustersProperties kafkaClustersProperties,
-                                          KafkaProducerProperties kafkaProducerProperties,
-                                          LocalMessageStorageProperties localMessageStorageProperties,
-                                          DatacenterNameProvider datacenterNameProvider) {
-        KafkaProperties kafkaProperties = kafkaClustersProperties.toKafkaProperties(datacenterNameProvider);
-        return new KafkaMessageProducerFactory(kafkaProperties, kafkaProducerProperties,
-                localMessageStorageProperties.getBufferedSizeBytes()).provide();
+    public KafkaMessageSenders kafkaMessageSenders(KafkaProducerProperties kafkaProducerProperties,
+                                                   KafkaMessageSendersFactory kafkaMessageSendersFactory) {
+        return kafkaMessageSendersFactory.provide(kafkaProducerProperties);
     }
 
     @Bean(destroyMethod = "close")
-    public KafkaTopicMetadataFetcher kafkaTopicMetadataFetcher(KafkaProducerProperties kafkaProducerProperties,
-                                                               KafkaClustersProperties kafkaClustersProperties,
-                                                               DatacenterNameProvider datacenterNameProvider) {
+    public KafkaMessageSendersFactory kafkaMessageSendersFactory(KafkaClustersProperties kafkaClustersProperties,
+                                                                 KafkaProducerProperties kafkaProducerProperties,
+                                                                 TopicLoadingProperties topicLoadingProperties,
+                                                                 TopicsCache topicsCache,
+                                                                 LocalMessageStorageProperties localMessageStorageProperties,
+                                                                 DatacenterNameProvider datacenterNameProvider) {
         KafkaProperties kafkaProperties = kafkaClustersProperties.toKafkaProperties(datacenterNameProvider);
-        return new KafkaTopicMetadataFetcherFactory(kafkaProperties, kafkaProducerProperties.getMetadataMaxAge(),
-                (int) kafkaProperties.getAdminRequestTimeout().toMillis()).provide();
+        List<KafkaParameters> remoteKafkaProperties = kafkaClustersProperties.toRemoteKafkaProperties(datacenterNameProvider);
+        return new KafkaMessageSendersFactory(
+                kafkaProperties,
+                remoteKafkaProperties,
+                createAdminClient(kafkaProperties),
+                topicsCache,
+                topicLoadingProperties.getMetadata().getRetryCount(),
+                topicLoadingProperties.getMetadata().getRetryInterval(),
+                topicLoadingProperties.getMetadata().getThreadPoolSize(),
+                localMessageStorageProperties.getBufferedSizeBytes(),
+                kafkaProducerProperties.getMetadataMaxAge()
+        );
+    }
+
+    private static AdminClient createAdminClient(KafkaProperties kafkaProperties) {
+        Properties props = new Properties();
+        props.put(BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBrokerList());
+        props.put(SECURITY_PROTOCOL_CONFIG, DEFAULT_SECURITY_PROTOCOL);
+        props.put(REQUEST_TIMEOUT_MS_CONFIG, (int) kafkaProperties.getAdminRequestTimeout().toMillis());
+        if (kafkaProperties.isAuthenticationEnabled()) {
+            props.put(SASL_MECHANISM, kafkaProperties.getAuthenticationMechanism());
+            props.put(SECURITY_PROTOCOL_CONFIG, kafkaProperties.getAuthenticationProtocol());
+            props.put(SASL_JAAS_CONFIG, kafkaProperties.getJaasConfig());
+        }
+        return AdminClient.create(props);
+    }
+
+    @Bean(initMethod = "start", destroyMethod = "stop")
+    public ProducerMetadataLoadingJob producerMetadataLoadingJob(KafkaMessageSenders kafkaMessageSenders,
+                                                                 TopicLoadingProperties topicLoadingProperties) {
+        return new ProducerMetadataLoadingJob(
+                kafkaMessageSenders,
+                topicLoadingProperties.getMetadataRefreshJob().isEnabled(),
+                topicLoadingProperties.getMetadataRefreshJob().getInterval()
+        );
     }
 
     @Bean
diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendServerConfiguration.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendServerConfiguration.java
index 2646f50cd3..a324ea7f50 100644
--- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendServerConfiguration.java
+++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendServerConfiguration.java
@@ -8,16 +8,12 @@
 import pl.allegro.tech.hermes.common.metric.MetricsFacade;
 import pl.allegro.tech.hermes.common.ssl.SslContextFactory;
 import pl.allegro.tech.hermes.frontend.cache.topic.TopicsCache;
-import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer;
 import pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter;
 import pl.allegro.tech.hermes.frontend.publishing.preview.DefaultMessagePreviewPersister;
 import pl.allegro.tech.hermes.frontend.readiness.HealthCheckService;
 import pl.allegro.tech.hermes.frontend.readiness.ReadinessChecker;
 import pl.allegro.tech.hermes.frontend.server.HermesServer;
 import pl.allegro.tech.hermes.frontend.server.SslContextFactoryProvider;
-import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingJob;
-import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingRunner;
-import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingStartupHook;
 import pl.allegro.tech.hermes.frontend.server.TopicSchemaLoadingStartupHook;
 import pl.allegro.tech.hermes.schema.SchemaRepository;
 
@@ -41,9 +37,7 @@ public HermesServer hermesServer(HermesServerProperties hermesServerProperties,
                                      ReadinessChecker readinessChecker,
                                      DefaultMessagePreviewPersister defaultMessagePreviewPersister,
                                      ThroughputLimiter throughputLimiter,
-                                     TopicMetadataLoadingJob topicMetadataLoadingJob,
                                      SslContextFactoryProvider sslContextFactoryProvider,
-                                     TopicLoadingProperties topicLoadingProperties,
                                      PrometheusMeterRegistry prometheusMeterRegistry) {
         return new HermesServer(
                 sslProperties,
@@ -54,10 +48,9 @@ public HermesServer hermesServer(HermesServerProperties hermesServerProperties,
                 readinessChecker,
                 defaultMessagePreviewPersister,
                 throughputLimiter,
-                topicMetadataLoadingJob,
-                topicLoadingProperties.getMetadataRefreshJob().isEnabled(),
                 sslContextFactoryProvider,
-                prometheusMeterRegistry);
+                prometheusMeterRegistry
+        );
     }
 
     @Bean
@@ -66,28 +59,6 @@ public SslContextFactoryProvider sslContextFactoryProvider(Optional
diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/KafkaClustersProperties.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/KafkaClustersProperties.java
                 .orElseThrow(() -> new IllegalArgumentException(
                         "No properties for datacenter: " + datacenterNameProvider.getDatacenterName() + " defined."));
     }
+
+    public List<KafkaParameters> toRemoteKafkaProperties(DatacenterNameProvider datacenterNameProvider) {
+        return this.clusters
+                .stream()
+                .filter(cluster -> !cluster.getDatacenter().equals(datacenterNameProvider.getDatacenterName()))
+                .collect(Collectors.toList());
+    }
 }
diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/ReadinessConfiguration.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/ReadinessConfiguration.java
index d3bd78afba..8febee759e 100644
--- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/ReadinessConfiguration.java
+++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/ReadinessConfiguration.java
@@ -5,10 +5,10 @@
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import pl.allegro.tech.hermes.frontend.producer.BrokerTopicAvailabilityChecker;
 import pl.allegro.tech.hermes.frontend.readiness.AdminReadinessService;
 import pl.allegro.tech.hermes.frontend.readiness.DefaultReadinessChecker;
 import pl.allegro.tech.hermes.frontend.readiness.HealthCheckService;
-import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingRunner;
 import pl.allegro.tech.hermes.infrastructure.dc.DatacenterNameProvider;
 import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;
 
@@ -18,10 +18,10 @@ public class ReadinessConfiguration {
 
     @Bean
     public DefaultReadinessChecker readinessChecker(ReadinessCheckProperties readinessCheckProperties,
-                                                    TopicMetadataLoadingRunner topicMetadataLoadingRunner,
+                                                    BrokerTopicAvailabilityChecker brokerTopicAvailabilityChecker,
                                                     AdminReadinessService adminReadinessService) {
         return new DefaultReadinessChecker(
-                topicMetadataLoadingRunner,
+                brokerTopicAvailabilityChecker,
                 adminReadinessService,
                 readinessCheckProperties.isEnabled(),
                 readinessCheckProperties.isKafkaCheckEnabled(),
diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/TopicLoadingProperties.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/TopicLoadingProperties.java
index a469047686..87b1acd1a5 100644
--- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/TopicLoadingProperties.java
+++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/TopicLoadingProperties.java
@@ -15,22 +15,12 @@ public class TopicLoadingProperties {
 
     public static class MetadataLoadingProperties {
 
-        private boolean enabled = false;
-
         private Duration retryInterval = Duration.ofSeconds(1);
 
         private int retryCount = 5;
 
         private int threadPoolSize = 16;
 
-        public boolean isEnabled() {
-            return enabled;
-        }
-
-        public void setEnabled(boolean enabled) {
-            this.enabled = enabled;
-        }
-
         public Duration getRetryInterval() {
             return retryInterval;
         }
diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/BrokerMessageProducer.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/BrokerMessageProducer.java
index 6ca0c78103..f09d29545b 100644
--- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/BrokerMessageProducer.java
+++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/BrokerMessageProducer.java
@@ -4,9 +4,7 @@
 import pl.allegro.tech.hermes.frontend.publishing.PublishingCallback;
 import pl.allegro.tech.hermes.frontend.publishing.message.Message;
 
-public interface BrokerMessageProducer {
+public interface BrokerMessageProducer extends BrokerTopicAvailabilityChecker {
 
     void send(Message message, CachedTopic topic, PublishingCallback callback);
-
-    boolean isTopicAvailable(CachedTopic topic);
 }
diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/BrokerTopicAvailabilityChecker.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/BrokerTopicAvailabilityChecker.java
new file mode 100644
index 0000000000..4dd6b0b49a
--- /dev/null
+++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/BrokerTopicAvailabilityChecker.java
@@ -0,0 +1,10 @@
+package pl.allegro.tech.hermes.frontend.producer;
+
+import pl.allegro.tech.hermes.frontend.metric.CachedTopic;
+
+public interface BrokerTopicAvailabilityChecker {
+
+    boolean areAllTopicsAvailable();
+
+    boolean isTopicAvailable(CachedTopic cachedTopic);
+}
diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaBrokerMessageProducer.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaBrokerMessageProducer.java
index d5461abd44..09cf06185d 100644
--- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaBrokerMessageProducer.java
+++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaBrokerMessageProducer.java
@@ -22,20 +22,15 @@ public class KafkaBrokerMessageProducer implements BrokerMessageProducer {
 
     private static final Logger logger = LoggerFactory.getLogger(KafkaBrokerMessageProducer.class);
 
-    private final Producers producers;
-    private final KafkaTopicMetadataFetcher kafkaTopicMetadataFetcher;
-    private final MetricsFacade metricsFacade;
+    private final KafkaMessageSenders kafkaMessageSenders;
     private final MessageToKafkaProducerRecordConverter messageConverter;
 
-    public KafkaBrokerMessageProducer(Producers producers,
-                                      KafkaTopicMetadataFetcher kafkaTopicMetadataFetcher,
+    public KafkaBrokerMessageProducer(KafkaMessageSenders kafkaMessageSenders,
                                       MetricsFacade metricsFacade,
                                       MessageToKafkaProducerRecordConverter messageConverter) {
-        this.producers = producers;
-        this.kafkaTopicMetadataFetcher = kafkaTopicMetadataFetcher;
-        this.metricsFacade = metricsFacade;
+        this.kafkaMessageSenders = kafkaMessageSenders;
        this.messageConverter = messageConverter;
-        producers.registerGauges(metricsFacade);
+        kafkaMessageSenders.registerLocalSenderMetrics(metricsFacade);
     }
 
     @Override
@@ -44,44 +39,19 @@ public void send(Message message, CachedTopic cachedTopic, final PublishingCallb
         ProducerRecord<byte[], byte[]> producerRecord =
                 messageConverter.convertToProducerRecord(message, cachedTopic.getKafkaTopics().getPrimary().name());
         try {
-            producers.get(cachedTopic.getTopic()).send(producerRecord, new SendCallback(message, cachedTopic, callback));
+            var producer = kafkaMessageSenders.get(cachedTopic.getTopic());
+            producer.send(producerRecord, new SendCallback(message, cachedTopic, callback));
         } catch (Exception e) {
             // message didn't get to internal producer buffer and it will not be send to a broker
             callback.onUnpublished(message, cachedTopic.getTopic(), e);
         }
     }
 
-    @Override
-    public boolean isTopicAvailable(CachedTopic cachedTopic) {
-        String kafkaTopicName = cachedTopic.getKafkaTopics().getPrimary().name().asString();
-
-        try {
-            List<PartitionInfo> partitionInfos = producers.get(cachedTopic.getTopic()).partitionsFor(kafkaTopicName);
-            if (anyPartitionWithoutLeader(partitionInfos)) {
-                logger.warn("Topic {} has partitions without a leader.", kafkaTopicName);
-                return false;
-            }
-            if (anyUnderReplicatedPartition(partitionInfos, kafkaTopicName)) {
-                logger.warn("Topic {} has under replicated partitions.", kafkaTopicName);
-                return false;
-            }
-            if (partitionInfos.size() > 0) {
-                return true;
-            }
-        } catch (Exception e) {
-            logger.warn("Could not read information about partitions for topic {}. {}", kafkaTopicName, e.getMessage());
-            return false;
-        }
-
-        logger.warn("No information about partitions for topic {}", kafkaTopicName);
-        return false;
-    }
-
     private Supplier<ProduceMetadata> produceMetadataSupplier(CachedTopic topic, RecordMetadata recordMetadata) {
         return () -> {
             String kafkaTopicName = topic.getKafkaTopics().getPrimary().name().asString();
             try {
-                List<PartitionInfo> topicPartitions = producers.get(topic.getTopic()).partitionsFor(kafkaTopicName);
+                List<PartitionInfo> topicPartitions = kafkaMessageSenders.get(topic.getTopic()).loadPartitionMetadataFor(kafkaTopicName);
 
                 Optional<PartitionInfo> partitionInfo = topicPartitions.stream()
                         .filter(p -> p.partition() == recordMetadata.partition())
@@ -99,13 +69,14 @@ private Supplier<ProduceMetadata> produceMetadataSupplier(CachedTopic topic, Rec
         };
     }
 
-    private boolean anyPartitionWithoutLeader(List<PartitionInfo> partitionInfos) {
-        return partitionInfos.stream().anyMatch(p -> p.leader() == null);
+    @Override
+    public boolean areAllTopicsAvailable() {
+        return kafkaMessageSenders.areAllTopicsAvailable();
     }
 
-    private boolean anyUnderReplicatedPartition(List<PartitionInfo> partitionInfos, String kafkaTopicName) throws Exception {
-        int minInSyncReplicas = kafkaTopicMetadataFetcher.fetchMinInSyncReplicas(kafkaTopicName);
-        return partitionInfos.stream().anyMatch(p -> p.inSyncReplicas().length < minInSyncReplicas);
+    @Override
+    public boolean isTopicAvailable(CachedTopic cachedTopic) {
+        return kafkaMessageSenders.isTopicAvailable(cachedTopic);
     }
 
     private class SendCallback implements org.apache.kafka.clients.producer.Callback {
@@ -125,7 +96,6 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
             Supplier<ProduceMetadata> produceMetadata = produceMetadataSupplier(topic, recordMetadata);
             if (e == null) {
                 callback.onPublished(message, topic.getTopic(), produceMetadata);
-                producers.maybeRegisterNodeMetricsGauges(metricsFacade);
             } else {
                 callback.onUnpublished(message, topic.getTopic(), produceMetadata, e);
             }
diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSender.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSender.java
new file mode 100644
index 0000000000..2dc76d23d6
--- /dev/null
+++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSender.java
@@ -0,0 +1,90 @@
+package pl.allegro.tech.hermes.frontend.producer.kafka;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import pl.allegro.tech.hermes.api.Topic;
+import pl.allegro.tech.hermes.common.metric.MetricsFacade;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Predicate;
+import java.util.function.ToDoubleFunction;
+
+public class KafkaMessageSender<K, V> {
+
+    private final Producer<K, V> producer;
+    private final String datacenter;
+
+    KafkaMessageSender(Producer<K, V> kafkaProducer, String datacenter) {
+        this.producer = kafkaProducer;
+        this.datacenter = datacenter;
+    }
+
+    public String getDatacenter() {
+        return datacenter;
+    }
+
+    public void send(ProducerRecord<K, V> producerRecord, Callback callback) {
+        producer.send(producerRecord, callback);
+    }
+
+    List<PartitionInfo> loadPartitionMetadataFor(String topic) {
+        return producer.partitionsFor(topic);
+    }
+
+    public void close() {
+        producer.close();
+    }
+
+    public void registerGauges(MetricsFacade metricsFacade, Topic.Ack ack) {
+        MetricName bufferTotalBytes = producerMetric("buffer-total-bytes", "producer-metrics", "buffer total bytes");
+        MetricName bufferAvailableBytes = producerMetric("buffer-available-bytes", "producer-metrics", "buffer available bytes");
+        MetricName compressionRate = producerMetric("compression-rate-avg", "producer-metrics", "average compression rate");
+        MetricName failedBatches = producerMetric("record-error-total", "producer-metrics", "failed publishing batches");
+        MetricName metadataAge = producerMetric("metadata-age", "producer-metrics", "age [s] of metadata");
+        MetricName queueTimeMax = producerMetric("record-queue-time-max", "producer-metrics", "maximum time [ms] that batch spent in the send buffer");
+
+        // TODO: add 'datacenter' label
+        if (ack == Topic.Ack.ALL) {
+            metricsFacade.producer().registerAckAllTotalBytesGauge(producer, producerGauge(bufferTotalBytes));
+            metricsFacade.producer().registerAckAllAvailableBytesGauge(producer, producerGauge(bufferAvailableBytes));
+            metricsFacade.producer().registerAckAllCompressionRateGauge(producer, producerGauge(compressionRate));
+            metricsFacade.producer().registerAckAllFailedBatchesGauge(producer, producerGauge(failedBatches));
+            metricsFacade.producer().registerAckAllMetadataAgeGauge(producer, producerGauge(metadataAge));
+            metricsFacade.producer().registerAckAllRecordQueueTimeMaxGauge(producer, producerGauge(queueTimeMax));
+        } else if (ack == Topic.Ack.LEADER) {
+            metricsFacade.producer().registerAckLeaderTotalBytesGauge(producer, producerGauge(bufferTotalBytes));
+            metricsFacade.producer().registerAckLeaderAvailableBytesGauge(producer, producerGauge(bufferAvailableBytes));
+            metricsFacade.producer().registerAckLeaderCompressionRateGauge(producer, producerGauge(compressionRate));
+            metricsFacade.producer().registerAckLeaderFailedBatchesGauge(producer, producerGauge(failedBatches));
+            metricsFacade.producer().registerAckLeaderMetadataAgeGauge(producer, producerGauge(metadataAge));
+            metricsFacade.producer().registerAckLeaderRecordQueueTimeMaxGauge(producer, producerGauge(queueTimeMax));
+        }
+    }
+
+    private double findProducerMetric(Producer<K, V> producer,
+                                      Predicate<Map.Entry<MetricName, ? extends Metric>> predicate) {
+        Optional<Map.Entry<MetricName, ? extends Metric>> first =
+                producer.metrics().entrySet().stream().filter(predicate).findFirst();
+        double value = first.map(metricNameEntry -> metricNameEntry.getValue().value()).orElse(0.0);
+        return value < 0 ? 0.0 : value;
+    }
+
+
+    private ToDoubleFunction<Producer<K, V>> producerGauge(MetricName producerMetricName) {
+        Predicate<Map.Entry<MetricName, ? extends Metric>> predicate = entry -> entry.getKey().group().equals(producerMetricName.group())
+                && entry.getKey().name().equals(producerMetricName.name());
+        return producer -> findProducerMetric(producer, predicate);
+    }
+
+    private static MetricName producerMetric(String name, String group, String description) {
+        return new MetricName(name, group, description, Collections.emptyMap());
+    }
+
+}
diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSenders.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSenders.java
new file mode 100644
index 0000000000..221afe5737
--- /dev/null
+++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSenders.java
@@ -0,0 +1,160 @@
+package pl.allegro.tech.hermes.frontend.producer.kafka;
+
+import org.apache.kafka.common.PartitionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import pl.allegro.tech.hermes.api.Topic;
+import pl.allegro.tech.hermes.common.metric.MetricsFacade;
+import pl.allegro.tech.hermes.frontend.metric.CachedTopic;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+// exposes kafka producer metrics, see: https://docs.confluent.io/platform/current/kafka/monitoring.html#producer-metrics
+public class KafkaMessageSenders {
+
+    private static final Logger logger = LoggerFactory.getLogger(KafkaMessageSenders.class);
+
+    private final KafkaMessageSender<byte[], byte[]> ackLeader;
+    private final KafkaMessageSender<byte[], byte[]> ackAll;
+
+    private final List<KafkaMessageSender<byte[], byte[]>> remoteAckLeader;
+    private final List<KafkaMessageSender<byte[], byte[]>> remoteAckAll;
+
+    private final MinInSyncReplicasLoader localMinInSyncReplicasLoader;
+    private final TopicMetadataLoadingExecutor topicMetadataLoadingExecutor;
+    private final List<TopicMetadataLoader> localDatacenterTopicMetadataLoaders;
+    private final List<TopicMetadataLoader> kafkaProducerMetadataRefreshers;
+
+    KafkaMessageSenders(TopicMetadataLoadingExecutor topicMetadataLoadingExecutor,
+                        MinInSyncReplicasLoader localMinInSyncReplicasLoader,
+                        Tuple localSenders,
+                        List<Tuple> remoteSenders) {
+        this.topicMetadataLoadingExecutor = topicMetadataLoadingExecutor;
+        this.localMinInSyncReplicasLoader = localMinInSyncReplicasLoader;
+        this.ackLeader = localSenders.ackLeader;
+        this.ackAll = localSenders.ackAll;
+        this.remoteAckLeader = remoteSenders.stream().map(it -> it.ackLeader).collect(Collectors.toList());
+        this.remoteAckAll = remoteSenders.stream().map(it -> it.ackAll).collect(Collectors.toList());
+        this.localDatacenterTopicMetadataLoaders = List.of(
+                new LocalDatacenterTopicAvailabilityChecker()
+        );
+        this.kafkaProducerMetadataRefreshers = Stream.concat(Stream.of(localSenders), remoteSenders.stream())
+                .map(KafkaProducerMetadataRefresher::new)
+                .collect(Collectors.toList());
+    }
+
+    KafkaMessageSender<byte[], byte[]> get(Topic topic) {
+        return topic.isReplicationConfirmRequired() ? ackAll : ackLeader;
+    }
+
+    List<KafkaMessageSender<byte[], byte[]>> getRemote(Topic topic) {
+        return topic.isReplicationConfirmRequired() ? remoteAckLeader : remoteAckAll;
+    }
+
+    void refreshTopicMetadata() {
+        topicMetadataLoadingExecutor.execute(kafkaProducerMetadataRefreshers);
+    }
+
+    boolean areAllTopicsAvailable() {
+        return topicMetadataLoadingExecutor.execute(localDatacenterTopicMetadataLoaders);
+    }
+
+    boolean isTopicAvailable(CachedTopic cachedTopic) {
+        String kafkaTopicName = cachedTopic.getKafkaTopics().getPrimary().name().asString();
+
+        try {
+            List<PartitionInfo> partitionInfos = get(cachedTopic.getTopic()).loadPartitionMetadataFor(kafkaTopicName);
+            if (anyPartitionWithoutLeader(partitionInfos)) {
+                logger.warn("Topic {} has partitions without a leader.", kafkaTopicName);
+                return false;
+            }
+            if (anyUnderReplicatedPartition(partitionInfos, kafkaTopicName)) {
+                logger.warn("Topic {} has under replicated partitions.", kafkaTopicName);
+                return false;
+            }
+            if (!partitionInfos.isEmpty()) {
+                return true;
+            }
+        } catch (Exception e) {
+            logger.warn("Could not read information about partitions for topic {}. {}", kafkaTopicName, e.getMessage());
+            return false;
+        }
+
+        logger.warn("No information about partitions for topic {}", kafkaTopicName);
+        return false;
+    }
+
+    private boolean anyPartitionWithoutLeader(List<PartitionInfo> partitionInfos) {
+        return partitionInfos.stream().anyMatch(p -> p.leader() == null);
+    }
+
+    private boolean anyUnderReplicatedPartition(List<PartitionInfo> partitionInfos, String kafkaTopicName) throws Exception {
+        int minInSyncReplicas = localMinInSyncReplicasLoader.get(kafkaTopicName);
+        return partitionInfos.stream().anyMatch(p -> p.inSyncReplicas().length < minInSyncReplicas);
+    }
+
+    public void registerLocalSenderMetrics(MetricsFacade metricsFacade) {
+        ackLeader.registerGauges(metricsFacade, Topic.Ack.LEADER);
+        ackAll.registerGauges(metricsFacade, Topic.Ack.ALL);
+    }
+
+    static class Tuple {
+        private final KafkaMessageSender<byte[], byte[]> ackLeader;
+        private final KafkaMessageSender<byte[], byte[]> ackAll;
+
+        Tuple(KafkaMessageSender<byte[], byte[]> ackLeader, KafkaMessageSender<byte[], byte[]> ackAll) {
+            this.ackLeader = ackLeader;
+            this.ackAll = ackAll;
+        }
+    }
+
+    public void close() {
+        ackAll.close();
+        ackLeader.close();
+    }
+
+    private class KafkaProducerMetadataRefresher implements TopicMetadataLoader {
+
+        private final KafkaMessageSender<byte[], byte[]> ackLeader;
+        private final KafkaMessageSender<byte[], byte[]> ackAll;
+
+        KafkaProducerMetadataRefresher(Tuple tuple) {
+            this.ackLeader = tuple.ackLeader;
+            this.ackAll = tuple.ackAll;
+        }
+
+        @Override
+        public MetadataLoadingResult load(CachedTopic cachedTopic) {
+            String kafkaTopicName = cachedTopic.getKafkaTopics().getPrimary().name().asString();
+            var sender = getSender(cachedTopic.getTopic());
+            var partitionInfos = sender.loadPartitionMetadataFor(kafkaTopicName);
+            if (anyPartitionWithoutLeader(partitionInfos)) {
+                logger.warn("Topic {} has partitions without a leader.", kafkaTopicName);
+                return MetadataLoadingResult.failure(cachedTopic.getTopicName(), sender.getDatacenter());
+            }
+            if (partitionInfos.isEmpty()) {
+                logger.warn("No information about partitions for topic {}", kafkaTopicName);
+                return MetadataLoadingResult.failure(cachedTopic.getTopicName(), sender.getDatacenter());
+            }
+            return MetadataLoadingResult.success(cachedTopic.getTopicName(), sender.getDatacenter());
+        }
+
+        private KafkaMessageSender<byte[], byte[]> getSender(Topic topic) {
+            return topic.isReplicationConfirmRequired() ? ackAll : ackLeader;
+        }
+    }
+
+    private class LocalDatacenterTopicAvailabilityChecker implements TopicMetadataLoader {
+
+        @Override
+        public MetadataLoadingResult load(CachedTopic topic) {
+            String datacenter = get(topic.getTopic()).getDatacenter();
+            if (isTopicAvailable(topic)) {
+                return MetadataLoadingResult.success(topic.getTopicName(), datacenter);
+            }
+            return MetadataLoadingResult.failure(topic.getTopicName(), datacenter);
+        }
+    }
+}
diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageProducerFactory.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSendersFactory.java
similarity index 59%
rename from hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageProducerFactory.java
rename to hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSendersFactory.java
index 266c032bce..df1ba5d9a9 100644
--- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageProducerFactory.java
+++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSendersFactory.java
@@ -1,11 +1,12 @@
 package pl.allegro.tech.hermes.frontend.producer.kafka;
 
-import com.google.common.collect.ImmutableMap;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.admin.AdminClient;
 import pl.allegro.tech.hermes.common.kafka.KafkaParameters;
+import pl.allegro.tech.hermes.frontend.cache.topic.TopicsCache;
 
+import java.time.Duration;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
@@ -29,24 +30,54 @@
 import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
 import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
 
-public class KafkaMessageProducerFactory {
+public class KafkaMessageSendersFactory {
 
     private static final String ACK_ALL = "-1";
     private static final String ACK_LEADER = "1";
 
+    private final TopicMetadataLoadingExecutor topicMetadataLoadingExecutor;
+    private final MinInSyncReplicasLoader localMinInSyncReplicasLoader;
     private final KafkaParameters kafkaParameters;
-    private final KafkaProducerParameters kafkaProducerParameters;
+    private final List<KafkaParameters> remoteKafkaParameters;
     private final long bufferedSizeBytes;
 
-    public KafkaMessageProducerFactory(KafkaParameters kafkaParameters,
-                                       KafkaProducerParameters kafkaProducerParameters,
-                                       long bufferedSizeBytes) {
-        this.kafkaProducerParameters = kafkaProducerParameters;
+    public KafkaMessageSendersFactory(KafkaParameters kafkaParameters,
+                                      List<KafkaParameters> remoteKafkaParameters,
+                                      AdminClient localAdminClient,
+                                      TopicsCache topicsCache,
+                                      int retryCount,
+                                      Duration retryInterval,
+                                      int threadPoolSize,
+                                      long bufferedSizeBytes,
+                                      Duration metadataMaxAge) {
+        this.topicMetadataLoadingExecutor = new TopicMetadataLoadingExecutor(topicsCache, retryCount, retryInterval, threadPoolSize);
+        this.localMinInSyncReplicasLoader = new MinInSyncReplicasLoader(localAdminClient, metadataMaxAge);
         this.bufferedSizeBytes = bufferedSizeBytes;
         this.kafkaParameters = kafkaParameters;
+        this.remoteKafkaParameters = remoteKafkaParameters;
     }
 
-    public Producers provide() {
+    public KafkaMessageSenders provide(KafkaProducerParameters kafkaProducerParameters) {
+        KafkaMessageSenders.Tuple localProducers = new KafkaMessageSenders.Tuple(
+                sender(kafkaParameters, kafkaProducerParameters, ACK_LEADER),
+                sender(kafkaParameters, kafkaProducerParameters, ACK_ALL)
+        );
+
+        List<KafkaMessageSenders.Tuple> remoteProducers = remoteKafkaParameters.stream().map(
+                kafkaProperties -> new KafkaMessageSenders.Tuple(
+                        sender(kafkaProperties, kafkaProducerParameters, ACK_LEADER),
+                        sender(kafkaProperties, kafkaProducerParameters, ACK_ALL))).toList();
+        return new KafkaMessageSenders(
+                topicMetadataLoadingExecutor,
+                localMinInSyncReplicasLoader,
+                localProducers,
+                remoteProducers
+        );
+    }
+
+    private KafkaMessageSender<byte[], byte[]> sender(KafkaParameters kafkaParameters,
+                                                      KafkaProducerParameters kafkaProducerParameters,
+                                                      String acks) {
         Map<String, Object> props = new HashMap<>();
         props.put(BOOTSTRAP_SERVERS_CONFIG, kafkaParameters.getBrokerList());
         props.put(MAX_BLOCK_MS_CONFIG, (int) kafkaProducerParameters.getMaxBlock().toMillis());
@@ -64,19 +95,21 @@ public Producers provide() {
         props.put(LINGER_MS_CONFIG, (int) kafkaProducerParameters.getLinger().toMillis());
         props.put(METRICS_SAMPLE_WINDOW_MS_CONFIG, (int) kafkaProducerParameters.getMetricsSampleWindow().toMillis());
         props.put(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, kafkaProducerParameters.getMaxInflightRequestsPerConnection());
+        props.put(ACKS_CONFIG, acks);
 
         if (kafkaParameters.isAuthenticationEnabled()) {
             props.put(SASL_MECHANISM, kafkaParameters.getAuthenticationMechanism());
             props.put(SECURITY_PROTOCOL_CONFIG, kafkaParameters.getAuthenticationProtocol());
             props.put(SASL_JAAS_CONFIG, kafkaParameters.getJaasConfig());
         }
-
-        Producer<byte[], byte[]> leaderConfirms = new KafkaProducer<>(copyWithEntryAdded(props, ACKS_CONFIG, ACK_LEADER));
-        Producer<byte[], byte[]> everyoneConfirms = new KafkaProducer<>(copyWithEntryAdded(props, ACKS_CONFIG, ACK_ALL));
-        return new Producers(leaderConfirms, everyoneConfirms, kafkaProducerParameters.isReportNodeMetricsEnabled());
+        return new KafkaMessageSender<>(
+                new org.apache.kafka.clients.producer.KafkaProducer<>(props),
+                kafkaParameters.getDatacenter()
+        );
     }
 
-    private ImmutableMap<String, Object> copyWithEntryAdded(Map<String, Object> common, String key, String value) {
-        return ImmutableMap.<String, Object>builder().putAll(common).put(key, value).build();
+    public void close() throws Exception {
+        topicMetadataLoadingExecutor.close();
+        localMinInSyncReplicasLoader.close();
     }
 }
diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaTopicMetadataFetcherFactory.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaTopicMetadataFetcherFactory.java
deleted file mode 100644
index 43832da5fe..0000000000
--- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaTopicMetadataFetcherFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package pl.allegro.tech.hermes.frontend.producer.kafka;
-
-import org.apache.kafka.clients.admin.AdminClient;
-import pl.allegro.tech.hermes.common.kafka.KafkaParameters;
-
-import java.time.Duration;
-import java.util.Properties;
-
-import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
-import static org.apache.kafka.clients.CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
-import static org.apache.kafka.clients.CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
-import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
-
-public class KafkaTopicMetadataFetcherFactory {
-
-    private final KafkaParameters kafkaParameters;
-
-    private final Duration metadataMaxAge;
-
-    private final int requestTimeoutMs;
-
-    public KafkaTopicMetadataFetcherFactory(KafkaParameters kafkaParameters, Duration metadataMaxAge, int requestTimeoutMs) {
-        this.kafkaParameters = kafkaParameters;
-        this.metadataMaxAge = metadataMaxAge;
-        this.requestTimeoutMs = requestTimeoutMs;
-    }
-
-    public KafkaTopicMetadataFetcher provide() {
-        Properties props = new Properties();
-        props.put(BOOTSTRAP_SERVERS_CONFIG, kafkaParameters.getBrokerList());
-        props.put(SECURITY_PROTOCOL_CONFIG, DEFAULT_SECURITY_PROTOCOL);
-        props.put(REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
-        if (kafkaParameters.isAuthenticationEnabled()) {
-            props.put(SASL_MECHANISM, kafkaParameters.getAuthenticationMechanism());
-            props.put(SECURITY_PROTOCOL_CONFIG, kafkaParameters.getAuthenticationProtocol());
-            props.put(SASL_JAAS_CONFIG, kafkaParameters.getJaasConfig());
-        }
-        AdminClient adminClient = AdminClient.create(props);
-        return new KafkaTopicMetadataFetcher(adminClient, metadataMaxAge);
-    }
-}
diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaTopicMetadataFetcher.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/MinInSyncReplicasLoader.java
similarity index 80%
rename from hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaTopicMetadataFetcher.java
rename to hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/MinInSyncReplicasLoader.java
index d63c2a4af1..e3d693d221 100644
--- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaTopicMetadataFetcher.java
+++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/MinInSyncReplicasLoader.java
@@ -12,24 +12,25 @@
 
 import java.time.Duration;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
 import static org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG;
 
-public class KafkaTopicMetadataFetcher {
-    private final LoadingCache<String, Integer> minInSyncReplicasCache;
+class MinInSyncReplicasLoader {
+
     private final AdminClient adminClient;
+    private final LoadingCache<String, Integer> minInSyncReplicasCache;
 
-    KafkaTopicMetadataFetcher(AdminClient adminClient, Duration metadataMaxAge) {
+    MinInSyncReplicasLoader(AdminClient adminClient, Duration metadataMaxAge) {
         this.adminClient = adminClient;
-        this.minInSyncReplicasCache = CacheBuilder
-                .newBuilder()
+        this.minInSyncReplicasCache = CacheBuilder.newBuilder()
                 .expireAfterWrite(metadataMaxAge.toMillis(), MILLISECONDS)
-                .build(new MinInSyncReplicasLoader());
+                .build(new MinInSyncReplicasCacheLoader());
     }
 
-    int fetchMinInSyncReplicas(String kafkaTopicName) throws Exception {
+    int get(String kafkaTopicName) throws ExecutionException {
         return minInSyncReplicasCache.get(kafkaTopicName);
     }
 
@@ -37,7 +38,7 @@ void close() {
         adminClient.close();
     }
 
-    private class MinInSyncReplicasLoader extends CacheLoader<String, Integer> {
+    private class MinInSyncReplicasCacheLoader extends CacheLoader<String, Integer> {
 
         @Override
         public Integer load(String kafkaTopicName) throws Exception {
diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/ProducerMetadataLoadingJob.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/ProducerMetadataLoadingJob.java
new file mode 100644
index 0000000000..30eff9e89c
--- /dev/null
+++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/ProducerMetadataLoadingJob.java
@@ -0,0 +1,60 @@
+package pl.allegro.tech.hermes.frontend.producer.kafka;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.time.Duration;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The purpose of this job is to periodically refresh the cache in
+ * {@link org.apache.kafka.clients.producer.KafkaProducer} that stores topic metadata.
+ * This is especially important to avoid a cold start, i.e. when a new hermes-frontend
+ * instance is launched with the cache being empty. Since the producer relies on topic
+ * metadata to send produce requests to Kafka, if the cache is empty, the producer must
+ * load the metadata before sending the produce request. Fetching the metadata might be
+ * costly, therefore we want to avoid passing on this cost to the Hermes client.
+ */
+public class ProducerMetadataLoadingJob implements Runnable {
+
+    private final KafkaMessageSenders kafkaMessageSenders;
+    private final ScheduledExecutorService executorService;
+    private final boolean enabled;
+    private final Duration interval;
+
+    private ScheduledFuture<?> job;
+
+    public ProducerMetadataLoadingJob(KafkaMessageSenders kafkaMessageSenders,
+                                      boolean enabled,
+                                      Duration interval) {
+        this.kafkaMessageSenders = kafkaMessageSenders;
+        this.enabled = enabled;
+        this.interval = interval;
+        ThreadFactory threadFactory = new ThreadFactoryBuilder()
+                .setNameFormat("TopicMetadataLoadingJob-%d").build();
+        this.executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
+    }
+
+    @Override
+    public void run() {
+        kafkaMessageSenders.refreshTopicMetadata();
+    }
+
+    public void start() {
+        if (enabled) {
+            kafkaMessageSenders.refreshTopicMetadata();
+            job = executorService.scheduleAtFixedRate(this, interval.toSeconds(), interval.toSeconds(), TimeUnit.SECONDS);
+        }
+    }
+
+    public void stop() throws InterruptedException {
+        if (enabled) {
+            job.cancel(false);
+            executorService.shutdown();
+            executorService.awaitTermination(1, TimeUnit.MINUTES);
+        }
+    }
+}
diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/Producers.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/Producers.java
deleted file mode 100644
index 61c109eafc..0000000000
--- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/Producers.java
+++ /dev/null
@@ -1,114 +0,0 @@
-package pl.allegro.tech.hermes.frontend.producer.kafka;
-
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.Node;
-import pl.allegro.tech.hermes.api.Topic;
-import pl.allegro.tech.hermes.common.metric.MetricsFacade;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Predicate;
-import java.util.function.ToDoubleFunction;
-
-// exposes kafka producer metrics, see: https://docs.confluent.io/platform/current/kafka/monitoring.html#producer-metrics
-public class Producers {
-    private final Producer<byte[], byte[]> ackLeader;
-    private final Producer<byte[], byte[]> ackAll;
-
-    private final boolean reportNodeMetrics;
-    private final AtomicBoolean nodeMetricsRegistered = new AtomicBoolean(false);
-
-    public Producers(Producer<byte[], byte[]> ackLeader,
-                     Producer<byte[], byte[]> ackAll,
-                     boolean reportNodeMetrics) {
-        this.ackLeader = ackLeader;
-        this.ackAll = ackAll;
-        this.reportNodeMetrics = reportNodeMetrics;
-    }
-
-    public Producer<byte[], byte[]> get(Topic topic) {
-        return topic.isReplicationConfirmRequired() ? ackAll : ackLeader;
-    }
-
-    public void registerGauges(MetricsFacade metricsFacade) {
-        MetricName bufferTotalBytes = producerMetric("buffer-total-bytes", "producer-metrics", "buffer total bytes");
-        metricsFacade.producer().registerAckAllTotalBytesGauge(ackAll, producerGauge(bufferTotalBytes));
-        metricsFacade.producer().registerAckLeaderTotalBytesGauge(ackLeader, producerGauge(bufferTotalBytes));
-
-        MetricName bufferAvailableBytes = producerMetric("buffer-available-bytes", "producer-metrics", "buffer available bytes");
-        metricsFacade.producer().registerAckAllAvailableBytesGauge(ackAll, producerGauge(bufferAvailableBytes));
-        metricsFacade.producer().registerAckLeaderAvailableBytesGauge(ackLeader, producerGauge(bufferAvailableBytes));
-
-        MetricName compressionRate = producerMetric("compression-rate-avg", "producer-metrics", "average compression rate");
-        metricsFacade.producer().registerAckAllCompressionRateGauge(ackAll, producerGauge(compressionRate));
-        metricsFacade.producer().registerAckLeaderCompressionRateGauge(ackLeader, producerGauge(compressionRate));
-
-        MetricName failedBatches = producerMetric("record-error-total", "producer-metrics", "failed publishing batches");
-        metricsFacade.producer().registerAckAllFailedBatchesGauge(ackAll, producerGauge(failedBatches));
-        metricsFacade.producer().registerAckLeaderFailedBatchesGauge(ackLeader, producerGauge(failedBatches));
-
-        MetricName metadataAge = producerMetric("metadata-age", "producer-metrics", "age [s] of metadata");
-        metricsFacade.producer().registerAckAllMetadataAgeGauge(ackAll, producerGauge(metadataAge));
-        metricsFacade.producer().registerAckLeaderMetadataAgeGauge(ackLeader, producerGauge(metadataAge));
-
-        MetricName queueTimeMax = producerMetric("record-queue-time-max", "producer-metrics",
-                "maximum time [ms] that batch spent in the send buffer");
-        metricsFacade.producer().registerAckAllRecordQueueTimeMaxGauge(ackAll, producerGauge(queueTimeMax));
-        metricsFacade.producer().registerAckLeaderRecordQueueTimeMaxGauge(ackLeader, producerGauge(queueTimeMax));
-    }
-
-    public void maybeRegisterNodeMetricsGauges(MetricsFacade metricsFacade) {
-        if (reportNodeMetrics && nodeMetricsRegistered.compareAndSet(false, true)) {
-            registerLatencyPerBrokerGauge(metricsFacade);
-        }
-    }
-
-    private void registerLatencyPerBrokerGauge(MetricsFacade metricsFacade) {
-        List<Node> brokers = ProducerBrokerNodeReader.read(ackLeader);
-        for (Node broker : brokers) {
-            metricsFacade.producer().registerAckAllMaxLatencyBrokerGauge(ackAll,
-                    producerLatencyGauge("request-latency-max", broker), broker.host());
-            metricsFacade.producer().registerAckLeaderMaxLatencyPerBrokerGauge(ackLeader,
-                    producerLatencyGauge("request-latency-max", broker), broker.host());
-            metricsFacade.producer().registerAckAllAvgLatencyPerBrokerGauge(ackAll,
-                    producerLatencyGauge("request-latency-avg", broker), broker.host());
-            metricsFacade.producer().registerAckLeaderAvgLatencyPerBrokerGauge(ackLeader,
-                    producerLatencyGauge("request-latency-avg", broker), broker.host());
-        }
-    }
-
-    private double findProducerMetric(Producer<byte[], byte[]> producer,
-                                      Predicate<Map.Entry<MetricName, ? extends Metric>> predicate) {
-        Optional<Map.Entry<MetricName, ? extends Metric>> first =
-                producer.metrics().entrySet().stream().filter(predicate).findFirst();
-        double value = first.map(metricNameEntry -> metricNameEntry.getValue().value()).orElse(0.0);
-        return value < 0 ? 0.0 : value;
-    }
-
-    private ToDoubleFunction<Producer<byte[], byte[]>> producerLatencyGauge(String producerMetricName, Node node) {
-        Predicate<Map.Entry<MetricName, ? extends Metric>> predicate = entry -> entry.getKey().group().equals("producer-node-metrics")
-                && entry.getKey().name().equals(producerMetricName)
-                && entry.getKey().tags().containsValue("node-" + node.id());
-        return producer -> findProducerMetric(producer, predicate);
-    }
-
-    private ToDoubleFunction<Producer<byte[], byte[]>> producerGauge(MetricName producerMetricName) {
-        Predicate<Map.Entry<MetricName, ? extends Metric>> predicate = entry -> entry.getKey().group().equals(producerMetricName.group())
-                && entry.getKey().name().equals(producerMetricName.name());
-        return producer -> findProducerMetric(producer, predicate);
-    }
-
-    private static MetricName producerMetric(String name, String group, String description) {
-        return new MetricName(name, group, description, Collections.emptyMap());
-    }
-
-    public void close() {
-        ackAll.close();
-        ackLeader.close();
-    }
-}
diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/TopicMetadataLoader.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/TopicMetadataLoader.java
new file mode 100644
index 0000000000..565b4a3c47
--- /dev/null
+++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/TopicMetadataLoader.java
@@ -0,0 +1,29 @@
+package pl.allegro.tech.hermes.frontend.producer.kafka;
+
+import pl.allegro.tech.hermes.api.TopicName;
+import pl.allegro.tech.hermes.frontend.metric.CachedTopic;
+
+interface TopicMetadataLoader {
+
+    MetadataLoadingResult load(CachedTopic cachedTopic);
+
+    record MetadataLoadingResult(Type type, TopicName topicName, String datacenter) {
+
+        static MetadataLoadingResult success(TopicName topicName, String datacenter) {
+            return new MetadataLoadingResult(Type.SUCCESS, topicName, datacenter);
+        }
+
+        static MetadataLoadingResult failure(TopicName topicName, String datacenter) {
+            return new MetadataLoadingResult(Type.FAILURE, topicName, datacenter);
+        }
+
+        boolean isFailure() {
+            return Type.FAILURE == type;
+        }
+    }
+
+    enum Type {
+        SUCCESS,
+        FAILURE
+    }
+}
diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/TopicMetadataLoadingExecutor.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/TopicMetadataLoadingExecutor.java
new file mode 100644
index 0000000000..32318cf43c
--- /dev/null
+++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/TopicMetadataLoadingExecutor.java
@@ -0,0 +1,108 @@
+package pl.allegro.tech.hermes.frontend.producer.kafka;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import net.jodah.failsafe.Failsafe;
+import net.jodah.failsafe.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import pl.allegro.tech.hermes.api.TopicName;
+import pl.allegro.tech.hermes.frontend.cache.topic.TopicsCache;
+import pl.allegro.tech.hermes.frontend.metric.CachedTopic;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.util.concurrent.CompletableFuture.completedFuture; +import static pl.allegro.tech.hermes.frontend.producer.kafka.TopicMetadataLoader.MetadataLoadingResult; +import static pl.allegro.tech.hermes.frontend.producer.kafka.TopicMetadataLoader.Type; +import static pl.allegro.tech.hermes.frontend.utils.CompletableFuturesHelper.allComplete; + +class TopicMetadataLoadingExecutor { + + private static final Logger logger = LoggerFactory.getLogger(TopicMetadataLoadingExecutor.class); + + private final ScheduledExecutorService scheduler; + private final RetryPolicy retryPolicy; + private final TopicsCache topicsCache; + + TopicMetadataLoadingExecutor(TopicsCache topicsCache, int retryCount, Duration retryInterval, int threadPoolSize) { + this.topicsCache = topicsCache; + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("topic-metadata-loader-%d").build(); + this.scheduler = Executors.newScheduledThreadPool(threadPoolSize, threadFactory); + this.retryPolicy = new RetryPolicy() + .withMaxRetries(retryCount) + .withDelay(retryInterval) + .handleIf((resp, cause) -> resp.isFailure()); + } + + boolean execute(List loaders) { + try { + long start = System.currentTimeMillis(); + logger.info("Loading topic metadata"); + List topics = topicsCache.getTopics(); + List allResults = loadMetadataForTopics(topics, loaders); + logger.info("Finished loading topic metadata in {}ms", System.currentTimeMillis() - start); + logResultInfo(allResults); + return allResults.stream().noneMatch(MetadataLoadingResult::isFailure); + } catch (Exception e) { + logger.error("An error occurred while refreshing topic metadata", e); + return false; + } + } + + private List loadMetadataForTopics(List topics, List loaders) { + List> completableFutures = new ArrayList<>(); + for (CachedTopic topic : topics) { + for (TopicMetadataLoader loader : loaders) { + completableFutures.add(loadTopicMetadata(topic, loader)); + } + } + return allComplete(completableFutures).join(); + } + + private CompletableFuture loadTopicMetadata(CachedTopic topic, TopicMetadataLoader loader) { + return Failsafe.with(retryPolicy).with(scheduler) + .getStageAsync((context) -> completedFuture(loader.load(topic))); + } + + private void logResultInfo(List allResults) { + Map> resultsPerDatacenter = allResults.stream() + .collect(Collectors.groupingBy(MetadataLoadingResult::datacenter, Collectors.toList())); + + for (Map.Entry> datacenterResults : resultsPerDatacenter.entrySet()) { + Map> groupedResults = getGroupedResults(datacenterResults.getValue()); + Optional> successes = Optional.ofNullable(groupedResults.get(Type.SUCCESS)); + Optional> failures = Optional.ofNullable(groupedResults.get(Type.FAILURE)); + + logger.info("Results of loading topic metadata from datacenter {}: successfully loaded {} topics, failed for {} topics{}", + datacenterResults.getKey(), + successes.map(List::size).orElse(0), + failures.map(List::size).orElse(0), + failures.map(results -> String.format("Failed topics: [%s].", topicsOfResults(results))).orElse("") + ); + } + } + + private Map> getGroupedResults(List allResults) { + return allResults.stream().collect(Collectors.groupingBy(MetadataLoadingResult::type, Collectors.toList())); + } + + private String topicsOfResults(List results) { + return results.stream().map(MetadataLoadingResult::topicName).map(TopicName::qualifiedName) + .collect(Collectors.joining(", ")); + } + + void close() throws Exception { + scheduler.shutdown(); + scheduler.awaitTermination(1, TimeUnit.SECONDS); + } +} diff --git 
a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/readiness/DefaultReadinessChecker.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/readiness/DefaultReadinessChecker.java index aa7ca002ce..0b6f066cf0 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/readiness/DefaultReadinessChecker.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/readiness/DefaultReadinessChecker.java @@ -1,39 +1,34 @@ package pl.allegro.tech.hermes.frontend.readiness; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import pl.allegro.tech.hermes.frontend.server.MetadataLoadingResult; -import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingRunner; +import pl.allegro.tech.hermes.frontend.producer.BrokerTopicAvailabilityChecker; import java.time.Duration; -import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; public class DefaultReadinessChecker implements ReadinessChecker { - private static final Logger logger = LoggerFactory.getLogger(DefaultReadinessChecker.class); private final boolean enabled; - private final boolean kafkaCheckEnabled; + private final boolean topicsCheckEnabled; private final Duration interval; - private final TopicMetadataLoadingRunner topicMetadataLoadingRunner; + private final BrokerTopicAvailabilityChecker brokerTopicAvailabilityChecker; private final ScheduledExecutorService scheduler; private final AdminReadinessService adminReadinessService; private volatile boolean ready = false; - public DefaultReadinessChecker(TopicMetadataLoadingRunner topicMetadataLoadingRunner, + public DefaultReadinessChecker(BrokerTopicAvailabilityChecker brokerTopicAvailabilityChecker, AdminReadinessService adminReadinessService, boolean enabled, - boolean kafkaCheckEnabled, + boolean topicsCheckEnabled, Duration interval) { this.enabled = enabled; - this.kafkaCheckEnabled = kafkaCheckEnabled; + this.topicsCheckEnabled = topicsCheckEnabled; this.interval = interval; - this.topicMetadataLoadingRunner = topicMetadataLoadingRunner; + this.brokerTopicAvailabilityChecker = brokerTopicAvailabilityChecker; this.adminReadinessService = adminReadinessService; ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("ReadinessChecker-%d").build(); @@ -64,31 +59,21 @@ public void stop() throws InterruptedException { } private class ReadinessCheckerJob implements Runnable { - private volatile boolean kafkaReady = false; + private volatile boolean allTopicsAvailable = false; @Override public void run() { if (!adminReadinessService.isLocalDatacenterReady()) { ready = false; - } else if (kafkaReady) { + } else if (allTopicsAvailable) { ready = true; + } else if (topicsCheckEnabled) { + allTopicsAvailable = brokerTopicAvailabilityChecker.areAllTopicsAvailable(); + ready = allTopicsAvailable; } else { - kafkaReady = checkKafkaReadiness(); - ready = kafkaReady; - } - } - - private boolean checkKafkaReadiness() { - if (kafkaCheckEnabled) { - try { - List<MetadataLoadingResult> results = topicMetadataLoadingRunner.refreshMetadata(); - return results.stream().noneMatch(MetadataLoadingResult::isFailure); - } catch (Exception ex) { - logger.warn("Unexpected error occurred during checking Kafka readiness", ex); - return false; - } + allTopicsAvailable = true; + ready = true; } - return true; } } } diff --git 
a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/HermesServer.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/HermesServer.java index da15daf716..aa68eab291 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/HermesServer.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/HermesServer.java @@ -35,8 +35,6 @@ public class HermesServer { private final ReadinessChecker readinessChecker; private final MessagePreviewPersister messagePreviewPersister; private final ThroughputLimiter throughputLimiter; - private final TopicMetadataLoadingJob topicMetadataLoadingJob; - private final boolean topicMetadataLoadingJobEnabled; private final SslContextFactoryProvider sslContextFactoryProvider; private final PrometheusMeterRegistry prometheusMeterRegistry; private Undertow undertow; @@ -51,8 +49,6 @@ public HermesServer( ReadinessChecker readinessChecker, MessagePreviewPersister messagePreviewPersister, ThroughputLimiter throughputLimiter, - TopicMetadataLoadingJob topicMetadataLoadingJob, - boolean topicMetadataLoadingJobEnabled, SslContextFactoryProvider sslContextFactoryProvider, PrometheusMeterRegistry prometheusMeterRegistry) { @@ -64,8 +60,6 @@ public HermesServer( this.healthCheckService = healthCheckService; this.readinessChecker = readinessChecker; this.messagePreviewPersister = messagePreviewPersister; - this.topicMetadataLoadingJob = topicMetadataLoadingJob; - this.topicMetadataLoadingJobEnabled = topicMetadataLoadingJobEnabled; this.sslContextFactoryProvider = sslContextFactoryProvider; this.throughputLimiter = throughputLimiter; } @@ -74,10 +68,6 @@ public void start() { configureServer().start(); messagePreviewPersister.start(); throughputLimiter.start(); - - if (topicMetadataLoadingJobEnabled) { - topicMetadataLoadingJob.start(); - } healthCheckService.startup(); readinessChecker.start(); } @@ -101,10 +91,6 @@ public void shutdown() throws InterruptedException { undertow.stop(); messagePreviewPersister.shutdown(); throughputLimiter.stop(); - - if (topicMetadataLoadingJobEnabled) { - topicMetadataLoadingJob.stop(); - } readinessChecker.stop(); } diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/MetadataLoadingResult.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/MetadataLoadingResult.java deleted file mode 100644 index 9008e668ef..0000000000 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/MetadataLoadingResult.java +++ /dev/null @@ -1,37 +0,0 @@ -package pl.allegro.tech.hermes.frontend.server; - -import pl.allegro.tech.hermes.api.TopicName; - -public final class MetadataLoadingResult { - - enum Type { SUCCESS, FAILURE } - - private final Type type; - - private final TopicName topicName; - - MetadataLoadingResult(Type type, TopicName topicName) { - this.type = type; - this.topicName = topicName; - } - - static MetadataLoadingResult success(TopicName topicName) { - return new MetadataLoadingResult(Type.SUCCESS, topicName); - } - - static MetadataLoadingResult failure(TopicName topicName) { - return new MetadataLoadingResult(Type.FAILURE, topicName); - } - - Type getType() { - return type; - } - - TopicName getTopicName() { - return topicName; - } - - public boolean isFailure() { - return Type.FAILURE == type; - } -} diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/TopicMetadataLoader.java 
b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/TopicMetadataLoader.java deleted file mode 100644 index db538116e1..0000000000 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/TopicMetadataLoader.java +++ /dev/null @@ -1,64 +0,0 @@ -package pl.allegro.tech.hermes.frontend.server; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import net.jodah.failsafe.ExecutionContext; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.RetryPolicy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import pl.allegro.tech.hermes.frontend.metric.CachedTopic; -import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer; - -import java.time.Duration; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -import static java.util.concurrent.CompletableFuture.completedFuture; - -class TopicMetadataLoader implements AutoCloseable { - - private static final Logger logger = LoggerFactory.getLogger(TopicMetadataLoader.class); - - private final BrokerMessageProducer brokerMessageProducer; - - private final ScheduledExecutorService scheduler; - - private final RetryPolicy<MetadataLoadingResult> retryPolicy; - - TopicMetadataLoader(BrokerMessageProducer brokerMessageProducer, - int retryCount, - Duration retryInterval, - int threadPoolSize) { - - this.brokerMessageProducer = brokerMessageProducer; - ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("topic-metadata-loader-%d").build(); - this.scheduler = Executors.newScheduledThreadPool(threadPoolSize, threadFactory); - this.retryPolicy = new RetryPolicy<MetadataLoadingResult>() - .withMaxRetries(retryCount) - .withDelay(retryInterval) - .handleIf((resp, cause) -> resp.isFailure()); - } - - CompletableFuture<MetadataLoadingResult> loadTopicMetadata(CachedTopic topic) { - return Failsafe.with(retryPolicy).with(scheduler) - .getStageAsync((context) -> completedFuture(fetchTopicMetadata(topic, context))); - } - - private MetadataLoadingResult fetchTopicMetadata(CachedTopic topic, ExecutionContext context) { - int attempt = context.getAttemptCount(); - if (brokerMessageProducer.isTopicAvailable(topic)) { - return MetadataLoadingResult.success(topic.getTopicName()); - } - logger.warn("Failed to load metadata for topic {}, attempt #{}", topic.getQualifiedName(), attempt); - return MetadataLoadingResult.failure(topic.getTopicName()); - } - - @Override - public void close() throws Exception { - scheduler.shutdown(); - scheduler.awaitTermination(1, TimeUnit.SECONDS); - } -} diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/TopicMetadataLoadingJob.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/TopicMetadataLoadingJob.java deleted file mode 100644 index bfbe1f9ea3..0000000000 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/TopicMetadataLoadingJob.java +++ /dev/null @@ -1,52 +0,0 @@ -package pl.allegro.tech.hermes.frontend.server; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.time.Duration; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -public class TopicMetadataLoadingJob implements Runnable { - - 
private static final Logger logger = LoggerFactory.getLogger(TopicMetadataLoadingJob.class); - - private final TopicMetadataLoadingRunner topicMetadataLoadingRunner; - private final ScheduledExecutorService executorService; - private final Duration interval; - - private ScheduledFuture<?> job; - - public TopicMetadataLoadingJob(TopicMetadataLoadingRunner topicMetadataLoadingRunner, Duration interval) { - this.topicMetadataLoadingRunner = topicMetadataLoadingRunner; - this.interval = interval; - - ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setNameFormat("TopicMetadataLoadingJob-%d").build(); - this.executorService = Executors.newSingleThreadScheduledExecutor(threadFactory); - } - - @Override - public void run() { - try { - topicMetadataLoadingRunner.refreshMetadata(); - } catch (Exception e) { - logger.error("An error occurred while refreshing topic metadata", e); - } - } - - public void start() { - job = executorService.scheduleAtFixedRate(this, interval.toSeconds(), interval.toSeconds(), TimeUnit.SECONDS); - } - - public void stop() throws InterruptedException { - job.cancel(false); - executorService.shutdown(); - executorService.awaitTermination(1, TimeUnit.MINUTES); - } - -} \ No newline at end of file diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/TopicMetadataLoadingRunner.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/TopicMetadataLoadingRunner.java deleted file mode 100644 index dfb59b856d..0000000000 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/TopicMetadataLoadingRunner.java +++ /dev/null @@ -1,81 +0,0 @@ -package pl.allegro.tech.hermes.frontend.server; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import pl.allegro.tech.hermes.api.TopicName; -import pl.allegro.tech.hermes.frontend.cache.topic.TopicsCache; -import pl.allegro.tech.hermes.frontend.metric.CachedTopic; -import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer; - -import java.time.Duration; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; - -import static java.util.stream.Collectors.toList; -import static pl.allegro.tech.hermes.frontend.server.CompletableFuturesHelper.allComplete; -import static pl.allegro.tech.hermes.frontend.server.MetadataLoadingResult.Type.FAILURE; -import static pl.allegro.tech.hermes.frontend.server.MetadataLoadingResult.Type.SUCCESS; - -public class TopicMetadataLoadingRunner { - - private static final Logger logger = LoggerFactory.getLogger(TopicMetadataLoadingRunner.class); - - private final BrokerMessageProducer brokerMessageProducer; - - private final TopicsCache topicsCache; - - private final int retryCount; - - private final Duration retryInterval; - - private final int threadPoolSize; - - public TopicMetadataLoadingRunner(BrokerMessageProducer brokerMessageProducer, - TopicsCache topicsCache, - int retryCount, - Duration retryInterval, - int threadPoolSize) { - this.brokerMessageProducer = brokerMessageProducer; - this.topicsCache = topicsCache; - this.retryCount = retryCount; - this.retryInterval = retryInterval; - this.threadPoolSize = threadPoolSize; - } - - public List<MetadataLoadingResult> refreshMetadata() throws Exception { - long start = System.currentTimeMillis(); - logger.info("Loading topics metadata"); - List<CachedTopic> topics = topicsCache.getTopics(); - List<MetadataLoadingResult> allResults = loadMetadataForTopics(topics); - logResultInfo(allResults, System.currentTimeMillis() - start); - return allResults; - } - - private List<MetadataLoadingResult> 
loadMetadataForTopics(List<CachedTopic> topics) throws Exception { - try (TopicMetadataLoader loader = new TopicMetadataLoader(brokerMessageProducer, retryCount, retryInterval, threadPoolSize)) { - return allComplete(topics.stream().map(loader::loadTopicMetadata).collect(toList())).join(); - } - } - - private void logResultInfo(List<MetadataLoadingResult> allResults, long elapsed) { - Map<MetadataLoadingResult.Type, List<MetadataLoadingResult>> groupedResults = getGroupedResults(allResults); - Optional<List<MetadataLoadingResult>> successes = Optional.ofNullable(groupedResults.get(SUCCESS)); - Optional<List<MetadataLoadingResult>> failures = Optional.ofNullable(groupedResults.get(FAILURE)); - - logger.info("Finished loading metadata for {} topics in {}ms [successful: {}, failed: {}]. {}", - allResults.size(), elapsed, successes.map(List::size).orElse(0), failures.map(List::size).orElse(0), - failures.map(results -> String.format("Failed for: [%s].", topicsOfResults(results))).orElse("")); - } - - private Map<MetadataLoadingResult.Type, List<MetadataLoadingResult>> getGroupedResults(List<MetadataLoadingResult> allResults) { - return allResults.stream().collect(Collectors.groupingBy(MetadataLoadingResult::getType, Collectors.toList())); - } - - private String topicsOfResults(List<MetadataLoadingResult> results) { - return results.stream().map(MetadataLoadingResult::getTopicName).map(TopicName::qualifiedName) - .collect(Collectors.joining(", ")); - } - -} \ No newline at end of file diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/TopicMetadataLoadingStartupHook.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/TopicMetadataLoadingStartupHook.java deleted file mode 100644 index cf6dcd94af..0000000000 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/TopicMetadataLoadingStartupHook.java +++ /dev/null @@ -1,30 +0,0 @@ -package pl.allegro.tech.hermes.frontend.server; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TopicMetadataLoadingStartupHook { - private static final Logger logger = LoggerFactory.getLogger(TopicMetadataLoadingStartupHook.class); - - private final TopicMetadataLoadingRunner topicMetadataLoadingRunner; - - private final boolean isTopicMetadataLoadingStartupHookEnabled; - - public TopicMetadataLoadingStartupHook(TopicMetadataLoadingRunner topicMetadataLoadingRunner, - boolean isTopicMetadataLoadingStartupHookEnabled) { - this.topicMetadataLoadingRunner = topicMetadataLoadingRunner; - this.isTopicMetadataLoadingStartupHookEnabled = isTopicMetadataLoadingStartupHookEnabled; - } - - public void run() { - if (isTopicMetadataLoadingStartupHookEnabled) { - try { - topicMetadataLoadingRunner.refreshMetadata(); - } catch (Exception e) { - logger.error("An error occurred while refreshing topic metadata", e); - } - } else { - logger.info("Topic metadata loading startup hook is disabled"); - } - } -} diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/TopicSchemaLoadingStartupHook.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/TopicSchemaLoadingStartupHook.java index 407d631d3d..33fad952ad 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/TopicSchemaLoadingStartupHook.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/TopicSchemaLoadingStartupHook.java @@ -17,10 +17,10 @@ import static java.util.stream.Collectors.joining; import static java.util.stream.Collectors.toList; -import static pl.allegro.tech.hermes.frontend.server.CompletableFuturesHelper.allComplete; import static pl.allegro.tech.hermes.frontend.server.SchemaLoadingResult.Type.FAILURE; import static 
pl.allegro.tech.hermes.frontend.server.SchemaLoadingResult.Type.MISSING; import static pl.allegro.tech.hermes.frontend.server.SchemaLoadingResult.Type.SUCCESS; +import static pl.allegro.tech.hermes.frontend.utils.CompletableFuturesHelper.allComplete; public class TopicSchemaLoadingStartupHook { diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/CompletableFuturesHelper.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/utils/CompletableFuturesHelper.java similarity index 62% rename from hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/CompletableFuturesHelper.java rename to hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/utils/CompletableFuturesHelper.java index 006ac6fef1..b3db4b9760 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/CompletableFuturesHelper.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/utils/CompletableFuturesHelper.java @@ -1,13 +1,13 @@ -package pl.allegro.tech.hermes.frontend.server; +package pl.allegro.tech.hermes.frontend.utils; import java.util.List; import java.util.concurrent.CompletableFuture; import static java.util.stream.Collectors.toList; -class CompletableFuturesHelper { +public class CompletableFuturesHelper { - static <T> CompletableFuture<List<T>> allComplete(List<CompletableFuture<T>> futures) { + public static <T> CompletableFuture<List<T>> allComplete(List<CompletableFuture<T>> futures) { return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(toList())); } diff --git a/hermes-frontend/src/test/groovy/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaBrokerMessageProducerIntegrationTest.groovy b/hermes-frontend/src/test/groovy/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaBrokerMessageProducerIntegrationTest.groovy index 08246d5de9..f2486d36c1 100644 --- a/hermes-frontend/src/test/groovy/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaBrokerMessageProducerIntegrationTest.groovy +++ b/hermes-frontend/src/test/groovy/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaBrokerMessageProducerIntegrationTest.groovy @@ -24,9 +24,8 @@ import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper import pl.allegro.tech.hermes.common.metric.HermesMetrics import pl.allegro.tech.hermes.common.metric.MetricsFacade import pl.allegro.tech.hermes.frontend.config.HTTPHeadersProperties -import pl.allegro.tech.hermes.frontend.config.SchemaProperties import pl.allegro.tech.hermes.frontend.config.KafkaHeaderNameProperties -import pl.allegro.tech.hermes.frontend.config.KafkaProducerProperties +import pl.allegro.tech.hermes.frontend.config.SchemaProperties import pl.allegro.tech.hermes.frontend.metric.CachedTopic import pl.allegro.tech.hermes.frontend.publishing.avro.AvroMessage import pl.allegro.tech.hermes.frontend.server.CachedTopicsTestHelper @@ -69,7 +68,7 @@ class KafkaBrokerMessageProducerIntegrationTest extends Specification { KafkaNamesMapper kafkaNamesMapper = new JsonToAvroMigrationKafkaNamesMapper("", "_") @Shared - Producers producers + KafkaMessageSenders producers @Shared String containerId @Shared @@ -84,7 +83,7 @@ KafkaHeaderNameProperties kafkaHeaderNameProperties = new KafkaHeaderNameProperties() @Shared - KafkaProducerProperties kafkaProducerProperties = new KafkaProducerProperties() + String datacenter = "dc"; def setupSpec() { kafkaContainer.start() @@ -101,9 +100,18 @@ class KafkaBrokerMessageProducerIntegrationTest 
extends Specification { } def setup() { - producers = new Producers(leaderConfirms, everyoneConfirms, kafkaProducerProperties.isReportNodeMetricsEnabled()) + TopicMetadataLoadingExecutor topicMetadataLoadingExecutor = Mock() + MinInSyncReplicasLoader minInSyncReplicasLoader = Mock() + producers = new KafkaMessageSenders( + topicMetadataLoadingExecutor, + minInSyncReplicasLoader, + new KafkaMessageSenders.Tuple( + new KafkaMessageSender(leaderConfirms, datacenter), + new KafkaMessageSender(everyoneConfirms, datacenter) + ), + Collections.emptyList() + ) brokerMessageProducer = new KafkaBrokerMessageProducer(producers, - new KafkaTopicMetadataFetcher(adminClient, kafkaProducerProperties.getMetadataMaxAge()), new MetricsFacade(new SimpleMeterRegistry(), new HermesMetrics(new MetricRegistry(), new PathsCompiler("localhost"))), new MessageToKafkaProducerRecordConverter(new KafkaHeaderFactory( kafkaHeaderNameProperties, diff --git a/hermes-frontend/src/test/groovy/pl/allegro/tech/hermes/frontend/producer/kafka/TopicMetadataLoadingExecutorTest.groovy b/hermes-frontend/src/test/groovy/pl/allegro/tech/hermes/frontend/producer/kafka/TopicMetadataLoadingExecutorTest.groovy new file mode 100644 index 0000000000..cdbf903c09 --- /dev/null +++ b/hermes-frontend/src/test/groovy/pl/allegro/tech/hermes/frontend/producer/kafka/TopicMetadataLoadingExecutorTest.groovy @@ -0,0 +1,109 @@ +package pl.allegro.tech.hermes.frontend.producer.kafka + +import com.google.common.collect.ImmutableList +import pl.allegro.tech.hermes.frontend.cache.topic.TopicsCache +import pl.allegro.tech.hermes.frontend.metric.CachedTopic +import spock.lang.Shared +import spock.lang.Specification + +import java.time.Duration + +import static pl.allegro.tech.hermes.frontend.producer.kafka.TopicMetadataLoader.* +import static pl.allegro.tech.hermes.frontend.producer.kafka.TopicMetadataLoader.MetadataLoadingResult.* +import static pl.allegro.tech.hermes.frontend.server.CachedTopicsTestHelper.cachedTopic + +class TopicMetadataLoadingExecutorTest extends Specification { + + @Shared + List<String> topics = ["g1.topicA", "g1.topicB", "g2.topicC"] + + @Shared + Map<String, CachedTopic> cachedTopics = new HashMap<>() + + @Shared + TopicsCache topicsCache + + def setupSpec() { + for (String topic : topics) { + cachedTopics.put(topic, cachedTopic(topic)) + } + topicsCache = Mock() { + getTopics() >> ImmutableList.copyOf(cachedTopics.values()) + } + } + + def "should load topic metadata"() { + given: + TopicMetadataLoader loader = Mock() + def executor = new TopicMetadataLoadingExecutor(topicsCache, 2, Duration.ofSeconds(10), 2) + + when: + boolean status = executor.execute([loader]) + + then: + for (String topic : topics) { + 1 * loader.load(cachedTopics.get(topic)) >> success(topic) + } + status + } + + def "should retry loading topic metadata"() { + given: + TopicMetadataLoader loader = Mock() + def executor = new TopicMetadataLoadingExecutor(topicsCache, 2, Duration.ofSeconds(10), 4) + + when: + boolean status = executor.execute([loader]) + + then: + 1 * loader.load(cachedTopics.get("g1.topicA")) >> failure("g1.topicA") + 1 * loader.load(cachedTopics.get("g1.topicA")) >> success("g1.topicA") + + 1 * loader.load(cachedTopics.get("g1.topicB")) >> success("g1.topicB") + + 2 * loader.load(cachedTopics.get("g2.topicC")) >> failure("g2.topicC") + 1 * loader.load(cachedTopics.get("g2.topicC")) >> success("g2.topicC") + + status + } + + def "should leave retry loop when reached max retries and failed to load metadata"() { + given: + TopicMetadataLoader loader = Mock() + def executor 
= new TopicMetadataLoadingExecutor(topicsCache, 2, Duration.ofSeconds(10), 4) + + when: + boolean status = executor.execute([loader]) + + then: + 3 * loader.load(cachedTopics.get("g1.topicA")) >> failure("g1.topicA") + 1 * loader.load(cachedTopics.get("g1.topicB")) >> success("g1.topicB") + 1 * loader.load(cachedTopics.get("g2.topicC")) >> success("g2.topicC") + + !status + } + + def "should not throw exception when no topics exist"() { + given: + TopicMetadataLoader loader = Mock() + TopicsCache emptyCache = Mock() { + getTopics() >> [] + } + def executor = new TopicMetadataLoadingExecutor(emptyCache, 2, Duration.ofSeconds(10), 4) + + when: + boolean status = executor.execute([loader]) + + then: + noExceptionThrown() + status + } + + private MetadataLoadingResult success(String topicName) { + return success(cachedTopics.get(topicName).topic.name, "dc1") + } + + private MetadataLoadingResult failure(String topicName) { + return failure(cachedTopics.get(topicName).topic.name, "dc1") + } +} diff --git a/hermes-frontend/src/test/groovy/pl/allegro/tech/hermes/frontend/server/TopicMetadataLoadingRunnerTest.groovy b/hermes-frontend/src/test/groovy/pl/allegro/tech/hermes/frontend/server/TopicMetadataLoadingRunnerTest.groovy deleted file mode 100644 index 3adff22679..0000000000 --- a/hermes-frontend/src/test/groovy/pl/allegro/tech/hermes/frontend/server/TopicMetadataLoadingRunnerTest.groovy +++ /dev/null @@ -1,94 +0,0 @@ -package pl.allegro.tech.hermes.frontend.server - -import com.google.common.collect.ImmutableList -import pl.allegro.tech.hermes.frontend.cache.topic.TopicsCache -import pl.allegro.tech.hermes.frontend.metric.CachedTopic -import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer -import spock.lang.Shared -import spock.lang.Specification - -import java.time.Duration - -import static pl.allegro.tech.hermes.frontend.server.CachedTopicsTestHelper.cachedTopic - -class TopicMetadataLoadingRunnerTest extends Specification { - - @Shared - List<String> topics = ["g1.topicA", "g1.topicB", "g2.topicC"] - - @Shared - Map<String, CachedTopic> cachedTopics = new HashMap<>() - - @Shared - TopicsCache topicsCache - - def setupSpec() { - for (String topic : topics) { - cachedTopics.put(topic, cachedTopic(topic)) - } - topicsCache = Mock() { - getTopics() >> ImmutableList.copyOf(cachedTopics.values()) - } - } - - def "should load topic metadata"() { - given: - BrokerMessageProducer producer = Mock() - def hook = new TopicMetadataLoadingRunner(producer, topicsCache, 2, Duration.ofSeconds(10), 2) - - when: - hook.refreshMetadata() - - then: - for (String topic : topics) { - 1 * producer.isTopicAvailable(cachedTopics.get(topic)) >> true - } - } - - def "should retry loading topic metadata"() { - given: - BrokerMessageProducer producer = Mock() - def hook = new TopicMetadataLoadingRunner(producer, topicsCache, 2, Duration.ofSeconds(10), 4) - - when: - hook.refreshMetadata() - - then: - 1 * producer.isTopicAvailable(cachedTopics.get("g1.topicA")) >> false - 1 * producer.isTopicAvailable(cachedTopics.get("g1.topicA")) >> true - - 1 * producer.isTopicAvailable(cachedTopics.get("g1.topicB")) >> true - - 2 * producer.isTopicAvailable(cachedTopics.get("g2.topicC")) >> false - 1 * producer.isTopicAvailable(cachedTopics.get("g2.topicC")) >> true - } - - def "should leave retry loop when reached max retries and failed to load metadata"() { - given: - BrokerMessageProducer producer = Mock() - def hook = new TopicMetadataLoadingRunner(producer, topicsCache, 2, Duration.ofSeconds(10), 4) - - when: - hook.refreshMetadata() - 
then: - 3 * producer.isTopicAvailable(cachedTopics.get("g1.topicA")) >> false - 1 * producer.isTopicAvailable(cachedTopics.get("g1.topicB")) >> true - 1 * producer.isTopicAvailable(cachedTopics.get("g2.topicC")) >> true - } - - def "should not throw exception when no topics exist"() { - given: - BrokerMessageProducer producer = Mock() - TopicsCache emptyCache = Mock() { - getTopics() >> [] - } - def hook = new TopicMetadataLoadingRunner(producer, emptyCache, 2, Duration.ofSeconds(10), 4) - - when: - hook.refreshMetadata() - - then: - noExceptionThrown() - } -} diff --git a/hermes-frontend/src/test/groovy/pl/allegro/tech/hermes/frontend/server/CompletableFuturesHelperTest.groovy b/hermes-frontend/src/test/groovy/pl/allegro/tech/hermes/frontend/utils/CompletableFuturesHelperTest.groovy similarity index 97% rename from hermes-frontend/src/test/groovy/pl/allegro/tech/hermes/frontend/server/CompletableFuturesHelperTest.groovy rename to hermes-frontend/src/test/groovy/pl/allegro/tech/hermes/frontend/utils/CompletableFuturesHelperTest.groovy index 720817d3e4..a3bfd0b54f 100644 --- a/hermes-frontend/src/test/groovy/pl/allegro/tech/hermes/frontend/server/CompletableFuturesHelperTest.groovy +++ b/hermes-frontend/src/test/groovy/pl/allegro/tech/hermes/frontend/utils/CompletableFuturesHelperTest.groovy @@ -1,4 +1,4 @@ -package pl.allegro.tech.hermes.frontend.server +package pl.allegro.tech.hermes.frontend.utils import spock.lang.Specification diff --git a/hermes-frontend/src/test/java/pl/allegro/tech/hermes/frontend/buffer/BackupMessagesLoaderTest.java b/hermes-frontend/src/test/java/pl/allegro/tech/hermes/frontend/buffer/BackupMessagesLoaderTest.java index 9353ed1cba..02b7a007ea 100644 --- a/hermes-frontend/src/test/java/pl/allegro/tech/hermes/frontend/buffer/BackupMessagesLoaderTest.java +++ b/hermes-frontend/src/test/java/pl/allegro/tech/hermes/frontend/buffer/BackupMessagesLoaderTest.java @@ -100,6 +100,7 @@ public void shouldNotSendOldMessages() { final BackupMessagesLoader backupMessagesLoader = new BackupMessagesLoader( + producer, producer, listeners, topicsCache, @@ -144,6 +145,7 @@ public void shouldSendAndResendMessages() { ); BackupMessagesLoader backupMessagesLoader = new BackupMessagesLoader( + producer, producer, listeners, topicsCache, @@ -174,6 +176,7 @@ public void shouldSendOnlyWhenBrokerTopicIsAvailable() { BackupMessagesLoader backupMessagesLoader = new BackupMessagesLoader( + producer, producer, listeners, topicsCache, @@ -213,6 +216,7 @@ public void shouldSendMessageWithAllArgumentsFromBackupMessage() { BackupMessagesLoader backupMessagesLoader = new BackupMessagesLoader( + producer, producer, listeners, topicsCache, diff --git a/hermes-frontend/src/test/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaBrokerMessageProducerTest.java b/hermes-frontend/src/test/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaBrokerMessageProducerTest.java index 845a6afc1f..ce4a76a640 100644 --- a/hermes-frontend/src/test/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaBrokerMessageProducerTest.java +++ b/hermes-frontend/src/test/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaBrokerMessageProducerTest.java @@ -2,6 +2,7 @@ import com.codahale.metrics.MetricRegistry; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.ByteArraySerializer; @@ -17,9 
+18,9 @@ import pl.allegro.tech.hermes.common.kafka.NamespaceKafkaNamesMapper; import pl.allegro.tech.hermes.common.metric.HermesMetrics; import pl.allegro.tech.hermes.common.metric.MetricsFacade; +import pl.allegro.tech.hermes.frontend.cache.topic.TopicsCache; import pl.allegro.tech.hermes.frontend.config.HTTPHeadersProperties; import pl.allegro.tech.hermes.frontend.config.KafkaHeaderNameProperties; -import pl.allegro.tech.hermes.frontend.config.KafkaProducerProperties; import pl.allegro.tech.hermes.frontend.config.SchemaProperties; import pl.allegro.tech.hermes.frontend.metric.CachedTopic; import pl.allegro.tech.hermes.frontend.publishing.PublishingCallback; @@ -27,11 +28,13 @@ import pl.allegro.tech.hermes.frontend.publishing.message.Message; import pl.allegro.tech.hermes.metrics.PathsCompiler; +import java.time.Duration; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import static com.google.common.base.Charsets.UTF_8; import static com.jayway.awaitility.Awaitility.await; +import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static org.assertj.core.api.Assertions.assertThat; import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.topic; @@ -42,33 +45,44 @@ public class KafkaBrokerMessageProducerTest { private static final Long TIMESTAMP = 1L; private static final String PARTITION_KEY = "partition-key"; private static final String MESSAGE_ID = "id"; + private static final String datacenter = "dc"; private static final Topic TOPIC = topic("group.topic").build(); private static final byte[] CONTENT = "{\"data\":\"json\"}".getBytes(UTF_8); private static final Message MESSAGE = new JsonMessage(MESSAGE_ID, CONTENT, TIMESTAMP, PARTITION_KEY, emptyMap()); private final ByteArraySerializer serializer = new ByteArraySerializer(); + @Mock + private HermesMetrics hermesMetrics = new HermesMetrics(new MetricRegistry(), new PathsCompiler("")); + private final MetricsFacade metricsFacade = new MetricsFacade(new SimpleMeterRegistry(), hermesMetrics); + private final MockProducer<byte[], byte[]> leaderConfirmsProducer = new MockProducer<>(true, serializer, serializer); private final MockProducer<byte[], byte[]> everyoneConfirmProducer = new MockProducer<>(true, serializer, serializer); + private final KafkaMessageSender<byte[], byte[]> leaderConfirmsProduceWrapper = new KafkaMessageSender<>(leaderConfirmsProducer, datacenter); + private final KafkaMessageSender<byte[], byte[]> everyoneConfirmsProduceWrapper = new KafkaMessageSender<>(everyoneConfirmProducer, datacenter); + private final KafkaHeaderNameProperties kafkaHeaderNameProperties = new KafkaHeaderNameProperties(); private final HTTPHeadersPropagationAsKafkaHeadersProperties httpHeadersPropagationAsKafkaHeadersProperties = new HTTPHeadersProperties.PropagationAsKafkaHeadersProperties(); - private final KafkaProducerProperties kafkaProducerProperties = new KafkaProducerProperties(); - private final Producers producers = - new Producers(leaderConfirmsProducer, everyoneConfirmProducer, kafkaProducerProperties.isReportNodeMetricsEnabled()); + @Mock + private TopicsCache topicsCache; + private final TopicMetadataLoadingExecutor topicMetadataLoadingExecutor = new TopicMetadataLoadingExecutor( + topicsCache, 2, Duration.ofSeconds(10), 2 + ); + @Mock + private AdminClient adminClient; + private final MinInSyncReplicasLoader localMinInSyncReplicasLoader = new MinInSyncReplicasLoader(adminClient, Duration.ofMinutes(1)); + private final KafkaMessageSenders kafkaMessageSenders = new KafkaMessageSenders( + 
topicMetadataLoadingExecutor, + localMinInSyncReplicasLoader, + new KafkaMessageSenders.Tuple(leaderConfirmsProduceWrapper, everyoneConfirmsProduceWrapper), + emptyList() + ); private KafkaBrokerMessageProducer producer; private final KafkaNamesMapper kafkaNamesMapper = new NamespaceKafkaNamesMapper("ns", "_"); private final KafkaHeaderFactory kafkaHeaderFactory = new KafkaHeaderFactory(kafkaHeaderNameProperties, httpHeadersPropagationAsKafkaHeadersProperties); - @Mock - private HermesMetrics hermesMetrics = new HermesMetrics(new MetricRegistry(), new PathsCompiler("")); - - private final MetricsFacade metricsFacade = new MetricsFacade(new SimpleMeterRegistry(), hermesMetrics); - - @Mock - private KafkaTopicMetadataFetcher kafkaTopicMetadataFetcher; - private CachedTopic cachedTopic; private final SchemaProperties schemaProperties = new SchemaProperties(); @@ -78,7 +92,7 @@ public void before() { cachedTopic = new CachedTopic(TOPIC, metricsFacade, kafkaNamesMapper.toKafkaTopics(TOPIC)); MessageToKafkaProducerRecordConverter messageConverter = new MessageToKafkaProducerRecordConverter(kafkaHeaderFactory, schemaProperties.isIdHeaderEnabled()); - producer = new KafkaBrokerMessageProducer(producers, kafkaTopicMetadataFetcher, metricsFacade, messageConverter); + producer = new KafkaBrokerMessageProducer(kafkaMessageSenders, metricsFacade, messageConverter); } @After