diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ccdca2f..636acb5e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,9 @@ All notable changes to this project will be documented in this file. ## [1.1-RC2] - +- Add feature to exclude services from service data source +- Create one single `RangerHealthCheck` for all curatorFrameworks when giving multiple zookeeper connection strings +- Update `lastUpdatedTimeStamp` for service nodes in service registry for zk / http communication failures from any kind of node data source - zk/http/drove - Pertaining to PR : https://github.com/appform-io/ranger/pull/22/, building of a ServiceFinderHub and a ServiceFinder are bounded. ## [1.0-RC18] diff --git a/ranger-client/src/main/java/io/appform/ranger/client/AbstractRangerHubClient.java b/ranger-client/src/main/java/io/appform/ranger/client/AbstractRangerHubClient.java index 4c816a86..f48b6b38 100644 --- a/ranger-client/src/main/java/io/appform/ranger/client/AbstractRangerHubClient.java +++ b/ranger-client/src/main/java/io/appform/ranger/client/AbstractRangerHubClient.java @@ -26,9 +26,14 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Predicate; +import java.util.stream.Collectors; + +import io.appform.ranger.core.util.FinderUtils; import lombok.Getter; import lombok.experimental.SuperBuilder; import lombok.extern.slf4j.Slf4j; @@ -46,8 +51,18 @@ public abstract class AbstractRangerHubClient, D private int nodeRefreshTimeMs; private ServiceFinderHub hub; private ServiceDataSource serviceDataSource; - private long serviceRefreshDurationMs; - private long hubRefreshDurationMs; + + /** + * Initial time to wait for service node data to be refreshed in service registry (in milliseconds) + */ + private long serviceRefreshTimeoutMs; + + /** + * Time to wait for Hub Start completion (in milliseconds) + * Hub Start is considered to be completed if service registry for all eligible services has been refreshed + */ + private long hubStartTimeoutMs; + private Set excludedServices; @Override public void start() { @@ -62,23 +77,26 @@ public void start() { } this.nodeRefreshTimeMs = Math.max(HubConstants.MINIMUM_REFRESH_TIME_MS, this.nodeRefreshTimeMs); - if (this.serviceRefreshDurationMs <= 0) { + if (this.serviceRefreshTimeoutMs <= 0) { log.warn("Service Refresh interval too low: {} ms. Has been upgraded to {} ms ", - this.serviceRefreshDurationMs, - HubConstants.SERVICE_REFRESH_DURATION_MS); - this.serviceRefreshDurationMs = HubConstants.SERVICE_REFRESH_DURATION_MS; + this.serviceRefreshTimeoutMs, + HubConstants.SERVICE_REFRESH_TIMEOUT_MS); + this.serviceRefreshTimeoutMs = HubConstants.SERVICE_REFRESH_TIMEOUT_MS; } - if (this.hubRefreshDurationMs <= 0) { + if (this.hubStartTimeoutMs <= 0) { log.warn("Hub Refresh interval too low: {} ms. Has been upgraded to {} ms ", - this.hubRefreshDurationMs, - HubConstants.HUB_REFRESH_DURATION_MS); - this.hubRefreshDurationMs = HubConstants.HUB_REFRESH_DURATION_MS; + this.hubStartTimeoutMs, + HubConstants.HUB_START_TIMEOUT_MS); + this.hubStartTimeoutMs = HubConstants.HUB_START_TIMEOUT_MS; } + this.excludedServices = Objects.requireNonNullElseGet(this.excludedServices, Set::of); + if(null == this.serviceDataSource){ this.serviceDataSource = getDefaultDataSource(); } + this.hub = buildHub(); this.hub.start(); } @@ -153,7 +171,7 @@ public List> getAllNodes( @Override public Collection getRegisteredServices() { try { - return this.getHub().getServiceDataSource().services(); + return FinderUtils.getEligibleServices(this.getHub().getServiceDataSource().services(), excludedServices); } catch (Exception e) { log.error("Call to the hub failed with exception, {}", e.getMessage()); diff --git a/ranger-core/src/main/java/io/appform/ranger/core/finder/serviceregistry/ServiceRegistryUpdater.java b/ranger-core/src/main/java/io/appform/ranger/core/finder/serviceregistry/ServiceRegistryUpdater.java index 92a2b340..4d8f17f7 100644 --- a/ranger-core/src/main/java/io/appform/ranger/core/finder/serviceregistry/ServiceRegistryUpdater.java +++ b/ranger-core/src/main/java/io/appform/ranger/core/finder/serviceregistry/ServiceRegistryUpdater.java @@ -26,6 +26,7 @@ import io.appform.ranger.core.signals.Signal; import io.appform.ranger.core.util.Exceptions; import io.appform.ranger.core.util.FinderUtils; +import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; import lombok.val; @@ -47,7 +48,7 @@ public class ServiceRegistryUpdater> { private final Lock checkLock = new ReentrantLock(); private final Condition checkCondition = checkLock.newCondition(); - private boolean checkForUpdate = false; + private final AtomicBoolean checkForUpdate = new AtomicBoolean(false); private Future queryThreadFuture; private final ExecutorService executorService = Executors.newFixedThreadPool(1); @@ -95,7 +96,7 @@ public void checkForUpdate(T signalData) { Preconditions.checkArgument(null == signalData); try { checkLock.lock(); - checkForUpdate = true; + checkForUpdate.set(true); checkCondition.signalAll(); } finally { @@ -108,7 +109,7 @@ private Void queryExecutor() { while (true) { try { checkLock.lock(); - while (!checkForUpdate) { + while (!checkForUpdate.get()) { checkCondition.await(); } updateRegistry(); @@ -122,7 +123,7 @@ private Void queryExecutor() { log.error("Registry update failed for service: " + serviceRegistry.getService().name(), e); } finally { - checkForUpdate = false; + checkForUpdate.set(false); checkLock.unlock(); } } diff --git a/ranger-core/src/main/java/io/appform/ranger/core/finderhub/ServiceFinderHub.java b/ranger-core/src/main/java/io/appform/ranger/core/finderhub/ServiceFinderHub.java index 2611b4f0..f2ecd452 100644 --- a/ranger-core/src/main/java/io/appform/ranger/core/finderhub/ServiceFinderHub.java +++ b/ranger-core/src/main/java/io/appform/ranger/core/finderhub/ServiceFinderHub.java @@ -26,6 +26,7 @@ import io.appform.ranger.core.signals.ScheduledSignal; import io.appform.ranger.core.signals.Signal; import io.appform.ranger.core.util.Exceptions; +import io.appform.ranger.core.util.FinderUtils; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import lombok.val; @@ -52,7 +53,7 @@ public class ServiceFinderHub> { new AtomicReference<>(new ConcurrentHashMap<>()); private final Lock updateLock = new ReentrantLock(); private final Condition updateCond = updateLock.newCondition(); - private boolean updateAvailable = false; + private final AtomicBoolean updateAvailable = new AtomicBoolean(false); private final ExecutorService executorService = Executors.newFixedThreadPool(1); @Getter @@ -72,8 +73,10 @@ public class ServiceFinderHub> { private final AtomicInteger poolThreadIndex = new AtomicInteger(0); private Future monitorFuture = null; - private final long serviceRefreshDurationMs; - private final long hubRefreshDurationMs; + private final long serviceRefreshTimeoutMs; + private final long hubStartTimeoutMs; + + private final Set excludedServices; private final ForkJoinPool refresherPool; @@ -82,23 +85,25 @@ public ServiceFinderHub( ServiceFinderFactory finderFactory ) { this(serviceDataSource, finderFactory, - HubConstants.SERVICE_REFRESH_DURATION_MS, HubConstants.HUB_REFRESH_DURATION_MS); + HubConstants.SERVICE_REFRESH_TIMEOUT_MS, HubConstants.HUB_START_TIMEOUT_MS, Set.of()); } public ServiceFinderHub( ServiceDataSource serviceDataSource, ServiceFinderFactory finderFactory, - long serviceRefreshDurationMs, - long hubRefreshDurationMs) { + long serviceRefreshTimeoutMs, + long hubStartTimeoutMs, + final Set excludedServices) { this.serviceDataSource = serviceDataSource; this.finderFactory = finderFactory; - this.serviceRefreshDurationMs = serviceRefreshDurationMs == 0 ? HubConstants.SERVICE_REFRESH_DURATION_MS : serviceRefreshDurationMs; - this.hubRefreshDurationMs = hubRefreshDurationMs == 0 ? HubConstants.HUB_REFRESH_DURATION_MS : hubRefreshDurationMs; + this.serviceRefreshTimeoutMs = serviceRefreshTimeoutMs == 0 ? HubConstants.SERVICE_REFRESH_TIMEOUT_MS : serviceRefreshTimeoutMs; + this.hubStartTimeoutMs = hubStartTimeoutMs == 0 ? HubConstants.HUB_START_TIMEOUT_MS : hubStartTimeoutMs; this.refreshSignals.add(new ScheduledSignal<>("service-hub-updater", () -> null, Collections.emptyList(), - 10_000)); + 10_000)); this.refresherPool = createRefresherPool(); + this.excludedServices = Objects.requireNonNullElseGet(excludedServices, Set::of); } public Optional> finder(final Service service) { @@ -155,7 +160,7 @@ public void registerUpdateSignal(final Signal refreshSignal) { public void updateAvailable() { try { updateLock.lock(); - updateAvailable = true; + updateAvailable.set(true); updateCond.signalAll(); } finally { @@ -179,7 +184,7 @@ private void monitor() { while (true) { try { updateLock.lock(); - while (!updateAvailable) { + while (!updateAvailable.get()) { updateCond.await(); } updateRegistry(); @@ -190,7 +195,7 @@ private void monitor() { break; } finally { - updateAvailable = false; + updateAvailable.set(false); updateLock.unlock(); } } @@ -204,7 +209,7 @@ private void updateRegistry() { alreadyUpdating.set(true); val updatedFinders = new ConcurrentHashMap>(); try { - val services = serviceDataSource.services(); + val services = FinderUtils.getEligibleServices(serviceDataSource.services(), excludedServices); if (services.isEmpty()) { log.debug("No services found for the service data source. Skipping update on the registry"); return; @@ -239,13 +244,13 @@ private void updateRegistry() { } private void waitTillHubIsReady() { - val services = serviceDataSource.services(); - val timeToRefresh = Math.max(hubRefreshDurationMs, - (serviceRefreshDurationMs * services.size()) / refresherPool.getParallelism()); - if (timeToRefresh != hubRefreshDurationMs) { + val services = FinderUtils.getEligibleServices(serviceDataSource.services(), excludedServices); + val timeToRefresh = Math.max(hubStartTimeoutMs, + (serviceRefreshTimeoutMs * services.size()) / refresherPool.getParallelism()); + if (timeToRefresh != hubStartTimeoutMs) { log.warn("Max hub refresh time has been dynamically adjusted to {} ms from the provided {} ms as the " + "provided time would have been insufficient to refresh {} services.", - timeToRefresh, hubRefreshDurationMs, services.size()); + timeToRefresh, hubStartTimeoutMs, services.size()); } val hubRefresher = CompletableFuture.allOf( services.stream() @@ -273,7 +278,7 @@ private void waitTillServiceIsReady(Service service) { try { RetryerBuilder.newBuilder() .retryIfResult(r -> !r) - .withStopStrategy(StopStrategies.stopAfterDelay(serviceRefreshDurationMs, TimeUnit.MILLISECONDS)) + .withStopStrategy(StopStrategies.stopAfterDelay(serviceRefreshTimeoutMs, TimeUnit.MILLISECONDS)) .build() .call(() -> Optional.ofNullable(getFinders().get().get(service)) .map(ServiceFinder::getServiceRegistry) @@ -285,4 +290,5 @@ private void waitTillServiceIsReady(Service service) { .illegalState("Could not perform initial state for service: " + service.getServiceName(), e); } } + } diff --git a/ranger-core/src/main/java/io/appform/ranger/core/finderhub/ServiceFinderHubBuilder.java b/ranger-core/src/main/java/io/appform/ranger/core/finderhub/ServiceFinderHubBuilder.java index 4a47b397..6f85c4c1 100644 --- a/ranger-core/src/main/java/io/appform/ranger/core/finderhub/ServiceFinderHubBuilder.java +++ b/ranger-core/src/main/java/io/appform/ranger/core/finderhub/ServiceFinderHubBuilder.java @@ -20,11 +20,11 @@ import io.appform.ranger.core.model.ServiceRegistry; import io.appform.ranger.core.signals.ScheduledSignal; import io.appform.ranger.core.signals.Signal; + +import java.util.*; + import lombok.val; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.function.Consumer; /** @@ -38,8 +38,9 @@ public abstract class ServiceFinderHubBuilder> { private final List> extraStartSignalConsumers = new ArrayList<>(); private final List> extraStopSignalConsumers = new ArrayList<>(); private final List> extraRefreshSignals = new ArrayList<>(); - private long serviceRefreshDurationMs = HubConstants.SERVICE_REFRESH_DURATION_MS; - private long hubRefreshDurationMs = HubConstants.HUB_REFRESH_DURATION_MS; + private long serviceRefreshTimeoutMs = HubConstants.SERVICE_REFRESH_TIMEOUT_MS; + private long hubStartTimeoutMs = HubConstants.HUB_START_TIMEOUT_MS; + private Set excludedServices = new HashSet<>(); public ServiceFinderHubBuilder withServiceDataSource(ServiceDataSource serviceDataSource) { this.serviceDataSource = serviceDataSource; @@ -50,7 +51,7 @@ public ServiceFinderHubBuilder withServiceFinderFactory(ServiceFinderFacto this.serviceFinderFactory = serviceFinderFactory; return this; } - + public ServiceFinderHubBuilder withRefreshFrequencyMs(long refreshFrequencyMs) { this.refreshFrequencyMs = refreshFrequencyMs; return this; @@ -71,13 +72,18 @@ public ServiceFinderHubBuilder withExtraRefreshSignal(Signal extraRe return this; } - public ServiceFinderHubBuilder withServiceRefreshDuration(long serviceRefreshDurationMs) { - this.serviceRefreshDurationMs = serviceRefreshDurationMs; + public ServiceFinderHubBuilder withServiceRefreshTimeout(long serviceRefreshTimeoutMs) { + this.serviceRefreshTimeoutMs = serviceRefreshTimeoutMs; + return this; + } + + public ServiceFinderHubBuilder withHubStartTimeout(long hubStartTimeoutMs) { + this.hubStartTimeoutMs = hubStartTimeoutMs; return this; } - public ServiceFinderHubBuilder withHubRefreshDuration(long hubRefreshDurationMs) { - this.hubRefreshDurationMs = hubRefreshDurationMs; + public ServiceFinderHubBuilder withExcludedServices(Set excludedServices) { + this.excludedServices = Objects.requireNonNullElseGet(excludedServices, Set::of); return this; } @@ -86,8 +92,8 @@ public ServiceFinderHub build() { Preconditions.checkNotNull(serviceDataSource, "Provide a non-null service data source"); Preconditions.checkNotNull(serviceFinderFactory, "Provide a non-null service finder factory"); - val hub = new ServiceFinderHub<>(serviceDataSource, serviceFinderFactory, - serviceRefreshDurationMs, hubRefreshDurationMs); + val hub = new ServiceFinderHub<>(serviceDataSource, serviceFinderFactory, serviceRefreshTimeoutMs, + hubStartTimeoutMs, excludedServices); final ScheduledSignal refreshSignal = new ScheduledSignal<>("service-hub-refresh-timer", () -> null, Collections.emptyList(), diff --git a/ranger-core/src/main/java/io/appform/ranger/core/model/HubConstants.java b/ranger-core/src/main/java/io/appform/ranger/core/model/HubConstants.java index 6e782d8f..9dca422a 100644 --- a/ranger-core/src/main/java/io/appform/ranger/core/model/HubConstants.java +++ b/ranger-core/src/main/java/io/appform/ranger/core/model/HubConstants.java @@ -19,9 +19,11 @@ @UtilityClass public class HubConstants { - public static final long SERVICE_REFRESH_DURATION_MS = 10_000; - public static final long HUB_REFRESH_DURATION_MS = 30_000; + public static final int SERVICE_REFRESH_TIMEOUT_MS = 10_000; + public static final int HUB_START_TIMEOUT_MS = 30_000; public static final long REFRESH_FREQUENCY_MS = 10_000; public static final int CONNECTION_RETRY_TIME_MS = 5_000; public static final int MINIMUM_REFRESH_TIME_MS = 5_000; + public static final int MINIMUM_SERVICE_REFRESH_TIMEOUT_MS = 1_000; + public static final int MINIMUM_HUB_START_TIMEOUT_MS = 5_000; } diff --git a/ranger-core/src/main/java/io/appform/ranger/core/util/FinderUtils.java b/ranger-core/src/main/java/io/appform/ranger/core/util/FinderUtils.java index 5fa2f760..412486b4 100644 --- a/ranger-core/src/main/java/io/appform/ranger/core/util/FinderUtils.java +++ b/ranger-core/src/main/java/io/appform/ranger/core/util/FinderUtils.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; /** @@ -32,6 +33,14 @@ @UtilityClass public class FinderUtils { + public static Set getEligibleServices(final Collection services, + final Set excludedServices) { + return services.stream() + .filter(service -> !excludedServices.contains(service.getServiceName())) + .collect(Collectors.toSet()); + } + + public static List> filterValidNodes( final Service service, final Collection> serviceNodes, diff --git a/ranger-core/src/test/java/io/appform/ranger/core/finderhub/ServiceFinderHubTest.java b/ranger-core/src/test/java/io/appform/ranger/core/finderhub/ServiceFinderHubTest.java index 3778ee5c..535f99ad 100644 --- a/ranger-core/src/test/java/io/appform/ranger/core/finderhub/ServiceFinderHubTest.java +++ b/ranger-core/src/test/java/io/appform/ranger/core/finderhub/ServiceFinderHubTest.java @@ -27,6 +27,7 @@ import io.appform.ranger.core.model.*; import io.appform.ranger.core.units.TestNodeData; import io.appform.ranger.core.utils.RangerTestUtils; +import java.util.Set; import lombok.val; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -73,7 +74,7 @@ void testDelayedServiceAddition() { .withServiceName(service.getServiceName()) .withDeserializer(new Deserializer() {}) .withSleepDuration(5) - .build(), 1_000, 5_000 + .build(), 1_000, 5_000, Set.of() ); Assertions.assertThrows(IllegalStateException.class, delayedHub::start); val serviceFinderHub = new ServiceFinderHub<>(new DynamicDataSource(Lists.newArrayList(new Service("NS", "SERVICE"))), @@ -82,7 +83,7 @@ void testDelayedServiceAddition() { .withServiceName(service.getServiceName()) .withDeserializer(new Deserializer() {}) .withSleepDuration(1) - .build(), 5_000, 5_000 + .build(), 5_000, 5_000, Set.of() ); serviceFinderHub.start(); Assertions.assertTrue(serviceFinderHub.finder(new Service("NS", "SERVICE")).isPresent()); diff --git a/ranger-drove-client/src/main/java/io/appform/ranger/client/drove/AbstractRangerDroveHubClient.java b/ranger-drove-client/src/main/java/io/appform/ranger/client/drove/AbstractRangerDroveHubClient.java index 31604db5..8d028cca 100644 --- a/ranger-drove-client/src/main/java/io/appform/ranger/client/drove/AbstractRangerDroveHubClient.java +++ b/ranger-drove-client/src/main/java/io/appform/ranger/client/drove/AbstractRangerDroveHubClient.java @@ -35,7 +35,7 @@ @Getter @SuperBuilder public abstract class AbstractRangerDroveHubClient, D extends DroveResponseDataDeserializer> - extends AbstractRangerHubClient { + extends AbstractRangerHubClient { private final DroveUpstreamConfig clientConfig; private final DroveCommunicator droveCommunicator; @@ -54,8 +54,9 @@ protected ServiceFinderHub buildHub() { .withServiceDataSource(getServiceDataSource()) .withServiceFinderFactory(getFinderFactory()) .withRefreshFrequencyMs(getNodeRefreshTimeMs()) - .withHubRefreshDuration(getHubRefreshDurationMs()) - .withServiceRefreshDuration(getServiceRefreshDurationMs()) + .withHubStartTimeout(getHubStartTimeoutMs()) + .withServiceRefreshTimeout(getServiceRefreshTimeoutMs()) + .withExcludedServices(getExcludedServices()) .build(); } } diff --git a/ranger-http-client/src/main/java/io/appform/ranger/client/http/AbstractRangerHttpHubClient.java b/ranger-http-client/src/main/java/io/appform/ranger/client/http/AbstractRangerHttpHubClient.java index 139363c3..9f9430e6 100644 --- a/ranger-http-client/src/main/java/io/appform/ranger/client/http/AbstractRangerHttpHubClient.java +++ b/ranger-http-client/src/main/java/io/appform/ranger/client/http/AbstractRangerHttpHubClient.java @@ -63,8 +63,9 @@ protected ServiceFinderHub buildHub() { .withServiceDataSource(getServiceDataSource()) .withServiceFinderFactory(getFinderFactory()) .withRefreshFrequencyMs(getNodeRefreshTimeMs()) - .withHubRefreshDuration(getHubRefreshDurationMs()) - .withServiceRefreshDuration(getServiceRefreshDurationMs()) + .withHubStartTimeout(getHubStartTimeoutMs()) + .withServiceRefreshTimeout(getServiceRefreshTimeoutMs()) + .withExcludedServices(getExcludedServices()) .build(); } } diff --git a/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/RangerHubServerBundle.java b/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/RangerHubServerBundle.java index b6013c1d..8a3eb670 100644 --- a/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/RangerHubServerBundle.java +++ b/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/RangerHubServerBundle.java @@ -67,7 +67,7 @@ protected List>> serverConfig.getUpstreams(), Collections.emptyList()); return upstreams.stream() .map(rangerUpstreamConfiguration -> rangerUpstreamConfiguration.accept(new HubCreatorVisitor( - serverConfig.getNamespace()))) + serverConfig.getNamespace(), serverConfig.getExcludedServices()))) .flatMap(Collection::stream) .toList(); } @@ -81,9 +81,7 @@ protected List> withLifecycleSignals(U configuration) { @Override protected List withHealthChecks(U configuration) { - return curatorFrameworks.stream() - .map(curatorFramework -> (HealthCheck) new RangerHealthCheck(curatorFramework)) - .toList(); + return List.of(new RangerHealthCheck(curatorFrameworks)); } @AllArgsConstructor @@ -91,6 +89,7 @@ private class HubCreatorVisitor implements RangerConfigurationVisitor>>> { private final String namespace; + private final Set excludedServices; private RangerHubClient> addCuratorAndGetZkHubClient( String zookeeper, RangerZkUpstreamConfiguration zkConfiguration) { @@ -106,9 +105,10 @@ private RangerHubClient> addCurat .curatorFramework(curatorFramework) .disablePushUpdaters(zkConfiguration.isDisablePushUpdaters()) .mapper(getMapper()) - .serviceRefreshDurationMs(zkConfiguration.getServiceRefreshDurationMs()) - .hubRefreshDurationMs(zkConfiguration.getServiceRefreshDurationMs()) + .serviceRefreshTimeoutMs(zkConfiguration.getServiceRefreshTimeoutMs()) + .hubStartTimeoutMs(zkConfiguration.getHubStartTimeoutMs()) .nodeRefreshTimeMs(zkConfiguration.getNodeRefreshTimeMs()) + .excludedServices(excludedServices) .deserializer(data -> { try { return getMapper().readValue(data, new TypeReference>() { @@ -129,9 +129,10 @@ private RangerHubClient> getHttpH .mapper(getMapper()) .clientConfig(httpClientConfig) .httpClient(RangerHttpUtils.httpClient(httpClientConfig, getMapper())) - .serviceRefreshDurationMs(httpConfiguration.getServiceRefreshDurationMs()) - .hubRefreshDurationMs(httpConfiguration.getServiceRefreshDurationMs()) + .serviceRefreshTimeoutMs(httpConfiguration.getServiceRefreshTimeoutMs()) + .hubStartTimeoutMs(httpConfiguration.getHubStartTimeoutMs()) .nodeRefreshTimeMs(httpConfiguration.getNodeRefreshTimeMs()) + .excludedServices(excludedServices) .deserializer(data -> { try { return getMapper().readValue(data, new TypeReference<>() {}); @@ -156,9 +157,10 @@ private RangerHubClient> getDrove .mapper(getMapper()) .clientConfig(droveConfig) .droveCommunicator(droveCommunicator) - .serviceRefreshDurationMs(droveUpstreamConfiguration.getServiceRefreshDurationMs()) - .hubRefreshDurationMs(droveUpstreamConfiguration.getServiceRefreshDurationMs()) + .serviceRefreshTimeoutMs(droveUpstreamConfiguration.getServiceRefreshTimeoutMs()) + .hubStartTimeoutMs(droveUpstreamConfiguration.getHubStartTimeoutMs()) .nodeRefreshTimeMs(droveUpstreamConfiguration.getNodeRefreshTimeMs()) + .excludedServices(excludedServices) .deserializer(new DroveResponseDataDeserializer<>() { @Override protected ShardInfo translate(ExposedAppInfo appInfo, ExposedAppInfo.ExposedHost host) { diff --git a/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/configuration/RangerServerConfiguration.java b/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/configuration/RangerServerConfiguration.java index 7211b6f1..e2232a33 100644 --- a/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/configuration/RangerServerConfiguration.java +++ b/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/configuration/RangerServerConfiguration.java @@ -15,6 +15,8 @@ */ package io.appform.ranger.hub.server.bundle.configuration; +import java.util.Objects; +import java.util.Set; import lombok.Builder; import lombok.Value; import lombok.extern.jackson.Jacksonized; @@ -38,4 +40,11 @@ public class RangerServerConfiguration { @NotEmpty @Valid List upstreams; + + Set excludedServices; + + public Set getExcludedServices() { + return Objects.requireNonNullElseGet(excludedServices,Set::of); + } + } diff --git a/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/configuration/RangerUpstreamConfiguration.java b/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/configuration/RangerUpstreamConfiguration.java index c418df4c..06ec4b43 100644 --- a/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/configuration/RangerUpstreamConfiguration.java +++ b/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/configuration/RangerUpstreamConfiguration.java @@ -41,8 +41,11 @@ public abstract class RangerUpstreamConfiguration { @Min(HubConstants.MINIMUM_REFRESH_TIME_MS) private int nodeRefreshTimeMs = HubConstants.MINIMUM_REFRESH_TIME_MS; - @Min(HubConstants.MINIMUM_REFRESH_TIME_MS) - private int serviceRefreshDurationMs = HubConstants.MINIMUM_REFRESH_TIME_MS; + @Min(HubConstants.MINIMUM_SERVICE_REFRESH_TIMEOUT_MS) + private int serviceRefreshTimeoutMs = HubConstants.SERVICE_REFRESH_TIMEOUT_MS; + + @Min(HubConstants.MINIMUM_HUB_START_TIMEOUT_MS) + private int hubStartTimeoutMs = HubConstants.HUB_START_TIMEOUT_MS; protected RangerUpstreamConfiguration(BackendType type) { this.type = type; diff --git a/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/healthcheck/RangerHealthCheck.java b/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/healthcheck/RangerHealthCheck.java index 69d4ce95..76fb0d3c 100644 --- a/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/healthcheck/RangerHealthCheck.java +++ b/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/healthcheck/RangerHealthCheck.java @@ -19,18 +19,22 @@ import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; +import java.util.List; + @Slf4j public class RangerHealthCheck extends HealthCheck { - private final CuratorFramework curatorFramework; + private final List curatorFrameworks; - public RangerHealthCheck(CuratorFramework curatorFramework){ - this.curatorFramework = curatorFramework; + public RangerHealthCheck(final List curatorFrameworks){ + this.curatorFrameworks = curatorFrameworks; } @Override protected Result check() { - return curatorFramework.getZookeeperClient().isConnected() ? - Result.healthy("Service is healthy") : Result.unhealthy("Can't connect to zookeeper"); + return curatorFrameworks.stream() + .allMatch(curatorFramework -> curatorFramework.getZookeeperClient().isConnected()) + ? Result.healthy("Service is healthy") + : Result.unhealthy("Can't connect to zookeeper"); } } diff --git a/ranger-zk-client/src/main/java/io/appform/ranger/client/zk/AbstractRangerZKHubClient.java b/ranger-zk-client/src/main/java/io/appform/ranger/client/zk/AbstractRangerZKHubClient.java index 36e46bdb..3d5b6694 100644 --- a/ranger-zk-client/src/main/java/io/appform/ranger/client/zk/AbstractRangerZKHubClient.java +++ b/ranger-zk-client/src/main/java/io/appform/ranger/client/zk/AbstractRangerZKHubClient.java @@ -46,8 +46,9 @@ protected ServiceFinderHub buildHub() { .withRefreshFrequencyMs(getNodeRefreshTimeMs()) .withServiceDataSource(getServiceDataSource()) .withServiceFinderFactory(getFinderFactory()) - .withHubRefreshDuration(getHubRefreshDurationMs()) - .withServiceRefreshDuration(getServiceRefreshDurationMs()) + .withHubStartTimeout(getHubStartTimeoutMs()) + .withServiceRefreshTimeout(getServiceRefreshTimeoutMs()) + .withExcludedServices(getExcludedServices()) .build(); } diff --git a/ranger-zookeeper/src/main/java/io/appform/ranger/zookeeper/servicefinder/ZkCommunicationException.java b/ranger-zookeeper/src/main/java/io/appform/ranger/zookeeper/servicefinder/ZkCommunicationException.java new file mode 100644 index 00000000..3de05d85 --- /dev/null +++ b/ranger-zookeeper/src/main/java/io/appform/ranger/zookeeper/servicefinder/ZkCommunicationException.java @@ -0,0 +1,29 @@ +/* + * Copyright 2024 Authors, Flipkart Internet Pvt. Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.appform.ranger.zookeeper.servicefinder; + +import io.appform.ranger.core.exceptions.CommunicationException; + +/** + * Thrown in case there is an issue communicating with the Zookeeper upstream. + + */ +public class ZkCommunicationException extends CommunicationException { + public ZkCommunicationException(final String message) { + super(message); + } +} diff --git a/ranger-zookeeper/src/main/java/io/appform/ranger/zookeeper/servicefinder/ZkNodeDataSource.java b/ranger-zookeeper/src/main/java/io/appform/ranger/zookeeper/servicefinder/ZkNodeDataSource.java index f2fea6fa..0548e622 100644 --- a/ranger-zookeeper/src/main/java/io/appform/ranger/zookeeper/servicefinder/ZkNodeDataSource.java +++ b/ranger-zookeeper/src/main/java/io/appform/ranger/zookeeper/servicefinder/ZkNodeDataSource.java @@ -92,9 +92,10 @@ private Optional>> checkForUpdateOnZookeeper(D deserializer) return Optional.of(Collections.emptyList()); } catch (Exception e) { - log.error("Error getting service data from zookeeper: ", e); + log.error("Error getting node data from zookeeper: ", e); + throw new ZkCommunicationException("Error getting node data from zookeeper: exception %s , message: %s" + .formatted(e.getClass().getSimpleName(), e.getMessage())); } - return Optional.empty(); } private Optional readChild(String parentPath, String child) throws Exception { @@ -107,8 +108,11 @@ private Optional readChild(String parentPath, String child) throws Excep return Optional.empty(); } catch (KeeperException e) { - log.error("Could not get data for node: " + path, e); + log.error("Could not get data for node: {}", path, e); return Optional.empty(); + } catch (Exception e){ + log.error("Could not read child for node: {}", path, e); + throw e; } }