Skip to content

Commit

Permalink
Merge pull request #47 from jitendradhawan/exclude_services_zk_error_…
Browse files Browse the repository at this point in the history
…handling

Zk Failure Handling , Exclude Services and Fix RangerHealthCheck registration
  • Loading branch information
santanusinha authored Nov 21, 2024
2 parents bc5b586 + c0ecf7a commit 1b353aa
Show file tree
Hide file tree
Showing 17 changed files with 177 additions and 78 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
All notable changes to this project will be documented in this file.

## [1.1-RC2]

- Add feature to exclude services from service data source
- Create a single `RangerHealthCheck` for all curatorFrameworks when multiple zookeeper connection strings are given
- Update `lastUpdatedTimeStamp` of service nodes in the service registry when zk / http communication fails, for any kind of node data source (zk/http/drove)
- Building of a ServiceFinderHub and of a ServiceFinder is bounded (see PR https://github.com/appform-io/ranger/pull/22/).

## [1.0-RC18]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,14 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import io.appform.ranger.core.util.FinderUtils;
import lombok.Getter;
import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -46,8 +51,18 @@ public abstract class AbstractRangerHubClient<T, R extends ServiceRegistry<T>, D
private int nodeRefreshTimeMs;
private ServiceFinderHub<T, R> hub;
private ServiceDataSource serviceDataSource;
private long serviceRefreshDurationMs;
private long hubRefreshDurationMs;

/**
* Initial time to wait for service node data to be refreshed in service registry (in milliseconds)
*/
private long serviceRefreshTimeoutMs;

/**
* Time to wait for Hub Start completion (in milliseconds)
* Hub Start is considered to be completed if service registry for all eligible services has been refreshed
*/
private long hubStartTimeoutMs;
private Set<String> excludedServices;

@Override
public void start() {
Expand All @@ -62,23 +77,26 @@ public void start() {
}
this.nodeRefreshTimeMs = Math.max(HubConstants.MINIMUM_REFRESH_TIME_MS, this.nodeRefreshTimeMs);

if (this.serviceRefreshDurationMs <= 0) {
if (this.serviceRefreshTimeoutMs <= 0) {
log.warn("Service Refresh interval too low: {} ms. Has been upgraded to {} ms ",
this.serviceRefreshDurationMs,
HubConstants.SERVICE_REFRESH_DURATION_MS);
this.serviceRefreshDurationMs = HubConstants.SERVICE_REFRESH_DURATION_MS;
this.serviceRefreshTimeoutMs,
HubConstants.SERVICE_REFRESH_TIMEOUT_MS);
this.serviceRefreshTimeoutMs = HubConstants.SERVICE_REFRESH_TIMEOUT_MS;
}

if (this.hubRefreshDurationMs <= 0) {
if (this.hubStartTimeoutMs <= 0) {
log.warn("Hub Refresh interval too low: {} ms. Has been upgraded to {} ms ",
this.hubRefreshDurationMs,
HubConstants.HUB_REFRESH_DURATION_MS);
this.hubRefreshDurationMs = HubConstants.HUB_REFRESH_DURATION_MS;
this.hubStartTimeoutMs,
HubConstants.HUB_START_TIMEOUT_MS);
this.hubStartTimeoutMs = HubConstants.HUB_START_TIMEOUT_MS;
}

this.excludedServices = Objects.requireNonNullElseGet(this.excludedServices, Set::of);

if(null == this.serviceDataSource){
this.serviceDataSource = getDefaultDataSource();
}

this.hub = buildHub();
this.hub.start();
}
Expand Down Expand Up @@ -153,7 +171,7 @@ public List<ServiceNode<T>> getAllNodes(
@Override
public Collection<Service> getRegisteredServices() {
try {
return this.getHub().getServiceDataSource().services();
return FinderUtils.getEligibleServices(this.getHub().getServiceDataSource().services(), excludedServices);
}
catch (Exception e) {
log.error("Call to the hub failed with exception, {}", e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.appform.ranger.core.signals.Signal;
import io.appform.ranger.core.util.Exceptions;
import io.appform.ranger.core.util.FinderUtils;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import lombok.val;

Expand All @@ -47,7 +48,7 @@ public class ServiceRegistryUpdater<T, D extends Deserializer<T>> {

private final Lock checkLock = new ReentrantLock();
private final Condition checkCondition = checkLock.newCondition();
private boolean checkForUpdate = false;
private final AtomicBoolean checkForUpdate = new AtomicBoolean(false);
private Future<Void> queryThreadFuture;

private final ExecutorService executorService = Executors.newFixedThreadPool(1);
Expand Down Expand Up @@ -95,7 +96,7 @@ public void checkForUpdate(T signalData) {
Preconditions.checkArgument(null == signalData);
try {
checkLock.lock();
checkForUpdate = true;
checkForUpdate.set(true);
checkCondition.signalAll();
}
finally {
Expand All @@ -108,7 +109,7 @@ private Void queryExecutor() {
while (true) {
try {
checkLock.lock();
while (!checkForUpdate) {
while (!checkForUpdate.get()) {
checkCondition.await();
}
updateRegistry();
Expand All @@ -122,7 +123,7 @@ private Void queryExecutor() {
log.error("Registry update failed for service: " + serviceRegistry.getService().name(), e);
}
finally {
checkForUpdate = false;
checkForUpdate.set(false);
checkLock.unlock();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.appform.ranger.core.signals.ScheduledSignal;
import io.appform.ranger.core.signals.Signal;
import io.appform.ranger.core.util.Exceptions;
import io.appform.ranger.core.util.FinderUtils;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
Expand All @@ -52,7 +53,7 @@ public class ServiceFinderHub<T, R extends ServiceRegistry<T>> {
new AtomicReference<>(new ConcurrentHashMap<>());
private final Lock updateLock = new ReentrantLock();
private final Condition updateCond = updateLock.newCondition();
private boolean updateAvailable = false;
private final AtomicBoolean updateAvailable = new AtomicBoolean(false);
private final ExecutorService executorService = Executors.newFixedThreadPool(1);

@Getter
Expand All @@ -72,8 +73,10 @@ public class ServiceFinderHub<T, R extends ServiceRegistry<T>> {
private final AtomicInteger poolThreadIndex = new AtomicInteger(0);
private Future<?> monitorFuture = null;

private final long serviceRefreshDurationMs;
private final long hubRefreshDurationMs;
private final long serviceRefreshTimeoutMs;
private final long hubStartTimeoutMs;

private final Set<String> excludedServices;

private final ForkJoinPool refresherPool;

Expand All @@ -82,23 +85,25 @@ public ServiceFinderHub(
ServiceFinderFactory<T, R> finderFactory
) {
this(serviceDataSource, finderFactory,
HubConstants.SERVICE_REFRESH_DURATION_MS, HubConstants.HUB_REFRESH_DURATION_MS);
HubConstants.SERVICE_REFRESH_TIMEOUT_MS, HubConstants.HUB_START_TIMEOUT_MS, Set.of());
}

public ServiceFinderHub(
ServiceDataSource serviceDataSource,
ServiceFinderFactory<T, R> finderFactory,
long serviceRefreshDurationMs,
long hubRefreshDurationMs) {
long serviceRefreshTimeoutMs,
long hubStartTimeoutMs,
final Set<String> excludedServices) {
this.serviceDataSource = serviceDataSource;
this.finderFactory = finderFactory;
this.serviceRefreshDurationMs = serviceRefreshDurationMs == 0 ? HubConstants.SERVICE_REFRESH_DURATION_MS : serviceRefreshDurationMs;
this.hubRefreshDurationMs = hubRefreshDurationMs == 0 ? HubConstants.HUB_REFRESH_DURATION_MS : hubRefreshDurationMs;
this.serviceRefreshTimeoutMs = serviceRefreshTimeoutMs == 0 ? HubConstants.SERVICE_REFRESH_TIMEOUT_MS : serviceRefreshTimeoutMs;
this.hubStartTimeoutMs = hubStartTimeoutMs == 0 ? HubConstants.HUB_START_TIMEOUT_MS : hubStartTimeoutMs;
this.refreshSignals.add(new ScheduledSignal<>("service-hub-updater",
() -> null,
Collections.emptyList(),
10_000));
10_000));
this.refresherPool = createRefresherPool();
this.excludedServices = Objects.requireNonNullElseGet(excludedServices, Set::of);
}

public Optional<ServiceFinder<T, R>> finder(final Service service) {
Expand Down Expand Up @@ -155,7 +160,7 @@ public void registerUpdateSignal(final Signal<Void> refreshSignal) {
public void updateAvailable() {
try {
updateLock.lock();
updateAvailable = true;
updateAvailable.set(true);
updateCond.signalAll();
}
finally {
Expand All @@ -179,7 +184,7 @@ private void monitor() {
while (true) {
try {
updateLock.lock();
while (!updateAvailable) {
while (!updateAvailable.get()) {
updateCond.await();
}
updateRegistry();
Expand All @@ -190,7 +195,7 @@ private void monitor() {
break;
}
finally {
updateAvailable = false;
updateAvailable.set(false);
updateLock.unlock();
}
}
Expand All @@ -204,7 +209,7 @@ private void updateRegistry() {
alreadyUpdating.set(true);
val updatedFinders = new ConcurrentHashMap<Service, ServiceFinder<T, R>>();
try {
val services = serviceDataSource.services();
val services = FinderUtils.getEligibleServices(serviceDataSource.services(), excludedServices);
if (services.isEmpty()) {
log.debug("No services found for the service data source. Skipping update on the registry");
return;
Expand Down Expand Up @@ -239,13 +244,13 @@ private void updateRegistry() {
}

private void waitTillHubIsReady() {
val services = serviceDataSource.services();
val timeToRefresh = Math.max(hubRefreshDurationMs,
(serviceRefreshDurationMs * services.size()) / refresherPool.getParallelism());
if (timeToRefresh != hubRefreshDurationMs) {
val services = FinderUtils.getEligibleServices(serviceDataSource.services(), excludedServices);
val timeToRefresh = Math.max(hubStartTimeoutMs,
(serviceRefreshTimeoutMs * services.size()) / refresherPool.getParallelism());
if (timeToRefresh != hubStartTimeoutMs) {
log.warn("Max hub refresh time has been dynamically adjusted to {} ms from the provided {} ms as the " +
"provided time would have been insufficient to refresh {} services.",
timeToRefresh, hubRefreshDurationMs, services.size());
timeToRefresh, hubStartTimeoutMs, services.size());
}
val hubRefresher = CompletableFuture.allOf(
services.stream()
Expand Down Expand Up @@ -273,7 +278,7 @@ private void waitTillServiceIsReady(Service service) {
try {
RetryerBuilder.<Boolean>newBuilder()
.retryIfResult(r -> !r)
.withStopStrategy(StopStrategies.stopAfterDelay(serviceRefreshDurationMs, TimeUnit.MILLISECONDS))
.withStopStrategy(StopStrategies.stopAfterDelay(serviceRefreshTimeoutMs, TimeUnit.MILLISECONDS))
.build()
.call(() -> Optional.ofNullable(getFinders().get().get(service))
.map(ServiceFinder::getServiceRegistry)
Expand All @@ -285,4 +290,5 @@ private void waitTillServiceIsReady(Service service) {
.illegalState("Could not perform initial state for service: " + service.getServiceName(), e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import io.appform.ranger.core.model.ServiceRegistry;
import io.appform.ranger.core.signals.ScheduledSignal;
import io.appform.ranger.core.signals.Signal;

import java.util.*;

import lombok.val;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/**
Expand All @@ -38,8 +38,9 @@ public abstract class ServiceFinderHubBuilder<T, R extends ServiceRegistry<T>> {
private final List<Consumer<Void>> extraStartSignalConsumers = new ArrayList<>();
private final List<Consumer<Void>> extraStopSignalConsumers = new ArrayList<>();
private final List<Signal<Void>> extraRefreshSignals = new ArrayList<>();
private long serviceRefreshDurationMs = HubConstants.SERVICE_REFRESH_DURATION_MS;
private long hubRefreshDurationMs = HubConstants.HUB_REFRESH_DURATION_MS;
private long serviceRefreshTimeoutMs = HubConstants.SERVICE_REFRESH_TIMEOUT_MS;
private long hubStartTimeoutMs = HubConstants.HUB_START_TIMEOUT_MS;
private Set<String> excludedServices = new HashSet<>();

public ServiceFinderHubBuilder<T, R> withServiceDataSource(ServiceDataSource serviceDataSource) {
this.serviceDataSource = serviceDataSource;
Expand All @@ -50,7 +51,7 @@ public ServiceFinderHubBuilder<T, R> withServiceFinderFactory(ServiceFinderFacto
this.serviceFinderFactory = serviceFinderFactory;
return this;
}

public ServiceFinderHubBuilder<T, R> withRefreshFrequencyMs(long refreshFrequencyMs) {
this.refreshFrequencyMs = refreshFrequencyMs;
return this;
Expand All @@ -71,13 +72,18 @@ public ServiceFinderHubBuilder<T, R> withExtraRefreshSignal(Signal<Void> extraRe
return this;
}

public ServiceFinderHubBuilder<T, R> withServiceRefreshDuration(long serviceRefreshDurationMs) {
this.serviceRefreshDurationMs = serviceRefreshDurationMs;
public ServiceFinderHubBuilder<T, R> withServiceRefreshTimeout(long serviceRefreshTimeoutMs) {
this.serviceRefreshTimeoutMs = serviceRefreshTimeoutMs;
return this;
}

/**
 * Sets the time to wait for hub start completion.
 *
 * @param hubStartTimeoutMs timeout in milliseconds; stored unchanged on this builder and passed to the
 *                          {@code ServiceFinderHub} constructor by {@code build()}
 * @return this builder, to allow call chaining
 */
public ServiceFinderHubBuilder<T, R> withHubStartTimeout(long hubStartTimeoutMs) {
this.hubStartTimeoutMs = hubStartTimeoutMs;
return this;
}

public ServiceFinderHubBuilder<T, R> withHubRefreshDuration(long hubRefreshDurationMs) {
this.hubRefreshDurationMs = hubRefreshDurationMs;
public ServiceFinderHubBuilder<T, R> withExcludedServices(Set<String> excludedServices) {
this.excludedServices = Objects.requireNonNullElseGet(excludedServices, Set::of);
return this;
}

Expand All @@ -86,8 +92,8 @@ public ServiceFinderHub<T, R> build() {
Preconditions.checkNotNull(serviceDataSource, "Provide a non-null service data source");
Preconditions.checkNotNull(serviceFinderFactory, "Provide a non-null service finder factory");

val hub = new ServiceFinderHub<>(serviceDataSource, serviceFinderFactory,
serviceRefreshDurationMs, hubRefreshDurationMs);
val hub = new ServiceFinderHub<>(serviceDataSource, serviceFinderFactory, serviceRefreshTimeoutMs,
hubStartTimeoutMs, excludedServices);
final ScheduledSignal<Void> refreshSignal = new ScheduledSignal<>("service-hub-refresh-timer",
() -> null,
Collections.emptyList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

/**
 * Default and minimum timing values used by the hub and its clients.
 * All values are in milliseconds, as the {@code _MS} suffix indicates.
 */
@UtilityClass
public class HubConstants {
// NOTE(review): earlier names of the two timeout defaults declared right after them; the callers
// visible in this change reference the *_TIMEOUT_MS constants instead - confirm these can be removed.
public static final long SERVICE_REFRESH_DURATION_MS = 10_000;
public static final long HUB_REFRESH_DURATION_MS = 30_000;
/** Default time to wait for a single service's registry to be refreshed, used when none is configured. */
public static final int SERVICE_REFRESH_TIMEOUT_MS = 10_000;
/** Default time to wait for hub start completion, used when none is configured. */
public static final int HUB_START_TIMEOUT_MS = 30_000;
// Presumably the default interval between hub refreshes - usage not visible here, TODO confirm.
public static final long REFRESH_FREQUENCY_MS = 10_000;
// Presumably the wait between connection retries - usage not visible here, TODO confirm.
public static final int CONNECTION_RETRY_TIME_MS = 5_000;
/** Lower bound applied to the node refresh time by the hub client on start. */
public static final int MINIMUM_REFRESH_TIME_MS = 5_000;
// NOTE(review): the two minimums below look like lower bounds for the corresponding timeouts,
// but no usage is visible in this change - confirm where they are enforced.
public static final int MINIMUM_SERVICE_REFRESH_TIMEOUT_MS = 1_000;
public static final int MINIMUM_HUB_START_TIMEOUT_MS = 5_000;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
Expand All @@ -32,6 +33,14 @@
@UtilityClass
public class FinderUtils {

/**
 * Filters out the services whose name appears in the exclusion set.
 *
 * @param services         candidate services; must not be {@code null}
 * @param excludedServices service names to drop; {@code null} or empty means nothing is excluded
 * @return a new set holding every service whose {@code getServiceName()} is not in
 *         {@code excludedServices}
 */
public static Set<Service> getEligibleServices(final Collection<Service> services,
                                               final Set<String> excludedServices) {
    // With no exclusions there is nothing to look up. Skipping the lookup keeps a null exclusion
    // set from throwing and avoids a pointless contains() call per service on an empty set.
    final boolean nothingExcluded = excludedServices == null || excludedServices.isEmpty();
    return services.stream()
            .filter(service -> nothingExcluded || !excludedServices.contains(service.getServiceName()))
            .collect(Collectors.toSet());
}


public static<T> List<ServiceNode<T>> filterValidNodes(
final Service service,
final Collection<ServiceNode<T>> serviceNodes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.appform.ranger.core.model.*;
import io.appform.ranger.core.units.TestNodeData;
import io.appform.ranger.core.utils.RangerTestUtils;
import java.util.Set;
import lombok.val;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -73,7 +74,7 @@ void testDelayedServiceAddition() {
.withServiceName(service.getServiceName())
.withDeserializer(new Deserializer<TestNodeData>() {})
.withSleepDuration(5)
.build(), 1_000, 5_000
.build(), 1_000, 5_000, Set.of()
);
Assertions.assertThrows(IllegalStateException.class, delayedHub::start);
val serviceFinderHub = new ServiceFinderHub<>(new DynamicDataSource(Lists.newArrayList(new Service("NS", "SERVICE"))),
Expand All @@ -82,7 +83,7 @@ void testDelayedServiceAddition() {
.withServiceName(service.getServiceName())
.withDeserializer(new Deserializer<TestNodeData>() {})
.withSleepDuration(1)
.build(), 5_000, 5_000
.build(), 5_000, 5_000, Set.of()
);
serviceFinderHub.start();
Assertions.assertTrue(serviceFinderHub.finder(new Service("NS", "SERVICE")).isPresent());
Expand Down
Loading

0 comments on commit 1b353aa

Please sign in to comment.