Skip to content

Commit

Permalink
Merge branch 'master' into restclient
Browse files Browse the repository at this point in the history
  • Loading branch information
deejay1 authored Mar 4, 2024
2 parents bfc83ab + f2cef4c commit a445a36
Show file tree
Hide file tree
Showing 32 changed files with 294 additions and 244 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public int hashCode() {

public enum ReadinessStatus {
READY,
NOT_READY,
UNDEFINED
NOT_READY
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package pl.allegro.tech.hermes.benchmark.environment;

import pl.allegro.tech.hermes.frontend.server.ReadinessChecker;
import pl.allegro.tech.hermes.frontend.readiness.ReadinessChecker;

class DisabledReadinessChecker implements ReadinessChecker {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import pl.allegro.tech.hermes.frontend.publishing.message.MessageContentTypeEnforcer;
import pl.allegro.tech.hermes.frontend.publishing.message.MessageFactory;
import pl.allegro.tech.hermes.frontend.publishing.metadata.DefaultHeadersPropagator;
import pl.allegro.tech.hermes.frontend.readiness.HealthCheckService;
import pl.allegro.tech.hermes.frontend.server.HermesServer;
import pl.allegro.tech.hermes.frontend.validator.MessageValidators;
import pl.allegro.tech.hermes.metrics.PathsCompiler;
Expand Down Expand Up @@ -70,6 +71,7 @@ static HermesServer provideHermesServer() throws IOException {
hermesServerProperties,
metricsFacade,
httpHandler,
new HealthCheckService(),
new DisabledReadinessChecker(false),
new NoOpMessagePreviewPersister(),
throughputLimiter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

public interface KafkaParameters {

String getDatacenter();

boolean isAuthenticationEnabled();

String getAuthenticationMechanism();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package pl.allegro.tech.hermes.domain.readiness;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import pl.allegro.tech.hermes.api.DatacenterReadiness;

import java.util.List;

public record DatacenterReadinessList(List<DatacenterReadiness> datacenters) {
@JsonCreator
public DatacenterReadinessList(@JsonProperty("datacenters") List<DatacenterReadiness> datacenters) {
this.datacenters = datacenters;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ public class ZookeeperPaths {
public static final String MAX_RATE_PATH = "max-rate";
public static final String MAX_RATE_HISTORY_PATH = "history";
public static final String STORAGE_HEALTH_PATH = "storage-health";
public static final String FRONTEND_PATH = "frontend";
public static final String READINESS_PATH = "readiness";
public static final String DATACENTER_READINESS_PATH = "datacenter-readiness";
public static final String OFFLINE_RETRANSMISSION_PATH = "offline-retransmission";
public static final String OFFLINE_RETRANSMISSION_TASKS_PATH = "tasks";

Expand Down Expand Up @@ -151,8 +150,8 @@ public String nodeHealthPathForManagementHost(String host, String port) {
return Joiner.on(URL_SEPARATOR).join(basePath, STORAGE_HEALTH_PATH, String.format("%s_%s", host, port));
}

public String frontendReadinessPath() {
return Joiner.on(URL_SEPARATOR).join(basePath, FRONTEND_PATH, READINESS_PATH);
public String datacenterReadinessPath() {
return Joiner.on(URL_SEPARATOR).join(basePath, DATACENTER_READINESS_PATH);
}

public String offlineRetransmissionPath() {
Expand Down
2 changes: 1 addition & 1 deletion hermes-console/json-server/db.json
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
},
{
"datacenter": "DC3",
"status": "UNDEFINED"
"status": "READY"
}
],
"constraints": {
Expand Down
2 changes: 1 addition & 1 deletion hermes-console/src/api/datacenter-readiness.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
export interface DatacenterReadiness {
datacenter: string;
status: 'READY' | 'NOT_READY' | 'UNDEFINED';
status: 'READY' | 'NOT_READY';
}

export interface Readiness {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import pl.allegro.tech.hermes.frontend.cache.topic.TopicsCache;
import pl.allegro.tech.hermes.frontend.listeners.BrokerListeners;
import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer;
import pl.allegro.tech.hermes.frontend.services.HealthCheckService;
import pl.allegro.tech.hermes.frontend.validator.MessageValidators;
import pl.allegro.tech.hermes.frontend.validator.TopicMessageValidator;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;
Expand Down Expand Up @@ -75,11 +74,6 @@ public BlacklistZookeeperNotifyingCache blacklistZookeeperNotifyingCache(Curator
return new BlacklistZookeeperNotifyingCache(curator, zookeeperPaths);
}

@Bean(initMethod = "startup")
public HealthCheckService healthCheckService() {
return new HealthCheckService();
}

@Bean
public BrokerListeners defaultBrokerListeners() {
return new BrokerListeners();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package pl.allegro.tech.hermes.frontend.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import io.undertow.server.HttpHandler;
import org.apache.curator.framework.CuratorFramework;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -13,14 +11,14 @@
import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer;
import pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter;
import pl.allegro.tech.hermes.frontend.publishing.preview.DefaultMessagePreviewPersister;
import pl.allegro.tech.hermes.frontend.server.DefaultReadinessChecker;
import pl.allegro.tech.hermes.frontend.readiness.HealthCheckService;
import pl.allegro.tech.hermes.frontend.readiness.ReadinessChecker;
import pl.allegro.tech.hermes.frontend.server.HermesServer;
import pl.allegro.tech.hermes.frontend.server.SslContextFactoryProvider;
import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingJob;
import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingRunner;
import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingStartupHook;
import pl.allegro.tech.hermes.frontend.server.TopicSchemaLoadingStartupHook;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;
import pl.allegro.tech.hermes.schema.SchemaRepository;

import java.util.Optional;
Expand All @@ -39,7 +37,8 @@ public HermesServer hermesServer(HermesServerProperties hermesServerProperties,
SslProperties sslProperties,
MetricsFacade metricsFacade,
HttpHandler publishingHandler,
DefaultReadinessChecker defaultReadinessChecker,
HealthCheckService healthCheckService,
ReadinessChecker readinessChecker,
DefaultMessagePreviewPersister defaultMessagePreviewPersister,
ThroughputLimiter throughputLimiter,
TopicMetadataLoadingJob topicMetadataLoadingJob,
Expand All @@ -51,7 +50,8 @@ public HermesServer hermesServer(HermesServerProperties hermesServerProperties,
hermesServerProperties,
metricsFacade,
publishingHandler,
defaultReadinessChecker,
healthCheckService,
readinessChecker,
defaultMessagePreviewPersister,
throughputLimiter,
topicMetadataLoadingJob,
Expand All @@ -60,23 +60,6 @@ public HermesServer hermesServer(HermesServerProperties hermesServerProperties,
prometheusMeterRegistry);
}

@Bean
public DefaultReadinessChecker readinessChecker(ReadinessCheckProperties readinessCheckProperties,
TopicMetadataLoadingRunner topicMetadataLoadingRunner,
CuratorFramework zookeeper,
ZookeeperPaths paths,
ObjectMapper mapper) {
return new DefaultReadinessChecker(
topicMetadataLoadingRunner,
zookeeper,
paths,
mapper,
readinessCheckProperties.isEnabled(),
readinessCheckProperties.isKafkaCheckEnabled(),
readinessCheckProperties.getInterval()
);
}

@Bean
public SslContextFactoryProvider sslContextFactoryProvider(Optional<SslContextFactory> sslContextFactory,
SslProperties sslProperties) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package pl.allegro.tech.hermes.frontend.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.curator.framework.CuratorFramework;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import pl.allegro.tech.hermes.frontend.readiness.AdminReadinessService;
import pl.allegro.tech.hermes.frontend.readiness.DefaultReadinessChecker;
import pl.allegro.tech.hermes.frontend.readiness.HealthCheckService;
import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingRunner;
import pl.allegro.tech.hermes.infrastructure.dc.DatacenterNameProvider;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;

@Configuration
@EnableConfigurationProperties({ReadinessCheckProperties.class})
public class ReadinessConfiguration {

@Bean
public DefaultReadinessChecker readinessChecker(ReadinessCheckProperties readinessCheckProperties,
TopicMetadataLoadingRunner topicMetadataLoadingRunner,
AdminReadinessService adminReadinessService) {
return new DefaultReadinessChecker(
topicMetadataLoadingRunner,
adminReadinessService,
readinessCheckProperties.isEnabled(),
readinessCheckProperties.isKafkaCheckEnabled(),
readinessCheckProperties.getInterval()
);
}

@Bean(initMethod = "start", destroyMethod = "stop")
public AdminReadinessService adminReadinessService(ObjectMapper mapper,
CuratorFramework zookeeper,
ZookeeperPaths paths,
DatacenterNameProvider datacenterNameProvider) {
String localDatacenterName = datacenterNameProvider.getDatacenterName();
return new AdminReadinessService(mapper, zookeeper, paths, localDatacenterName);
}

@Bean(initMethod = "startup")
public HealthCheckService healthCheckService() {
return new HealthCheckService();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package pl.allegro.tech.hermes.frontend.readiness;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.DatacenterReadiness;
import pl.allegro.tech.hermes.common.exception.InternalProcessingException;
import pl.allegro.tech.hermes.domain.readiness.DatacenterReadinessList;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;

import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;

import static pl.allegro.tech.hermes.api.DatacenterReadiness.ReadinessStatus.READY;

public class AdminReadinessService implements NodeCacheListener {

private static final Logger logger = LoggerFactory.getLogger(AdminReadinessService.class);

private final NodeCache cache;
private final ObjectMapper mapper;
private final String localDatacenterName;

private volatile Map<String, Boolean> readinessPerDatacenter;

public AdminReadinessService(ObjectMapper mapper,
CuratorFramework curator,
ZookeeperPaths paths,
String localDatacenterName) {
this.mapper = mapper;
this.localDatacenterName = localDatacenterName;
this.cache = new NodeCache(curator, paths.datacenterReadinessPath());
cache.getListenable().addListener(this);
try {
cache.start(true);
} catch (Exception e) {
throw new InternalProcessingException("Readiness cache cannot start.", e);
}
}

public void start() {
refreshAdminReady();
}

public void stop() {
try {
cache.close();
} catch (Exception e) {
logger.warn("Failed to stop readiness cache", e);
}
}

@Override
public void nodeChanged() {
refreshAdminReady();
}

private void refreshAdminReady() {
try {
ChildData nodeData = cache.getCurrentData();
if (nodeData != null) {
byte[] data = nodeData.getData();
DatacenterReadinessList readiness = mapper.readValue(data, DatacenterReadinessList.class);
readinessPerDatacenter = readiness.datacenters().stream()
.collect(Collectors.toMap(DatacenterReadiness::getDatacenter, e -> e.getStatus() == READY));
} else {
readinessPerDatacenter = Collections.emptyMap();
}
} catch (Exception e) {
logger.error("Failed reloading readiness cache.", e);
}
}

public boolean isLocalDatacenterReady() {
return isDatacenterReady(localDatacenterName);
}

public boolean isDatacenterReady(String datacenter) {
return readinessPerDatacenter.getOrDefault(datacenter, true);
}
}
Loading

0 comments on commit a445a36

Please sign in to comment.