From f2cef4cff452ae3a1e6987328ae165f09ff41744 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20R=C5=BCysko?= Date: Mon, 4 Mar 2024 14:17:24 +0100 Subject: [PATCH] Propagate readiness status to all datacenters (#1826) This is part of the feature of falling back to remote datacenters when the local one is unhealthy during publishing. Considering that hermes-frontend has access only to ZooKeeper in its local datacenter, it needs to have information there on the readiness status of all datacenters to select only those that are ready. Before this change, only the readiness status of the local datacenter was stored in the local ZooKeeper. --- .../tech/hermes/api/DatacenterReadiness.java | 3 +- .../environment/DisabledReadinessChecker.java | 2 +- .../environment/HermesServerFactory.java | 2 + .../hermes/common/kafka/KafkaParameters.java | 2 + .../readiness/DatacenterReadinessList.java | 14 +++ .../zookeeper/ZookeeperPaths.java | 7 +- hermes-console/json-server/db.json | 2 +- .../src/api/datacenter-readiness.ts | 2 +- .../config/FrontendConfiguration.java | 6 -- .../config/FrontendServerConfiguration.java | 29 ++----- .../config/ReadinessConfiguration.java | 45 ++++++++++ .../readiness/AdminReadinessService.java | 86 +++++++++++++++++++ .../DefaultReadinessChecker.java | 59 ++----------- .../HealthCheckService.java | 5 +- .../ReadinessChecker.java | 2 +- .../frontend/server/HealthCheckHandler.java | 2 +- .../hermes/frontend/server/HermesServer.java | 6 +- .../server/MetadataLoadingResult.java | 4 +- .../server/ReadinessCheckHandler.java | 3 +- .../management/api/ReadinessEndpoint.java | 2 +- .../config/ReadinessConfiguration.java | 15 +++- .../config/storage/StorageConfiguration.java | 15 ++-- ...ultiDatacenterRepositoryQueryExecutor.java | 32 ------- .../management/domain/dc/QueryCommand.java | 7 -- .../DatacenterReadinessRepository.java | 12 +++ .../domain/readiness/GetReadinessQuery.java | 28 ------ .../domain/readiness/ReadinessRepository.java | 8 -- .../domain/readiness/ReadinessService.java | 46 ++++++++-- .../domain/readiness/SetReadinessCommand.java | 34 +++----- ...ookeeperDatacenterReadinessRepository.java | 30 ++++--- .../zookeeper/ZookeeperRepositoryManager.java | 8 +- .../integrationtests/ReadinessCheckTest.java | 20 ++--- 32 files changed, 294 insertions(+), 244 deletions(-) create mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/domain/readiness/DatacenterReadinessList.java create mode 100644 hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/ReadinessConfiguration.java create mode 100644 hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/readiness/AdminReadinessService.java rename hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/{server => readiness}/DefaultReadinessChecker.java (59%) rename hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/{services => readiness}/HealthCheckService.java (74%) rename hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/{server => readiness}/ReadinessChecker.java (71%) delete mode 100644 hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/dc/MultiDatacenterRepositoryQueryExecutor.java delete mode 100644 hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/dc/QueryCommand.java create mode 100644 hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/DatacenterReadinessRepository.java delete mode 100644 hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/GetReadinessQuery.java delete mode 100644 hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/ReadinessRepository.java diff --git a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/DatacenterReadiness.java b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/DatacenterReadiness.java index 024c9d32b1..88f12af28a 100644 --- a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/DatacenterReadiness.java +++ b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/DatacenterReadiness.java @@ -52,7 +52,6 @@ public int hashCode() { public enum ReadinessStatus { READY, - NOT_READY, - UNDEFINED + NOT_READY } } diff --git a/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/DisabledReadinessChecker.java b/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/DisabledReadinessChecker.java index 4216c1cbe0..88a8bf689e 100644 --- a/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/DisabledReadinessChecker.java +++ b/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/DisabledReadinessChecker.java @@ -1,6 +1,6 @@ package pl.allegro.tech.hermes.benchmark.environment; -import pl.allegro.tech.hermes.frontend.server.ReadinessChecker; +import pl.allegro.tech.hermes.frontend.readiness.ReadinessChecker; class DisabledReadinessChecker implements ReadinessChecker { diff --git a/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/HermesServerFactory.java b/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/HermesServerFactory.java index c0961fa63c..aa3302f058 100644 --- a/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/HermesServerFactory.java +++ b/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/HermesServerFactory.java @@ -26,6 +26,7 @@ import pl.allegro.tech.hermes.frontend.publishing.message.MessageContentTypeEnforcer; import pl.allegro.tech.hermes.frontend.publishing.message.MessageFactory; import pl.allegro.tech.hermes.frontend.publishing.metadata.DefaultHeadersPropagator; +import pl.allegro.tech.hermes.frontend.readiness.HealthCheckService; import pl.allegro.tech.hermes.frontend.server.HermesServer; import pl.allegro.tech.hermes.frontend.validator.MessageValidators; import pl.allegro.tech.hermes.metrics.PathsCompiler; @@ -70,6 +71,7 @@ static HermesServer provideHermesServer() throws IOException { hermesServerProperties, metricsFacade, httpHandler, + new HealthCheckService(), new DisabledReadinessChecker(false), new NoOpMessagePreviewPersister(), throughputLimiter, diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/KafkaParameters.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/KafkaParameters.java index c8049398b4..a9c0f92e47 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/KafkaParameters.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/KafkaParameters.java @@ -2,6 +2,8 @@ public interface KafkaParameters { + String getDatacenter(); + boolean isAuthenticationEnabled(); String getAuthenticationMechanism(); diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/readiness/DatacenterReadinessList.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/readiness/DatacenterReadinessList.java new file mode 100644 index 0000000000..29758d4f4d --- /dev/null +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/readiness/DatacenterReadinessList.java @@ -0,0 +1,14 @@ +package pl.allegro.tech.hermes.domain.readiness; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import pl.allegro.tech.hermes.api.DatacenterReadiness; + +import java.util.List; + +public record DatacenterReadinessList(List datacenters) { + @JsonCreator + public DatacenterReadinessList(@JsonProperty("datacenters") List datacenters) { + this.datacenters = datacenters; + } +} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperPaths.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperPaths.java index a5090a3061..b39596faca 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperPaths.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperPaths.java @@ -27,8 +27,7 @@ public class ZookeeperPaths { public static final String MAX_RATE_PATH = "max-rate"; public static final String MAX_RATE_HISTORY_PATH = "history"; public static final String STORAGE_HEALTH_PATH = "storage-health"; - public static final String FRONTEND_PATH = "frontend"; - public static final String READINESS_PATH = "readiness"; + public static final String DATACENTER_READINESS_PATH = "datacenter-readiness"; public static final String OFFLINE_RETRANSMISSION_PATH = "offline-retransmission"; public static final String OFFLINE_RETRANSMISSION_TASKS_PATH = "tasks"; @@ -151,8 +150,8 @@ public String nodeHealthPathForManagementHost(String host, String port) { return Joiner.on(URL_SEPARATOR).join(basePath, STORAGE_HEALTH_PATH, String.format("%s_%s", host, port)); } - public String frontendReadinessPath() { - return Joiner.on(URL_SEPARATOR).join(basePath, FRONTEND_PATH, READINESS_PATH); + public String datacenterReadinessPath() { + return Joiner.on(URL_SEPARATOR).join(basePath, DATACENTER_READINESS_PATH); } public String offlineRetransmissionPath() { diff --git a/hermes-console/json-server/db.json b/hermes-console/json-server/db.json index f5174a1005..313fd39fcd 100644 --- a/hermes-console/json-server/db.json +++ b/hermes-console/json-server/db.json @@ -128,7 +128,7 @@ }, { "datacenter": "DC3", - "status": "UNDEFINED" + "status": "READY" } ], "constraints": { diff --git a/hermes-console/src/api/datacenter-readiness.ts b/hermes-console/src/api/datacenter-readiness.ts index e6acf35caa..cedad9c849 100644 --- a/hermes-console/src/api/datacenter-readiness.ts +++ b/hermes-console/src/api/datacenter-readiness.ts @@ -1,6 +1,6 @@ export interface DatacenterReadiness { datacenter: string; - status: 'READY' | 'NOT_READY' | 'UNDEFINED'; + status: 'READY' | 'NOT_READY'; } export interface Readiness { diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendConfiguration.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendConfiguration.java index 79f55c326d..42b5487e05 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendConfiguration.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendConfiguration.java @@ -16,7 +16,6 @@ import pl.allegro.tech.hermes.frontend.cache.topic.TopicsCache; import pl.allegro.tech.hermes.frontend.listeners.BrokerListeners; import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer; -import pl.allegro.tech.hermes.frontend.services.HealthCheckService; import pl.allegro.tech.hermes.frontend.validator.MessageValidators; import pl.allegro.tech.hermes.frontend.validator.TopicMessageValidator; import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths; @@ -75,11 +74,6 @@ public BlacklistZookeeperNotifyingCache blacklistZookeeperNotifyingCache(Curator return new BlacklistZookeeperNotifyingCache(curator, zookeeperPaths); } - @Bean(initMethod = "startup") - public HealthCheckService healthCheckService() { - return new HealthCheckService(); - } - @Bean public BrokerListeners defaultBrokerListeners() { return new BrokerListeners(); diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendServerConfiguration.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendServerConfiguration.java index d83f8fd0a8..2646f50cd3 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendServerConfiguration.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendServerConfiguration.java @@ -1,9 +1,7 @@ package pl.allegro.tech.hermes.frontend.config; -import com.fasterxml.jackson.databind.ObjectMapper; import io.micrometer.prometheus.PrometheusMeterRegistry; import io.undertow.server.HttpHandler; -import org.apache.curator.framework.CuratorFramework; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -13,14 +11,14 @@ import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer; import pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter; import pl.allegro.tech.hermes.frontend.publishing.preview.DefaultMessagePreviewPersister; -import pl.allegro.tech.hermes.frontend.server.DefaultReadinessChecker; +import pl.allegro.tech.hermes.frontend.readiness.HealthCheckService; +import pl.allegro.tech.hermes.frontend.readiness.ReadinessChecker; import pl.allegro.tech.hermes.frontend.server.HermesServer; import pl.allegro.tech.hermes.frontend.server.SslContextFactoryProvider; import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingJob; import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingRunner; import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingStartupHook; import pl.allegro.tech.hermes.frontend.server.TopicSchemaLoadingStartupHook; -import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths; import pl.allegro.tech.hermes.schema.SchemaRepository; import java.util.Optional; @@ -39,7 +37,8 @@ public HermesServer hermesServer(HermesServerProperties hermesServerProperties, SslProperties sslProperties, MetricsFacade metricsFacade, HttpHandler publishingHandler, - DefaultReadinessChecker defaultReadinessChecker, + HealthCheckService healthCheckService, + ReadinessChecker readinessChecker, DefaultMessagePreviewPersister defaultMessagePreviewPersister, ThroughputLimiter throughputLimiter, TopicMetadataLoadingJob topicMetadataLoadingJob, @@ -51,7 +50,8 @@ public HermesServer hermesServer(HermesServerProperties hermesServerProperties, hermesServerProperties, metricsFacade, publishingHandler, - defaultReadinessChecker, + healthCheckService, + readinessChecker, defaultMessagePreviewPersister, throughputLimiter, topicMetadataLoadingJob, @@ -60,23 +60,6 @@ public HermesServer hermesServer(HermesServerProperties hermesServerProperties, prometheusMeterRegistry); } - @Bean - public DefaultReadinessChecker readinessChecker(ReadinessCheckProperties readinessCheckProperties, - TopicMetadataLoadingRunner topicMetadataLoadingRunner, - CuratorFramework zookeeper, - ZookeeperPaths paths, - ObjectMapper mapper) { - return new DefaultReadinessChecker( - topicMetadataLoadingRunner, - zookeeper, - paths, - mapper, - readinessCheckProperties.isEnabled(), - readinessCheckProperties.isKafkaCheckEnabled(), - readinessCheckProperties.getInterval() - ); - } - @Bean public SslContextFactoryProvider sslContextFactoryProvider(Optional sslContextFactory, SslProperties sslProperties) { diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/ReadinessConfiguration.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/ReadinessConfiguration.java new file mode 100644 index 0000000000..d3bd78afba --- /dev/null +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/ReadinessConfiguration.java @@ -0,0 +1,45 @@ +package pl.allegro.tech.hermes.frontend.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.curator.framework.CuratorFramework; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import pl.allegro.tech.hermes.frontend.readiness.AdminReadinessService; +import pl.allegro.tech.hermes.frontend.readiness.DefaultReadinessChecker; +import pl.allegro.tech.hermes.frontend.readiness.HealthCheckService; +import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingRunner; +import pl.allegro.tech.hermes.infrastructure.dc.DatacenterNameProvider; +import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths; + +@Configuration +@EnableConfigurationProperties({ReadinessCheckProperties.class}) +public class ReadinessConfiguration { + + @Bean + public DefaultReadinessChecker readinessChecker(ReadinessCheckProperties readinessCheckProperties, + TopicMetadataLoadingRunner topicMetadataLoadingRunner, + AdminReadinessService adminReadinessService) { + return new DefaultReadinessChecker( + topicMetadataLoadingRunner, + adminReadinessService, + readinessCheckProperties.isEnabled(), + readinessCheckProperties.isKafkaCheckEnabled(), + readinessCheckProperties.getInterval() + ); + } + + @Bean(initMethod = "start", destroyMethod = "stop") + public AdminReadinessService adminReadinessService(ObjectMapper mapper, + CuratorFramework zookeeper, + ZookeeperPaths paths, + DatacenterNameProvider datacenterNameProvider) { + String localDatacenterName = datacenterNameProvider.getDatacenterName(); + return new AdminReadinessService(mapper, zookeeper, paths, localDatacenterName); + } + + @Bean(initMethod = "startup") + public HealthCheckService healthCheckService() { + return new HealthCheckService(); + } +} diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/readiness/AdminReadinessService.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/readiness/AdminReadinessService.java new file mode 100644 index 0000000000..942a8cce4a --- /dev/null +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/readiness/AdminReadinessService.java @@ -0,0 +1,86 @@ +package pl.allegro.tech.hermes.frontend.readiness; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import pl.allegro.tech.hermes.api.DatacenterReadiness; +import pl.allegro.tech.hermes.common.exception.InternalProcessingException; +import pl.allegro.tech.hermes.domain.readiness.DatacenterReadinessList; +import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths; + +import java.util.Collections; +import java.util.Map; +import java.util.stream.Collectors; + +import static pl.allegro.tech.hermes.api.DatacenterReadiness.ReadinessStatus.READY; + +public class AdminReadinessService implements NodeCacheListener { + + private static final Logger logger = LoggerFactory.getLogger(AdminReadinessService.class); + + private final NodeCache cache; + private final ObjectMapper mapper; + private final String localDatacenterName; + + private volatile Map readinessPerDatacenter; + + public AdminReadinessService(ObjectMapper mapper, + CuratorFramework curator, + ZookeeperPaths paths, + String localDatacenterName) { + this.mapper = mapper; + this.localDatacenterName = localDatacenterName; + this.cache = new NodeCache(curator, paths.datacenterReadinessPath()); + cache.getListenable().addListener(this); + try { + cache.start(true); + } catch (Exception e) { + throw new InternalProcessingException("Readiness cache cannot start.", e); + } + } + + public void start() { + refreshAdminReady(); + } + + public void stop() { + try { + cache.close(); + } catch (Exception e) { + logger.warn("Failed to stop readiness cache", e); + } + } + + @Override + public void nodeChanged() { + refreshAdminReady(); + } + + private void refreshAdminReady() { + try { + ChildData nodeData = cache.getCurrentData(); + if (nodeData != null) { + byte[] data = nodeData.getData(); + DatacenterReadinessList readiness = mapper.readValue(data, DatacenterReadinessList.class); + readinessPerDatacenter = readiness.datacenters().stream() + .collect(Collectors.toMap(DatacenterReadiness::getDatacenter, e -> e.getStatus() == READY)); + } else { + readinessPerDatacenter = Collections.emptyMap(); + } + } catch (Exception e) { + logger.error("Failed reloading readiness cache.", e); + } + } + + public boolean isLocalDatacenterReady() { + return isDatacenterReady(localDatacenterName); + } + + public boolean isDatacenterReady(String datacenter) { + return readinessPerDatacenter.getOrDefault(datacenter, true); + } +} diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/DefaultReadinessChecker.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/readiness/DefaultReadinessChecker.java similarity index 59% rename from hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/DefaultReadinessChecker.java rename to hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/readiness/DefaultReadinessChecker.java index 5cfb7806f9..aa7ca002ce 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/DefaultReadinessChecker.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/readiness/DefaultReadinessChecker.java @@ -1,16 +1,10 @@ -package pl.allegro.tech.hermes.frontend.server; +package pl.allegro.tech.hermes.frontend.readiness; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.NodeCache; -import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import pl.allegro.tech.hermes.api.Readiness; -import pl.allegro.tech.hermes.common.exception.InternalProcessingException; -import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths; +import pl.allegro.tech.hermes.frontend.server.MetadataLoadingResult; +import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingRunner; import java.time.Duration; import java.util.List; @@ -19,7 +13,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -public class DefaultReadinessChecker implements ReadinessChecker, NodeCacheListener { +public class DefaultReadinessChecker implements ReadinessChecker { private static final Logger logger = LoggerFactory.getLogger(DefaultReadinessChecker.class); private final boolean enabled; @@ -27,16 +21,12 @@ public class DefaultReadinessChecker implements ReadinessChecker, NodeCacheListe private final Duration interval; private final TopicMetadataLoadingRunner topicMetadataLoadingRunner; private final ScheduledExecutorService scheduler; - private final ObjectMapper mapper; - private final NodeCache cache; + private final AdminReadinessService adminReadinessService; - private volatile boolean adminReady = false; private volatile boolean ready = false; public DefaultReadinessChecker(TopicMetadataLoadingRunner topicMetadataLoadingRunner, - CuratorFramework curator, - ZookeeperPaths paths, - ObjectMapper mapper, + AdminReadinessService adminReadinessService, boolean enabled, boolean kafkaCheckEnabled, Duration interval) { @@ -44,14 +34,7 @@ public DefaultReadinessChecker(TopicMetadataLoadingRunner topicMetadataLoadingRu this.kafkaCheckEnabled = kafkaCheckEnabled; this.interval = interval; this.topicMetadataLoadingRunner = topicMetadataLoadingRunner; - this.mapper = mapper; - this.cache = new NodeCache(curator, paths.frontendReadinessPath()); - cache.getListenable().addListener(this); - try { - cache.start(true); - } catch (Exception e) { - throw new InternalProcessingException("Readiness cache cannot start.", e); - } + this.adminReadinessService = adminReadinessService; ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("ReadinessChecker-%d").build(); this.scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory); @@ -68,7 +51,6 @@ public boolean isReady() { @Override public void start() { if (enabled) { - refreshAdminReady(); ReadinessCheckerJob job = new ReadinessCheckerJob(); job.run(); scheduler.scheduleAtFixedRate(job, interval.toSeconds(), interval.toSeconds(), TimeUnit.SECONDS); @@ -79,31 +61,6 @@ public void start() { public void stop() throws InterruptedException { scheduler.shutdown(); scheduler.awaitTermination(1, TimeUnit.MINUTES); - try { - cache.close(); - } catch (Exception e) { - logger.warn("Failed to stop readiness cache", e); - } - } - - @Override - public void nodeChanged() { - refreshAdminReady(); - } - - private void refreshAdminReady() { - try { - ChildData nodeData = cache.getCurrentData(); - if (nodeData != null) { - byte[] data = nodeData.getData(); - Readiness value = mapper.readValue(data, Readiness.class); - adminReady = value.isReady(); - } else { - adminReady = true; - } - } catch (Exception e) { - logger.error("Failed reloading readiness cache. Current value: ready=" + ready, e); - } } private class ReadinessCheckerJob implements Runnable { @@ -111,7 +68,7 @@ private class ReadinessCheckerJob implements Runnable { @Override public void run() { - if (!adminReady) { + if (!adminReadinessService.isLocalDatacenterReady()) { ready = false; } else if (kafkaReady) { ready = true; diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/services/HealthCheckService.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/readiness/HealthCheckService.java similarity index 74% rename from hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/services/HealthCheckService.java rename to hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/readiness/HealthCheckService.java index 6f9c727bd3..f63cf9733a 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/services/HealthCheckService.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/readiness/HealthCheckService.java @@ -1,8 +1,5 @@ -package pl.allegro.tech.hermes.frontend.services; +package pl.allegro.tech.hermes.frontend.readiness; -import jakarta.inject.Singleton; - -@Singleton public class HealthCheckService { private volatile boolean shutdown = true; diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/ReadinessChecker.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/readiness/ReadinessChecker.java similarity index 71% rename from hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/ReadinessChecker.java rename to hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/readiness/ReadinessChecker.java index 15ca439572..17c9c36c33 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/ReadinessChecker.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/readiness/ReadinessChecker.java @@ -1,4 +1,4 @@ -package pl.allegro.tech.hermes.frontend.server; +package pl.allegro.tech.hermes.frontend.readiness; public interface ReadinessChecker { diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/HealthCheckHandler.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/HealthCheckHandler.java index d2e3cd5127..c55db74fe2 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/HealthCheckHandler.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/HealthCheckHandler.java @@ -2,7 +2,7 @@ import io.undertow.server.HttpHandler; import io.undertow.server.HttpServerExchange; -import pl.allegro.tech.hermes.frontend.services.HealthCheckService; +import pl.allegro.tech.hermes.frontend.readiness.HealthCheckService; import static io.undertow.util.StatusCodes.OK; import static io.undertow.util.StatusCodes.SERVICE_UNAVAILABLE; diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/HermesServer.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/HermesServer.java index b2472f4b74..da15daf716 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/HermesServer.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/HermesServer.java @@ -9,7 +9,8 @@ import pl.allegro.tech.hermes.common.metric.MetricsFacade; import pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter; import pl.allegro.tech.hermes.frontend.publishing.preview.MessagePreviewPersister; -import pl.allegro.tech.hermes.frontend.services.HealthCheckService; +import pl.allegro.tech.hermes.frontend.readiness.HealthCheckService; +import pl.allegro.tech.hermes.frontend.readiness.ReadinessChecker; import java.net.InetSocketAddress; @@ -46,6 +47,7 @@ public HermesServer( HermesServerParameters hermesServerParameters, MetricsFacade metricsFacade, HttpHandler publishingHandler, + HealthCheckService healthCheckService, ReadinessChecker readinessChecker, MessagePreviewPersister messagePreviewPersister, ThroughputLimiter throughputLimiter, @@ -59,7 +61,7 @@ public HermesServer( this.metricsFacade = metricsFacade; this.publishingHandler = publishingHandler; this.prometheusMeterRegistry = prometheusMeterRegistry; - this.healthCheckService = new HealthCheckService(); + this.healthCheckService = healthCheckService; this.readinessChecker = readinessChecker; this.messagePreviewPersister = messagePreviewPersister; this.topicMetadataLoadingJob = topicMetadataLoadingJob; diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/MetadataLoadingResult.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/MetadataLoadingResult.java index 5cd35dceb7..9008e668ef 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/MetadataLoadingResult.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/MetadataLoadingResult.java @@ -2,7 +2,7 @@ import pl.allegro.tech.hermes.api.TopicName; -final class MetadataLoadingResult { +public final class MetadataLoadingResult { enum Type { SUCCESS, FAILURE } @@ -31,7 +31,7 @@ TopicName getTopicName() { return topicName; } - boolean isFailure() { + public boolean isFailure() { return Type.FAILURE == type; } } diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/ReadinessCheckHandler.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/ReadinessCheckHandler.java index 8bad1a15a1..cb93af2333 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/ReadinessCheckHandler.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/ReadinessCheckHandler.java @@ -2,7 +2,8 @@ import io.undertow.server.HttpHandler; import io.undertow.server.HttpServerExchange; -import pl.allegro.tech.hermes.frontend.services.HealthCheckService; +import pl.allegro.tech.hermes.frontend.readiness.HealthCheckService; +import pl.allegro.tech.hermes.frontend.readiness.ReadinessChecker; import static io.undertow.util.StatusCodes.OK; import static io.undertow.util.StatusCodes.SERVICE_UNAVAILABLE; diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/ReadinessEndpoint.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/ReadinessEndpoint.java index f8f3b379be..be8a50732c 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/ReadinessEndpoint.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/ReadinessEndpoint.java @@ -45,6 +45,6 @@ public Response setReadiness(@PathParam("datacenter") String datacenter, Readine @Produces(APPLICATION_JSON) @RolesAllowed(Roles.ADMIN) public List getReadiness() { - return readinessService.getDatacentersReadinesses(); + return readinessService.getDatacentersReadiness(); } } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ReadinessConfiguration.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ReadinessConfiguration.java index cef5e831dc..9a5a9d1984 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ReadinessConfiguration.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ReadinessConfiguration.java @@ -2,15 +2,24 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import pl.allegro.tech.hermes.management.config.storage.StorageClustersProperties; +import pl.allegro.tech.hermes.management.config.storage.StorageProperties; import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor; -import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryQueryExecutor; +import pl.allegro.tech.hermes.management.domain.readiness.DatacenterReadinessRepository; import pl.allegro.tech.hermes.management.domain.readiness.ReadinessService; +import java.util.List; + @Configuration public class ReadinessConfiguration { + @Bean ReadinessService readinessService(MultiDatacenterRepositoryCommandExecutor commandExecutor, - MultiDatacenterRepositoryQueryExecutor queryExecutor) { - return new ReadinessService(commandExecutor, queryExecutor); + DatacenterReadinessRepository readinessRepository, + StorageClustersProperties storageClustersProperties) { + List datacenters = storageClustersProperties.getClusters().stream() + .map(StorageProperties::getDatacenter) + .toList(); + return new ReadinessService(commandExecutor, readinessRepository, datacenters); } } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/storage/StorageConfiguration.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/storage/StorageConfiguration.java index 2dfe56d48c..9bf60b1c79 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/storage/StorageConfiguration.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/storage/StorageConfiguration.java @@ -29,11 +29,12 @@ import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperWorkloadConstraintsRepository; import pl.allegro.tech.hermes.management.domain.blacklist.TopicBlacklistRepository; import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor; -import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryQueryExecutor; import pl.allegro.tech.hermes.management.domain.mode.ModeService; +import pl.allegro.tech.hermes.management.domain.readiness.DatacenterReadinessRepository; import pl.allegro.tech.hermes.management.domain.retransmit.OfflineRetransmissionRepository; import pl.allegro.tech.hermes.management.infrastructure.blacklist.ZookeeperTopicBlacklistRepository; import pl.allegro.tech.hermes.management.infrastructure.metrics.SummedSharedCounter; +import pl.allegro.tech.hermes.management.infrastructure.readiness.ZookeeperDatacenterReadinessRepository; import pl.allegro.tech.hermes.management.infrastructure.retransmit.ZookeeperOfflineRetransmissionRepository; import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperClient; import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperClientManager; @@ -95,12 +96,6 @@ MultiDatacenterRepositoryCommandExecutor multiDcRepositoryCommandExecutor( ); } - @Bean - MultiDatacenterRepositoryQueryExecutor multiDcRepositoryQueryExecutor( - ZookeeperGroupRepositoryFactory zookeeperGroupRepositoryFactory) { - return new MultiDatacenterRepositoryQueryExecutor(repositoryManager(zookeeperGroupRepositoryFactory)); - } - @Bean SummedSharedCounter summedSharedCounter(ZookeeperClientManager manager) { return new SummedSharedCounter( @@ -168,4 +163,10 @@ OfflineRetransmissionRepository zookeeperOfflineRetransmissionRepository() { ZookeeperClient localClient = clientManager().getLocalClient(); return new ZookeeperOfflineRetransmissionRepository(localClient.getCuratorFramework(), objectMapper, zookeeperPaths()); } + + @Bean + DatacenterReadinessRepository readinessRepository() { + ZookeeperClient localClient = clientManager().getLocalClient(); + return new ZookeeperDatacenterReadinessRepository(localClient.getCuratorFramework(), objectMapper, zookeeperPaths()); + } } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/dc/MultiDatacenterRepositoryQueryExecutor.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/dc/MultiDatacenterRepositoryQueryExecutor.java deleted file mode 100644 index 483e558ae4..0000000000 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/dc/MultiDatacenterRepositoryQueryExecutor.java +++ /dev/null @@ -1,32 +0,0 @@ -package pl.allegro.tech.hermes.management.domain.dc; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.stream.Collectors; - -public class MultiDatacenterRepositoryQueryExecutor { - - private static final Logger logger = LoggerFactory.getLogger(MultiDatacenterRepositoryQueryExecutor.class); - - private final RepositoryManager repositoryManager; - - public MultiDatacenterRepositoryQueryExecutor(RepositoryManager repositoryManager) { - this.repositoryManager = repositoryManager; - } - - public List> execute(QueryCommand command) { - - List> repoHolders = repositoryManager.getRepositories(command.getRepositoryType()); - - return repoHolders.stream().map(holder -> { - try { - return new DatacenterBoundQueryResult<>(command.query(holder), holder.getDatacenterName()); - } catch (Exception e) { - logger.warn("Execute failed with an error", e); - throw ExceptionWrapper.wrapInInternalProcessingExceptionIfNeeded(e, command.toString(), holder.getDatacenterName()); - } - }).collect(Collectors.toList()); - } -} diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/dc/QueryCommand.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/dc/QueryCommand.java deleted file mode 100644 index b76d0c07c1..0000000000 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/dc/QueryCommand.java +++ /dev/null @@ -1,7 +0,0 @@ -package pl.allegro.tech.hermes.management.domain.dc; - -public abstract class QueryCommand { - public abstract T query(DatacenterBoundRepositoryHolder holder); - - public abstract Class getRepositoryType(); -} diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/DatacenterReadinessRepository.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/DatacenterReadinessRepository.java new file mode 100644 index 0000000000..d1270c3a14 --- /dev/null +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/DatacenterReadinessRepository.java @@ -0,0 +1,12 @@ +package pl.allegro.tech.hermes.management.domain.readiness; + +import pl.allegro.tech.hermes.api.DatacenterReadiness; + +import java.util.List; + +public interface DatacenterReadinessRepository { + + List getReadiness(); + + void setReadiness(List datacenterReadiness); +} diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/GetReadinessQuery.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/GetReadinessQuery.java deleted file mode 100644 index 918592dd05..0000000000 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/GetReadinessQuery.java +++ /dev/null @@ -1,28 +0,0 @@ -package pl.allegro.tech.hermes.management.domain.readiness; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import pl.allegro.tech.hermes.api.DatacenterReadiness.ReadinessStatus; -import pl.allegro.tech.hermes.management.domain.dc.DatacenterBoundRepositoryHolder; -import pl.allegro.tech.hermes.management.domain.dc.QueryCommand; - -public class GetReadinessQuery extends QueryCommand { - - private static final Logger logger = LoggerFactory.getLogger(GetReadinessQuery.class); - - @Override - public ReadinessStatus query(DatacenterBoundRepositoryHolder holder) { - try { - boolean ready = holder.getRepository().isReady(); - return ready ? ReadinessStatus.READY : ReadinessStatus.NOT_READY; - } catch (Exception e) { - logger.error("Cannot obtain readiness status from {}", holder.getDatacenterName(), e); - return ReadinessStatus.UNDEFINED; - } - } - - @Override - public Class getRepositoryType() { - return ReadinessRepository.class; - } -} diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/ReadinessRepository.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/ReadinessRepository.java deleted file mode 100644 index c33487f53e..0000000000 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/ReadinessRepository.java +++ /dev/null @@ -1,8 +0,0 @@ -package pl.allegro.tech.hermes.management.domain.readiness; - -public interface ReadinessRepository { - - boolean isReady(); - - void setReadiness(boolean isReady); -} diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/ReadinessService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/ReadinessService.java index f9a01f4f52..21dc0cde7b 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/ReadinessService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/ReadinessService.java @@ -2,28 +2,56 @@ import pl.allegro.tech.hermes.api.DatacenterReadiness; import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor; -import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryQueryExecutor; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.function.Function; import java.util.stream.Collectors; +import static pl.allegro.tech.hermes.api.DatacenterReadiness.ReadinessStatus.READY; + public class ReadinessService { + private final MultiDatacenterRepositoryCommandExecutor commandExecutor; - private final MultiDatacenterRepositoryQueryExecutor queryExecutor; + private final DatacenterReadinessRepository readinessRepository; + private final List datacenters; public ReadinessService(MultiDatacenterRepositoryCommandExecutor commandExecutor, - MultiDatacenterRepositoryQueryExecutor queryExecutor) { + DatacenterReadinessRepository readinessRepository, + List datacenters) { this.commandExecutor = commandExecutor; - this.queryExecutor = queryExecutor; + this.readinessRepository = readinessRepository; + this.datacenters = datacenters; } public void setReady(DatacenterReadiness datacenterReadiness) { - commandExecutor.execute(new SetReadinessCommand(datacenterReadiness)); + Map current = getReadiness(); + Map toSave = new HashMap<>(); + for (String datacenter : datacenters) { + toSave.put(datacenter, current.get(datacenter)); + } + toSave.put(datacenterReadiness.getDatacenter(), datacenterReadiness); + List readiness = toSave.values().stream().toList(); + commandExecutor.execute(new SetReadinessCommand(readiness)); + } + + public List getDatacentersReadiness() { + Map current = getReadiness(); + Map result = new HashMap<>(); + for (String datacenter : datacenters) { + DatacenterReadiness datacenterReadiness = current.get(datacenter); + if (datacenterReadiness == null) { + result.put(datacenter, new DatacenterReadiness(datacenter, READY)); + } else { + result.put(datacenter, datacenterReadiness); + } + } + return result.values().stream().toList(); } - public List getDatacentersReadinesses() { - return queryExecutor.execute(new GetReadinessQuery()).stream() - .map(r -> new DatacenterReadiness(r.getDatacenterName(), r.getResult())) - .collect(Collectors.toList()); + private Map getReadiness() { + return readinessRepository.getReadiness().stream() + .collect(Collectors.toMap(DatacenterReadiness::getDatacenter, Function.identity())); } } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/SetReadinessCommand.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/SetReadinessCommand.java index a96f0caec9..5cc0777f49 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/SetReadinessCommand.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/SetReadinessCommand.java @@ -1,44 +1,32 @@ package pl.allegro.tech.hermes.management.domain.readiness; import pl.allegro.tech.hermes.api.DatacenterReadiness; -import pl.allegro.tech.hermes.common.exception.InternalProcessingException; import pl.allegro.tech.hermes.management.domain.dc.DatacenterBoundRepositoryHolder; import pl.allegro.tech.hermes.management.domain.dc.RepositoryCommand; -public class SetReadinessCommand extends RepositoryCommand { - private final DatacenterReadiness readiness; +import java.util.List; - public SetReadinessCommand(DatacenterReadiness readiness) { +public class SetReadinessCommand extends RepositoryCommand { + private final List readiness; + + public SetReadinessCommand(List readiness) { this.readiness = readiness; } @Override - public void backup(DatacenterBoundRepositoryHolder holder) { } + public void backup(DatacenterBoundRepositoryHolder holder) { } @Override - public void execute(DatacenterBoundRepositoryHolder holder) { - if (holder.getDatacenterName().equals(readiness.getDatacenter())) { - holder.getRepository().setReadiness(isReady()); - } - } - - private boolean isReady() { - switch (readiness.getStatus()) { - case READY: - return true; - case NOT_READY: - return false; - default: - throw new InternalProcessingException("Invalid readiness status: " + readiness.getStatus()); - } + public void execute(DatacenterBoundRepositoryHolder holder) { + holder.getRepository().setReadiness(readiness); } @Override - public void rollback(DatacenterBoundRepositoryHolder holder) { } + public void rollback(DatacenterBoundRepositoryHolder holder) { } @Override - public Class getRepositoryType() { - return ReadinessRepository.class; + public Class getRepositoryType() { + return DatacenterReadinessRepository.class; } @Override diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/readiness/ZookeeperDatacenterReadinessRepository.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/readiness/ZookeeperDatacenterReadinessRepository.java index ec1d0fe0f3..b928dcd461 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/readiness/ZookeeperDatacenterReadinessRepository.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/readiness/ZookeeperDatacenterReadinessRepository.java @@ -3,40 +3,46 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.KeeperException; -import pl.allegro.tech.hermes.api.Readiness; +import pl.allegro.tech.hermes.api.DatacenterReadiness; import pl.allegro.tech.hermes.common.exception.InternalProcessingException; +import pl.allegro.tech.hermes.domain.readiness.DatacenterReadinessList; +import pl.allegro.tech.hermes.infrastructure.MalformedDataException; import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperBasedRepository; import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths; -import pl.allegro.tech.hermes.management.domain.readiness.ReadinessRepository; +import pl.allegro.tech.hermes.management.domain.readiness.DatacenterReadinessRepository; -public class ZookeeperDatacenterReadinessRepository extends ZookeeperBasedRepository implements ReadinessRepository { +import java.util.Collections; +import java.util.List; + +public class ZookeeperDatacenterReadinessRepository extends ZookeeperBasedRepository implements DatacenterReadinessRepository { public ZookeeperDatacenterReadinessRepository(CuratorFramework curator, ObjectMapper mapper, ZookeeperPaths paths) { super(curator, mapper, paths); } @Override - public boolean isReady() { + public List getReadiness() { try { - String path = paths.frontendReadinessPath(); - Readiness readiness = readFrom(path, Readiness.class); - return readiness.isReady(); + String path = paths.datacenterReadinessPath(); + return readFrom(path, DatacenterReadinessList.class).datacenters(); } catch (InternalProcessingException e) { if (e.getCause() instanceof KeeperException.NoNodeException) { - return true; + return Collections.emptyList(); } throw e; + } catch (MalformedDataException e) { + return Collections.emptyList(); } } @Override - public void setReadiness(boolean isReady) { + public void setReadiness(List datacenterReadiness) { try { - String path = paths.frontendReadinessPath(); + String path = paths.datacenterReadinessPath(); if (!pathExists(path)) { - createRecursively(path, new Readiness(isReady)); + createRecursively(path, new DatacenterReadinessList(datacenterReadiness)); } else { - overwrite(path, new Readiness(isReady)); + overwrite(path, new DatacenterReadinessList(datacenterReadiness)); } } catch (Exception ex) { throw new InternalProcessingException(ex); diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/zookeeper/ZookeeperRepositoryManager.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/zookeeper/ZookeeperRepositoryManager.java index 56724b7a24..8d72b11395 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/zookeeper/ZookeeperRepositoryManager.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/zookeeper/ZookeeperRepositoryManager.java @@ -28,7 +28,7 @@ import pl.allegro.tech.hermes.management.domain.blacklist.TopicBlacklistRepository; import pl.allegro.tech.hermes.management.domain.dc.DatacenterBoundRepositoryHolder; import pl.allegro.tech.hermes.management.domain.dc.RepositoryManager; -import pl.allegro.tech.hermes.management.domain.readiness.ReadinessRepository; +import pl.allegro.tech.hermes.management.domain.readiness.DatacenterReadinessRepository; import pl.allegro.tech.hermes.management.domain.retransmit.OfflineRetransmissionRepository; import pl.allegro.tech.hermes.management.infrastructure.blacklist.ZookeeperTopicBlacklistRepository; import pl.allegro.tech.hermes.management.infrastructure.readiness.ZookeeperDatacenterReadinessRepository; @@ -58,7 +58,7 @@ public class ZookeeperRepositoryManager implements RepositoryManager { private final Map workloadConstraintsRepositoriesByDc = new HashMap<>(); private final Map lastUndeliveredMessageReaderByDc = new HashMap<>(); private final Map adminToolByDc = new HashMap<>(); - private final Map readinessRepositoriesByDc = new HashMap<>(); + private final Map readinessRepositoriesByDc = new HashMap<>(); private final Map offlineRetransmissionRepositoriesByDc = new HashMap<>(); private final ZookeeperGroupRepositoryFactory zookeeperGroupRepositoryFactory; @@ -115,7 +115,7 @@ public void start() { AdminTool adminTool = new ZookeeperAdminTool(paths, client.getCuratorFramework(), mapper); adminToolByDc.put(dcName, adminTool); - ReadinessRepository readinessRepository = new ZookeeperDatacenterReadinessRepository(zookeeper, mapper, paths); + DatacenterReadinessRepository readinessRepository = new ZookeeperDatacenterReadinessRepository(zookeeper, mapper, paths); readinessRepositoriesByDc.put(dcName, readinessRepository); ZookeeperOfflineRetransmissionRepository offlineRetransmissionRepository = @@ -166,7 +166,7 @@ private void initRepositoryTypeMap() { repositoryByType.put(WorkloadConstraintsRepository.class, workloadConstraintsRepositoriesByDc); repositoryByType.put(LastUndeliveredMessageReader.class, lastUndeliveredMessageReaderByDc); repositoryByType.put(AdminTool.class, adminToolByDc); - repositoryByType.put(ReadinessRepository.class, readinessRepositoriesByDc); + repositoryByType.put(DatacenterReadinessRepository.class, readinessRepositoriesByDc); repositoryByType.put(OfflineRetransmissionRepository.class, offlineRetransmissionRepositoriesByDc); } } diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/ReadinessCheckTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/ReadinessCheckTest.java index 069482c8e8..90176dd5be 100644 --- a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/ReadinessCheckTest.java +++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/ReadinessCheckTest.java @@ -1,13 +1,10 @@ package pl.allegro.tech.hermes.integrationtests; import com.jayway.awaitility.Duration; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import pl.allegro.tech.hermes.integrationtests.setup.HermesExtension; -import java.util.Objects; - import static com.jayway.awaitility.Awaitility.waitAtMost; import static pl.allegro.tech.hermes.infrastructure.dc.DefaultDatacenterNameProvider.DEFAULT_DC_NAME; @@ -16,25 +13,28 @@ public class ReadinessCheckTest { @RegisterExtension public static final HermesExtension hermes = new HermesExtension(); - //TODO Fix this test later - @Test - @Disabled public void shouldRespectReadinessStatusSetByAdmin() { // when hermes.api().setReadiness(DEFAULT_DC_NAME, false).expectStatus().isAccepted(); // then - waitAtMost(Duration.FIVE_SECONDS).until( - () -> Objects.equals(hermes.api().getReadiness().expectStatus().is5xxServerError().expectBody(String.class).returnResult().getResponseBody(), "NOT_READY") + waitAtMost(Duration.FIVE_SECONDS).until(() -> + hermes.api() + .getReadiness() + .expectStatus().is5xxServerError() + .expectBody(String.class).isEqualTo("NOT_READY") ); // when hermes.api().setReadiness(DEFAULT_DC_NAME, true).expectStatus().isAccepted(); // then - waitAtMost(Duration.FIVE_SECONDS).until( - () -> Objects.equals(hermes.api().getReadiness().expectStatus().isOk().expectBody(String.class).returnResult().getResponseBody(), "READY") + waitAtMost(Duration.FIVE_SECONDS).until(() -> + hermes.api() + .getReadiness() + .expectStatus().isOk() + .expectBody(String.class).isEqualTo("READY") ); } }