Skip to content

Commit

Permalink
Merge branch 'master' into restclient
Browse files Browse the repository at this point in the history
  • Loading branch information
szczygiel-m committed Apr 4, 2024
2 parents a9de1eb + d861cf9 commit 3617163
Show file tree
Hide file tree
Showing 38 changed files with 805 additions and 726 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ allprojects {
guava : '23.0',
jackson : '2.15.2',
jersey : '3.1.2',
jetty : '12.0.5',
jetty : '12.0.7',
curator : '5.4.0',
dropwizard_metrics: '4.1.0',
micrometer_metrics: '1.11.1',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ static HermesServer provideHermesServer() throws IOException {
new NoOpMessagePreviewPersister(),
throughputLimiter,
null,
false,
null,
null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ public void send(Message message, CachedTopic topic, PublishingCallback callback
}

@Override
public boolean isTopicAvailable(CachedTopic topic) {
public boolean areAllTopicsAvailable() {
return true;
}

@Override
public boolean isTopicAvailable(CachedTopic cachedTopic) {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ public boolean isLeader() {

@Override
public void onRetransmissionStarts(SubscriptionName subscription) throws Exception {
logger.info("Triggering retransmission for subscription {}", subscription);
if (assignmentCache.isAssignedTo(subscription)) {
logger.info("Triggering retransmission for subscription {}", subscription);
supervisor.retransmit(subscription);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import pl.allegro.tech.hermes.frontend.listeners.BrokerListeners;
import pl.allegro.tech.hermes.frontend.metric.CachedTopic;
import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer;
import pl.allegro.tech.hermes.frontend.producer.BrokerTopicAvailabilityChecker;
import pl.allegro.tech.hermes.frontend.publishing.PublishingCallback;
import pl.allegro.tech.hermes.frontend.publishing.avro.AvroMessage;
import pl.allegro.tech.hermes.frontend.publishing.message.JsonMessage;
Expand Down Expand Up @@ -44,6 +45,7 @@ public class BackupMessagesLoader {

private static final Logger logger = LoggerFactory.getLogger(BackupMessagesLoader.class);

private final BrokerTopicAvailabilityChecker brokerTopicAvailabilityChecker;
private final BrokerMessageProducer brokerMessageProducer;
private final BrokerListeners brokerListeners;
private final TopicsCache topicsCache;
Expand All @@ -59,7 +61,8 @@ public class BackupMessagesLoader {
private final Set<Topic> topicsAvailabilityCache = new HashSet<>();
private final AtomicReference<ConcurrentLinkedQueue<Pair<Message, CachedTopic>>> toResend = new AtomicReference<>();

public BackupMessagesLoader(BrokerMessageProducer brokerMessageProducer,
public BackupMessagesLoader(BrokerTopicAvailabilityChecker brokerTopicAvailabilityChecker,
BrokerMessageProducer brokerMessageProducer,
BrokerListeners brokerListeners,
TopicsCache topicsCache,
SchemaRepository schemaRepository,
Expand All @@ -68,6 +71,7 @@ public BackupMessagesLoader(BrokerMessageProducer brokerMessageProducer,
BackupMessagesLoaderParameters backupMessagesLoaderParameters,
String datacenter
) {
this.brokerTopicAvailabilityChecker = brokerTopicAvailabilityChecker;
this.brokerMessageProducer = brokerMessageProducer;
this.brokerListeners = brokerListeners;
this.topicsCache = topicsCache;
Expand Down Expand Up @@ -235,7 +239,7 @@ private boolean isBrokerTopicAvailable(CachedTopic cachedTopic) {
return true;
}

if (brokerMessageProducer.isTopicAvailable(cachedTopic)) {
if (brokerTopicAvailabilityChecker.isTopicAvailable(cachedTopic)) {
topicsAvailabilityCache.add(cachedTopic.getTopic());
logger.info("Broker topic {} is available.", cachedTopic.getTopic().getQualifiedName());
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,18 @@ public BackupMessagesLoader backupMessagesLoader(BrokerMessageProducer brokerMes
SchemaRepository schemaRepository,
Trackers trackers,
LocalMessageStorageProperties localMessageStorageProperties,
DatacenterNameProvider datacenterNameProvider
) {
return new BackupMessagesLoader(brokerMessageProducer, brokerListeners, topicsCache, schemaRepository,
new SchemaExistenceEnsurer(schemaRepository), trackers, localMessageStorageProperties, datacenterNameProvider.getDatacenterName());
DatacenterNameProvider datacenterNameProvider) {
return new BackupMessagesLoader(
brokerMessageProducer,
brokerMessageProducer,
brokerListeners,
topicsCache,
schemaRepository,
new SchemaExistenceEnsurer(schemaRepository),
trackers,
localMessageStorageProperties,
datacenterNameProvider.getDatacenterName()
);
}

@Bean(initMethod = "extend")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,32 @@
package pl.allegro.tech.hermes.frontend.config;

import org.apache.kafka.clients.admin.AdminClient;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import pl.allegro.tech.hermes.common.kafka.KafkaParameters;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.frontend.cache.topic.TopicsCache;
import pl.allegro.tech.hermes.frontend.producer.BrokerLatencyReporter;
import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer;
import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaBrokerMessageProducer;
import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaHeaderFactory;
import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaMessageProducerFactory;
import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaTopicMetadataFetcher;
import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaTopicMetadataFetcherFactory;
import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaMessageSenders;
import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaMessageSendersFactory;
import pl.allegro.tech.hermes.frontend.producer.kafka.MessageToKafkaProducerRecordConverter;
import pl.allegro.tech.hermes.frontend.producer.kafka.Producers;
import pl.allegro.tech.hermes.frontend.producer.kafka.ProducerMetadataLoadingJob;
import pl.allegro.tech.hermes.infrastructure.dc.DatacenterNameProvider;

import java.util.List;
import java.util.Properties;

import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
import static org.apache.kafka.clients.CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;

@Configuration
@EnableConfigurationProperties({
LocalMessageStorageProperties.class,
Expand All @@ -26,11 +39,10 @@
public class FrontendProducerConfiguration {

@Bean
public BrokerMessageProducer kafkaBrokerMessageProducer(Producers producers,
KafkaTopicMetadataFetcher kafkaTopicMetadataFetcher,
public BrokerMessageProducer kafkaBrokerMessageProducer(KafkaMessageSenders kafkaMessageSenders,
MetricsFacade metricsFacade,
MessageToKafkaProducerRecordConverter messageConverter) {
return new KafkaBrokerMessageProducer(producers, kafkaTopicMetadataFetcher, metricsFacade, messageConverter);
return new KafkaBrokerMessageProducer(kafkaMessageSenders, metricsFacade, messageConverter);
}

@Bean
Expand All @@ -40,22 +52,54 @@ public KafkaHeaderFactory kafkaHeaderFactory(KafkaHeaderNameProperties kafkaHead
}

@Bean(destroyMethod = "close")
public Producers kafkaMessageProducer(KafkaClustersProperties kafkaClustersProperties,
KafkaProducerProperties kafkaProducerProperties,
LocalMessageStorageProperties localMessageStorageProperties,
DatacenterNameProvider datacenterNameProvider) {
KafkaProperties kafkaProperties = kafkaClustersProperties.toKafkaProperties(datacenterNameProvider);
return new KafkaMessageProducerFactory(kafkaProperties, kafkaProducerProperties,
localMessageStorageProperties.getBufferedSizeBytes()).provide();
public KafkaMessageSenders kafkaMessageSenders(KafkaProducerProperties kafkaProducerProperties,
KafkaMessageSendersFactory kafkaMessageSendersFactory) {
return kafkaMessageSendersFactory.provide(kafkaProducerProperties);
}

@Bean(destroyMethod = "close")
public KafkaTopicMetadataFetcher kafkaTopicMetadataFetcher(KafkaProducerProperties kafkaProducerProperties,
KafkaClustersProperties kafkaClustersProperties,
DatacenterNameProvider datacenterNameProvider) {
public KafkaMessageSendersFactory kafkaMessageSendersFactory(KafkaClustersProperties kafkaClustersProperties,
KafkaProducerProperties kafkaProducerProperties,
TopicLoadingProperties topicLoadingProperties,
TopicsCache topicsCache,
LocalMessageStorageProperties localMessageStorageProperties,
DatacenterNameProvider datacenterNameProvider) {
KafkaProperties kafkaProperties = kafkaClustersProperties.toKafkaProperties(datacenterNameProvider);
return new KafkaTopicMetadataFetcherFactory(kafkaProperties, kafkaProducerProperties.getMetadataMaxAge(),
(int) kafkaProperties.getAdminRequestTimeout().toMillis()).provide();
List<KafkaParameters> remoteKafkaProperties = kafkaClustersProperties.toRemoteKafkaProperties(datacenterNameProvider);
return new KafkaMessageSendersFactory(
kafkaProperties,
remoteKafkaProperties,
createAdminClient(kafkaProperties),
topicsCache,
topicLoadingProperties.getMetadata().getRetryCount(),
topicLoadingProperties.getMetadata().getRetryInterval(),
topicLoadingProperties.getMetadata().getThreadPoolSize(),
localMessageStorageProperties.getBufferedSizeBytes(),
kafkaProducerProperties.getMetadataMaxAge()
);
}

private static AdminClient createAdminClient(KafkaProperties kafkaProperties) {
Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBrokerList());
props.put(SECURITY_PROTOCOL_CONFIG, DEFAULT_SECURITY_PROTOCOL);
props.put(REQUEST_TIMEOUT_MS_CONFIG, (int) kafkaProperties.getAdminRequestTimeout().toMillis());
if (kafkaProperties.isAuthenticationEnabled()) {
props.put(SASL_MECHANISM, kafkaProperties.getAuthenticationMechanism());
props.put(SECURITY_PROTOCOL_CONFIG, kafkaProperties.getAuthenticationProtocol());
props.put(SASL_JAAS_CONFIG, kafkaProperties.getJaasConfig());
}
return AdminClient.create(props);
}

@Bean(initMethod = "start", destroyMethod = "stop")
public ProducerMetadataLoadingJob producerMetadataLoadingJob(KafkaMessageSenders kafkaMessageSenders,
TopicLoadingProperties topicLoadingProperties) {
return new ProducerMetadataLoadingJob(
kafkaMessageSenders,
topicLoadingProperties.getMetadataRefreshJob().isEnabled(),
topicLoadingProperties.getMetadataRefreshJob().getInterval()
);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,12 @@
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.common.ssl.SslContextFactory;
import pl.allegro.tech.hermes.frontend.cache.topic.TopicsCache;
import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer;
import pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter;
import pl.allegro.tech.hermes.frontend.publishing.preview.DefaultMessagePreviewPersister;
import pl.allegro.tech.hermes.frontend.readiness.HealthCheckService;
import pl.allegro.tech.hermes.frontend.readiness.ReadinessChecker;
import pl.allegro.tech.hermes.frontend.server.HermesServer;
import pl.allegro.tech.hermes.frontend.server.SslContextFactoryProvider;
import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingJob;
import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingRunner;
import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingStartupHook;
import pl.allegro.tech.hermes.frontend.server.TopicSchemaLoadingStartupHook;
import pl.allegro.tech.hermes.schema.SchemaRepository;

Expand All @@ -41,9 +37,7 @@ public HermesServer hermesServer(HermesServerProperties hermesServerProperties,
ReadinessChecker readinessChecker,
DefaultMessagePreviewPersister defaultMessagePreviewPersister,
ThroughputLimiter throughputLimiter,
TopicMetadataLoadingJob topicMetadataLoadingJob,
SslContextFactoryProvider sslContextFactoryProvider,
TopicLoadingProperties topicLoadingProperties,
PrometheusMeterRegistry prometheusMeterRegistry) {
return new HermesServer(
sslProperties,
Expand All @@ -54,10 +48,9 @@ public HermesServer hermesServer(HermesServerProperties hermesServerProperties,
readinessChecker,
defaultMessagePreviewPersister,
throughputLimiter,
topicMetadataLoadingJob,
topicLoadingProperties.getMetadataRefreshJob().isEnabled(),
sslContextFactoryProvider,
prometheusMeterRegistry);
prometheusMeterRegistry
);
}

@Bean
Expand All @@ -66,28 +59,6 @@ public SslContextFactoryProvider sslContextFactoryProvider(Optional<SslContextFa
return new SslContextFactoryProvider(sslContextFactory.orElse(null), sslProperties);
}

@Bean
public TopicMetadataLoadingJob topicMetadataLoadingJob(TopicMetadataLoadingRunner topicMetadataLoadingRunner,
TopicLoadingProperties topicLoadingProperties) {
return new TopicMetadataLoadingJob(topicMetadataLoadingRunner, topicLoadingProperties.getMetadataRefreshJob().getInterval());
}

@Bean
public TopicMetadataLoadingRunner topicMetadataLoadingRunner(BrokerMessageProducer brokerMessageProducer,
TopicsCache topicsCache,
TopicLoadingProperties topicLoadingProperties) {
return new TopicMetadataLoadingRunner(brokerMessageProducer, topicsCache,
topicLoadingProperties.getMetadata().getRetryCount(),
topicLoadingProperties.getMetadata().getRetryInterval(),
topicLoadingProperties.getMetadata().getThreadPoolSize());
}

@Bean(initMethod = "run")
public TopicMetadataLoadingStartupHook topicMetadataLoadingStartupHook(TopicMetadataLoadingRunner topicMetadataLoadingRunner,
TopicLoadingProperties topicLoadingProperties) {
return new TopicMetadataLoadingStartupHook(topicMetadataLoadingRunner, topicLoadingProperties.getMetadata().isEnabled());
}

@Bean(initMethod = "run")
public TopicSchemaLoadingStartupHook topicSchemaLoadingStartupHook(TopicsCache topicsCache,
SchemaRepository schemaRepository,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package pl.allegro.tech.hermes.frontend.config;

import org.springframework.boot.context.properties.ConfigurationProperties;
import pl.allegro.tech.hermes.common.kafka.KafkaParameters;
import pl.allegro.tech.hermes.infrastructure.dc.DatacenterNameProvider;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

@ConfigurationProperties(prefix = "frontend.kafka")
public class KafkaClustersProperties {
Expand Down Expand Up @@ -47,5 +49,12 @@ public KafkaProperties toKafkaProperties(DatacenterNameProvider datacenterNamePr
.orElseThrow(() -> new IllegalArgumentException(
"No properties for datacenter: " + datacenterNameProvider.getDatacenterName() + " defined."));
}

public List<KafkaParameters> toRemoteKafkaProperties(DatacenterNameProvider datacenterNameProvider) {
return this.clusters
.stream()
.filter(cluster -> !cluster.getDatacenter().equals(datacenterNameProvider.getDatacenterName()))
.collect(Collectors.toList());
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import pl.allegro.tech.hermes.frontend.producer.BrokerTopicAvailabilityChecker;
import pl.allegro.tech.hermes.frontend.readiness.AdminReadinessService;
import pl.allegro.tech.hermes.frontend.readiness.DefaultReadinessChecker;
import pl.allegro.tech.hermes.frontend.readiness.HealthCheckService;
import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingRunner;
import pl.allegro.tech.hermes.infrastructure.dc.DatacenterNameProvider;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;

Expand All @@ -18,10 +18,10 @@ public class ReadinessConfiguration {

@Bean
public DefaultReadinessChecker readinessChecker(ReadinessCheckProperties readinessCheckProperties,
TopicMetadataLoadingRunner topicMetadataLoadingRunner,
BrokerTopicAvailabilityChecker brokerTopicAvailabilityChecker,
AdminReadinessService adminReadinessService) {
return new DefaultReadinessChecker(
topicMetadataLoadingRunner,
brokerTopicAvailabilityChecker,
adminReadinessService,
readinessCheckProperties.isEnabled(),
readinessCheckProperties.isKafkaCheckEnabled(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,12 @@ public class TopicLoadingProperties {

public static class MetadataLoadingProperties {

private boolean enabled = false;

private Duration retryInterval = Duration.ofSeconds(1);

private int retryCount = 5;

private int threadPoolSize = 16;

public boolean isEnabled() {
return enabled;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public Duration getRetryInterval() {
return retryInterval;
}
Expand Down
Loading

0 comments on commit 3617163

Please sign in to comment.