diff --git a/pom.xml b/pom.xml
index 3b1effe5..7464ef80 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,7 +98,7 @@
UTF-8
- 33.2.0-jre
+ 33.3.0-jre
5.5.0
1.7.32
@@ -116,7 +116,7 @@
3.3.1
4.2.0
- 2.1.10
+ 2.1.12
1.2.12
diff --git a/ranger-client/src/test/java/io/appform/ranger/client/stubs/TestSimpleUnshardedServiceFinder.java b/ranger-client/src/test/java/io/appform/ranger/client/stubs/TestSimpleUnshardedServiceFinder.java
index f167d08b..3cbb0258 100644
--- a/ranger-client/src/test/java/io/appform/ranger/client/stubs/TestSimpleUnshardedServiceFinder.java
+++ b/ranger-client/src/test/java/io/appform/ranger/client/stubs/TestSimpleUnshardedServiceFinder.java
@@ -22,11 +22,11 @@
import io.appform.ranger.core.model.Service;
import io.appform.ranger.core.model.ServiceNode;
import io.appform.ranger.core.units.TestNodeData;
-import java.util.Optional;
import lombok.Builder;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
@Builder
public class TestSimpleUnshardedServiceFinder
@@ -51,6 +51,7 @@ public Optional>> refresh(Deserializer> {
private final ServiceRegistry serviceRegistry;
- private final NodeDataSource nodeDataSource;
+ private final NodeDataSource nodeDataSource;
private final D deserializer;
private final Lock checkLock = new ReentrantLock();
@@ -51,7 +54,7 @@ public class ServiceRegistryUpdater> {
public ServiceRegistryUpdater(
ServiceRegistry serviceRegistry,
- NodeDataSource nodeDataSource,
+ NodeDataSource nodeDataSource,
List> signalGenerators,
D deserializer) {
this.serviceRegistry = serviceRegistry;
@@ -70,6 +73,8 @@ public void start() {
try {
RetryerBuilder.newBuilder()
.retryIfResult(r -> null == r || !r)
+ .retryIfException()
+ .withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))
.build()
.call(serviceRegistry::isRefreshed);
}
@@ -81,7 +86,7 @@ public void start() {
}
public void stop() {
- if(null != queryThreadFuture) {
+ if (null != queryThreadFuture) {
executorService.shutdownNow();
}
}
@@ -125,21 +130,42 @@ private Void queryExecutor() {
private void updateRegistry() throws InterruptedException {
log.debug("Checking for updates on data source for service: {}",
- serviceRegistry.getService().getServiceName());
- if(!nodeDataSource.isActive()) {
- log.warn("Node data source seems to be down. Keeping old list for {}",
- serviceRegistry.getService().getServiceName());
- return;
- }
- val nodeList = nodeDataSource.refresh(deserializer).orElse(null);
- if (null != nodeList) {
- log.debug("Updating nodeList of size: {} for [{}]", nodeList.size(),
- serviceRegistry.getService().getServiceName());
- serviceRegistry.updateNodes(nodeList);
+ serviceRegistry.getService().getServiceName());
+ var callFailed = false;
+ if (nodeDataSource.isActive()) { //Source should implement circuit breaker to fail fast and reopen after some
+ // time
+ try {
+ val nodeList = nodeDataSource.refresh(deserializer).orElse(null);
+ if (null != nodeList) {
+ log.debug("Updating nodeList of size: {} for [{}]", nodeList.size(),
+ serviceRegistry.getService().getServiceName());
+ val livenessCheckMaxAge = nodeDataSource.healthcheckZombieCheckThresholdTime(serviceRegistry.getService());
+ //Remove all stale nodes before updating. This is done centrally to ensure some data sources
+ //don't skip this check. Some control is still provided so that they can overload.
+ serviceRegistry.updateNodes(FinderUtils.filterValidNodes(serviceRegistry.getService(), nodeList, livenessCheckMaxAge));
+ }
+ else {
+ log.warn("Empty list returned from node data source. We are in a weird state. Keeping old list for {}",
+ serviceRegistry.getService().getServiceName());
+ }
+ }
+ catch (Exception e) {
+ log.error("Error updating data from registry. Error: [{}] {}",
+ e.getClass().getSimpleName(),
+ e.getMessage());
+ callFailed = true;
+ }
}
- else {
- log.warn("Empty list returned from node data source. We are in a weird state. Keeping old list for {}",
+ if (!nodeDataSource.isActive() || callFailed) {
+ val currTime = System.currentTimeMillis();
+ log.warn("Node data source seems to be down. Keeping old list for {}." +
+ " Will update timestamp to keep stale date relevant.",
serviceRegistry.getService().getServiceName());
+ serviceRegistry.updateNodes(serviceRegistry.nodeList()
+ .stream()
+ .filter(node -> HealthcheckStatus.healthy == node.getHealthcheckStatus())
+ .map(node -> node.setLastUpdatedTimeStamp(currTime))
+ .toList());
}
}
diff --git a/ranger-core/src/main/java/io/appform/ranger/core/finderhub/ServiceFinderHub.java b/ranger-core/src/main/java/io/appform/ranger/core/finderhub/ServiceFinderHub.java
index 9cc54a25..2611b4f0 100644
--- a/ranger-core/src/main/java/io/appform/ranger/core/finderhub/ServiceFinderHub.java
+++ b/ranger-core/src/main/java/io/appform/ranger/core/finderhub/ServiceFinderHub.java
@@ -18,7 +18,6 @@
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.google.common.base.Stopwatch;
-import com.google.common.base.Supplier;
import io.appform.ranger.core.finder.ServiceFinder;
import io.appform.ranger.core.model.HubConstants;
import io.appform.ranger.core.model.Service;
@@ -34,11 +33,13 @@
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
@@ -68,11 +69,14 @@ public class ServiceFinderHub> {
private final ServiceFinderFactory finderFactory;
private final AtomicBoolean alreadyUpdating = new AtomicBoolean(false);
+ private final AtomicInteger poolThreadIndex = new AtomicInteger(0);
private Future> monitorFuture = null;
private final long serviceRefreshDurationMs;
private final long hubRefreshDurationMs;
+ private final ForkJoinPool refresherPool;
+
public ServiceFinderHub(
ServiceDataSource serviceDataSource,
ServiceFinderFactory finderFactory
@@ -88,12 +92,13 @@ public ServiceFinderHub(
long hubRefreshDurationMs) {
this.serviceDataSource = serviceDataSource;
this.finderFactory = finderFactory;
- this.serviceRefreshDurationMs = serviceRefreshDurationMs;
- this.hubRefreshDurationMs = hubRefreshDurationMs;
+ this.serviceRefreshDurationMs = serviceRefreshDurationMs == 0 ? HubConstants.SERVICE_REFRESH_DURATION_MS : serviceRefreshDurationMs;
+ this.hubRefreshDurationMs = hubRefreshDurationMs == 0 ? HubConstants.HUB_REFRESH_DURATION_MS : hubRefreshDurationMs;
this.refreshSignals.add(new ScheduledSignal<>("service-hub-updater",
() -> null,
Collections.emptyList(),
10_000));
+ this.refresherPool = createRefresherPool();
}
public Optional> finder(final Service service) {
@@ -158,6 +163,18 @@ public void updateAvailable() {
}
}
+ private ForkJoinPool createRefresherPool() {
+ return new ForkJoinPool(
+ Math.max(20, Runtime.getRuntime().availableProcessors()),
+ pool -> {
+ val thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
+ thread.setName("hub-refresher-" + poolThreadIndex.getAndIncrement());
+ return thread;
+ },
+ null,
+ false);
+ }
+
private void monitor() {
while (true) {
try {
@@ -185,7 +202,7 @@ private void updateRegistry() {
return;
}
alreadyUpdating.set(true);
- final Map> updatedFinders = new HashMap<>();
+ val updatedFinders = new ConcurrentHashMap>();
try {
val services = serviceDataSource.services();
if (services.isEmpty()) {
@@ -193,9 +210,10 @@ private void updateRegistry() {
return;
}
val knownServiceFinders = finders.get();
- val newFinders = services.stream()
- .filter(service -> !knownServiceFinders.containsKey(service))
- .collect(Collectors.toMap(Function.identity(), finderFactory::buildFinder));
+ val newFinders = refresherPool.submit(() -> services.parallelStream()
+ .filter(service -> !knownServiceFinders.containsKey(service))
+ .collect(Collectors.toMap(Function.identity(), finderFactory::buildFinder)))
+ .get();
val matchingServices = knownServiceFinders.entrySet()
.stream()
.filter(entry -> services.contains(entry.getKey()))
@@ -208,6 +226,10 @@ private void updateRegistry() {
updatedFinders.putAll(matchingServices);
finders.set(updatedFinders);
}
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.error("Refresh interrupted.");
+ }
catch (Exception e) {
log.error("Error updating service list. Will maintain older list", e);
}
@@ -217,29 +239,33 @@ private void updateRegistry() {
}
private void waitTillHubIsReady() {
+ val services = serviceDataSource.services();
+ val timeToRefresh = Math.max(hubRefreshDurationMs,
+ (serviceRefreshDurationMs * services.size()) / refresherPool.getParallelism());
+ if (timeToRefresh != hubRefreshDurationMs) {
+ log.warn("Max hub refresh time has been dynamically adjusted to {} ms from the provided {} ms as the " +
+ "provided time would have been insufficient to refresh {} services.",
+ timeToRefresh, hubRefreshDurationMs, services.size());
+ }
val hubRefresher = CompletableFuture.allOf(
- serviceDataSource.services()
- .stream()
+ services.stream()
.map(service -> CompletableFuture.supplyAsync((Supplier) () -> {
waitTillServiceIsReady(service);
return null;
- })).toArray(CompletableFuture[]::new)
- );
+ })).toArray(CompletableFuture[]::new));
try {
- hubRefresher.get(hubRefreshDurationMs, TimeUnit.MILLISECONDS);
+ hubRefresher.get(timeToRefresh, TimeUnit.MILLISECONDS);
}
catch (InterruptedException ie) {
Thread.currentThread().interrupt();
Exceptions.illegalState("Refresh interrupted");
}
catch (TimeoutException e) {
- Exceptions
- .illegalState("Couldn't perform service hub refresh at this time. " +
+ Exceptions.illegalState("Couldn't perform service hub refresh at this time. " +
"Refresh exceeded the start up time specified");
}
catch (Exception e) {
- Exceptions
- .illegalState("Couldn't perform hub refresh at this time", e);
+ Exceptions.illegalState("Couldn't perform hub refresh at this time", e);
}
}
diff --git a/ranger-core/src/main/java/io/appform/ranger/core/model/NodeDataSource.java b/ranger-core/src/main/java/io/appform/ranger/core/model/NodeDataSource.java
index bc4e3da1..86276b2e 100644
--- a/ranger-core/src/main/java/io/appform/ranger/core/model/NodeDataSource.java
+++ b/ranger-core/src/main/java/io/appform/ranger/core/model/NodeDataSource.java
@@ -15,6 +15,8 @@
*/
package io.appform.ranger.core.model;
+import io.appform.ranger.core.exceptions.CommunicationException;
+
import java.util.List;
import java.util.Optional;
@@ -24,9 +26,9 @@
@SuppressWarnings("unused")
public interface NodeDataSource> extends NodeDataStoreConnector {
- Optional>> refresh(D deserializer);
+ Optional>> refresh(D deserializer) throws CommunicationException;
default long healthcheckZombieCheckThresholdTime(Service service) {
- return System.currentTimeMillis() - 60000; //1 Minute
+ return isActive() ? (System.currentTimeMillis() - 60000) : 0; //1 Minute
}
}
diff --git a/ranger-core/src/main/java/io/appform/ranger/core/signals/ScheduledSignal.java b/ranger-core/src/main/java/io/appform/ranger/core/signals/ScheduledSignal.java
index 150514d4..a35342fb 100644
--- a/ranger-core/src/main/java/io/appform/ranger/core/signals/ScheduledSignal.java
+++ b/ranger-core/src/main/java/io/appform/ranger/core/signals/ScheduledSignal.java
@@ -36,7 +36,7 @@ public class ScheduledSignal extends Signal {
private final String name;
private final long refreshIntervalMillis;
- private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+ private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture> scheduledFuture = null;
diff --git a/ranger-core/src/test/java/io/appform/ranger/core/finderhub/ServiceFinderHubTest.java b/ranger-core/src/test/java/io/appform/ranger/core/finderhub/ServiceFinderHubTest.java
index e5e70c74..3778ee5c 100644
--- a/ranger-core/src/test/java/io/appform/ranger/core/finderhub/ServiceFinderHubTest.java
+++ b/ranger-core/src/test/java/io/appform/ranger/core/finderhub/ServiceFinderHubTest.java
@@ -26,8 +26,6 @@
import io.appform.ranger.core.healthcheck.HealthcheckStatus;
import io.appform.ranger.core.model.*;
import io.appform.ranger.core.units.TestNodeData;
-import java.util.Optional;
-
import io.appform.ranger.core.utils.RangerTestUtils;
import lombok.val;
import org.junit.jupiter.api.Assertions;
@@ -36,6 +34,7 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
class ServiceFinderHubTest {
@@ -83,7 +82,7 @@ void testDelayedServiceAddition() {
.withServiceName(service.getServiceName())
.withDeserializer(new Deserializer() {})
.withSleepDuration(1)
- .build(), 2_000, 5_000
+ .build(), 5_000, 5_000
);
serviceFinderHub.start();
Assertions.assertTrue(serviceFinderHub.finder(new Service("NS", "SERVICE")).isPresent());
@@ -141,7 +140,7 @@ private static class TestNodeDataSource implements NodeDataSource>> refresh(Deserializer deserializer) {
val list = new ArrayList>();
- list.add(new ServiceNode<>("HOST", 0, TestNodeData.builder().shardId(1).build(), HealthcheckStatus.healthy, 10L, "HTTP"));
+ list.add(new ServiceNode<>("HOST", 0, TestNodeData.builder().shardId(1).build(), HealthcheckStatus.healthy, Long.MAX_VALUE, "HTTP"));
return Optional.of(list);
}
diff --git a/ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.IdGeneratorPerfTest.testGenerate.json b/ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.IdGeneratorPerfTest.testGenerate.json
index 005b0dd9..f0855806 100644
--- a/ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.IdGeneratorPerfTest.testGenerate.json
+++ b/ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.IdGeneratorPerfTest.testGenerate.json
@@ -4,5 +4,5 @@
"iterations" : 4,
"threads" : 1,
"forks" : 3,
- "mean_ops" : 787544.107063881
+ "mean_ops" : 812476.3197574528
}
\ No newline at end of file
diff --git a/ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.IdGeneratorPerfTest.testGenerateBase36.json b/ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.IdGeneratorPerfTest.testGenerateBase36.json
index a9486eef..179cec48 100644
--- a/ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.IdGeneratorPerfTest.testGenerateBase36.json
+++ b/ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.IdGeneratorPerfTest.testGenerateBase36.json
@@ -4,5 +4,5 @@
"iterations" : 4,
"threads" : 1,
"forks" : 3,
- "mean_ops" : 594383.3501367184
+ "mean_ops" : 592802.8071907263
}
\ No newline at end of file
diff --git a/ranger-drove/pom.xml b/ranger-drove/pom.xml
index 10ed2d10..e3ec4394 100644
--- a/ranger-drove/pom.xml
+++ b/ranger-drove/pom.xml
@@ -24,7 +24,7 @@
1.1-RC2
- 1.29
+ 1.30
4.0.0
diff --git a/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveApiCommunicator.java b/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveApiCommunicator.java
index 47f34e3d..0bfdeee7 100644
--- a/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveApiCommunicator.java
+++ b/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveApiCommunicator.java
@@ -37,6 +37,11 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -49,6 +54,8 @@ public class DroveApiCommunicator implements DroveCommunicator {
private final DroveUpstreamConfig config;
private final DroveClient droveClient;
private final ObjectMapper mapper;
+ private final AtomicBoolean upstreamAvailable = new AtomicBoolean(true);
+ private final ScheduledExecutorService resetter = Executors.newSingleThreadScheduledExecutor();
public DroveApiCommunicator(
String namespace, DroveUpstreamConfig config,
@@ -58,6 +65,7 @@ public DroveApiCommunicator(
this.config = config;
this.droveClient = droveClient;
this.mapper = mapper;
+ resetter.scheduleWithFixedDelay(() -> upstreamAvailable.set(true), 0, 60, TimeUnit.SECONDS);
}
@Override
@@ -70,6 +78,11 @@ public Optional leader() {
return droveClient.leader();
}
+ @Override
+ public boolean healthy() {
+ return upstreamAvailable.get();
+ }
+
@Override
public List services() {
log.debug("Loading services list");
@@ -77,7 +90,7 @@ public List services() {
config.getSkipTagName(),
DroveUpstreamConfig.DEFAULT_SKIP_TAG_NAME);
val url = "/apis/v1/applications";
- return droveClient.execute(
+ return executeRemoteCall(() -> droveClient.execute(
new DroveClient.Request(DroveClient.Method.GET, url),
new DroveClient.ResponseHandler<>() {
@Override
@@ -111,20 +124,20 @@ public List handle(DroveClient.Response response) throws Exception {
.distinct()
.toList();
}
- });
+ }));
}
@Override
@SuppressWarnings("java:S1168")
public Map> listNodes(Iterable extends Service> services) {
- log.info("Loading nodes list for services: {}", Lists.newArrayList(services));
+ log.debug("Loading nodes list for services: {}", Lists.newArrayList(services));
val url = String.format("/apis/v1/endpoints?%s", Joiner.on("&")
.join(StreamSupport.stream(services.spliterator(), false)
.map(service -> "app=" + service.getServiceName())
.toList()));
logUrl(url);
- return droveClient.execute(new DroveClient.Request(DroveClient.Method.GET, url),
+ return executeRemoteCall(() -> droveClient.execute(new DroveClient.Request(DroveClient.Method.GET, url),
new DroveClient.ResponseHandler<>() {
@Override
public Map> defaultValue() {
@@ -150,7 +163,18 @@ public Map> handle(DroveClient.Response response)
appInfo -> new Service(namespace, appInfo.getAppName()),
Collectors.toList()));
}
- });
+ }));
+ }
+
+ private T executeRemoteCall(Supplier executor) {
+ upstreamAvailable.set(true);
+ try {
+ return executor.get();
+ }
+ catch (DroveCommunicationException e) {
+ upstreamAvailable.set(false);
+ throw e;
+ }
}
private static void throwDroveCommError(DroveClient.Response response) {
diff --git a/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveCachingCommunicator.java b/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveCachingCommunicator.java
index 94e08180..892a39ec 100644
--- a/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveCachingCommunicator.java
+++ b/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveCachingCommunicator.java
@@ -110,6 +110,11 @@ public Optional leader() {
return root.leader();
}
+ @Override
+ public boolean healthy() {
+ return root.healthy();
+ }
+
@Override
public List services() {
return root.services();
diff --git a/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveCommunicationException.java b/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveCommunicationException.java
index 858e309c..4c25d372 100644
--- a/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveCommunicationException.java
+++ b/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveCommunicationException.java
@@ -16,10 +16,12 @@
package io.appform.ranger.drove.common;
+import io.appform.ranger.core.exceptions.CommunicationException;
+
/**
- *
+ * Thrown in case there is an issue communicating with the drove upstream.
*/
-public class DroveCommunicationException extends RuntimeException {
+public class DroveCommunicationException extends CommunicationException {
public DroveCommunicationException(final String message) {
super(message);
}
diff --git a/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveCommunicator.java b/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveCommunicator.java
index bb830663..bd664798 100644
--- a/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveCommunicator.java
+++ b/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveCommunicator.java
@@ -30,6 +30,8 @@
public interface DroveCommunicator extends AutoCloseable {
Optional leader();
+ boolean healthy();
+
List services();
default List listNodes(final Service service) {
diff --git a/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveNodeDataStoreConnector.java b/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveNodeDataStoreConnector.java
index 49abbf99..5b8d6c3e 100644
--- a/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveNodeDataStoreConnector.java
+++ b/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveNodeDataStoreConnector.java
@@ -62,7 +62,7 @@ public void stop() {
@Override
public boolean isActive() {
- return droveClient.leader().isPresent();
+ return droveClient.healthy();
}
}
diff --git a/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveOkHttpTransport.java b/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveOkHttpTransport.java
index a84a8045..928a91a1 100644
--- a/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveOkHttpTransport.java
+++ b/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveOkHttpTransport.java
@@ -68,7 +68,7 @@ public T get(
return responseHandler.handle(droveResponse);
}
catch (Exception e) {
- log.error("Error calling drove: " + e.getMessage(), e);
+ log.error("Error calling drove: {}. Error: {}", e.getMessage(), e.getClass().getSimpleName());
throw new DroveCommunicationException(e.getMessage());
}
}
diff --git a/ranger-drove/src/main/java/io/appform/ranger/drove/servicefinder/DroveNodeDataSource.java b/ranger-drove/src/main/java/io/appform/ranger/drove/servicefinder/DroveNodeDataSource.java
index a202565d..dc266d2a 100644
--- a/ranger-drove/src/main/java/io/appform/ranger/drove/servicefinder/DroveNodeDataSource.java
+++ b/ranger-drove/src/main/java/io/appform/ranger/drove/servicefinder/DroveNodeDataSource.java
@@ -20,7 +20,6 @@
import io.appform.ranger.core.model.NodeDataSource;
import io.appform.ranger.core.model.Service;
import io.appform.ranger.core.model.ServiceNode;
-import io.appform.ranger.core.util.FinderUtils;
import io.appform.ranger.drove.common.DroveCommunicationException;
import io.appform.ranger.drove.common.DroveCommunicator;
import io.appform.ranger.drove.common.DroveNodeDataStoreConnector;
@@ -58,20 +57,17 @@ public Optional>> refresh(D deserializer) {
val exposedAppInfos = droveClient.listNodes(service);
val nodes = deserializer.deserialize(
Objects.requireNonNull(exposedAppInfos, "Unexpected empty response from server"));
- return Optional.of(FinderUtils.filterValidNodes(
- service,
- nodes,
- healthcheckZombieCheckThresholdTime(service)));
+ return Optional.of(nodes);
}
catch (DroveCommunicationException e) {
log.error("Drove communication error", e);
return Optional.empty(); //In case of refresh failure, maintain old list
}
-
}
@Override
public boolean isActive() {
- return droveClient.leader().isPresent();
+ return droveClient.healthy();
}
+
}
diff --git a/ranger-http-client/src/main/java/io/appform/ranger/client/http/AbstractRangerHttpHubClient.java b/ranger-http-client/src/main/java/io/appform/ranger/client/http/AbstractRangerHttpHubClient.java
index c2dfc2b4..139363c3 100644
--- a/ranger-http-client/src/main/java/io/appform/ranger/client/http/AbstractRangerHttpHubClient.java
+++ b/ranger-http-client/src/main/java/io/appform/ranger/client/http/AbstractRangerHttpHubClient.java
@@ -23,6 +23,7 @@
import io.appform.ranger.core.model.ServiceRegistry;
import io.appform.ranger.http.config.HttpClientConfig;
import io.appform.ranger.http.serde.HTTPResponseDataDeserializer;
+import io.appform.ranger.http.servicefinder.HttpCommunicator;
import io.appform.ranger.http.servicefinderhub.HttpServiceDataSource;
import io.appform.ranger.http.servicefinderhub.HttpServiceFinderHubBuilder;
import io.appform.ranger.http.utils.RangerHttpUtils;
@@ -30,37 +31,40 @@
import lombok.Getter;
import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j;
-import okhttp3.OkHttpClient;
import java.util.Objects;
@Slf4j
@Getter
@SuperBuilder
-public abstract class AbstractRangerHttpHubClient, D extends HTTPResponseDataDeserializer>
- extends AbstractRangerHubClient {
+public abstract class AbstractRangerHttpHubClient,
+ D extends HTTPResponseDataDeserializer>
+ extends AbstractRangerHubClient {
- private final HttpClientConfig clientConfig;
+ private final HttpClientConfig clientConfig;
- private final OkHttpClient httpClient;
+ private final HttpCommunicator httpClient;
- @Builder.Default
- private final ServiceNodeSelector nodeSelector = new RandomServiceNodeSelector<>();
+ @Builder.Default
+ private final ServiceNodeSelector nodeSelector = new RandomServiceNodeSelector<>();
- @Override
- protected ServiceDataSource getDefaultDataSource() {
- return new HttpServiceDataSource<>(clientConfig, getMapper(), Objects.requireNonNullElseGet(getHttpClient(),
- () -> RangerHttpUtils.httpClient(clientConfig)));
- }
+ @Override
+ protected ServiceDataSource getDefaultDataSource() {
+ return new HttpServiceDataSource<>(clientConfig,
+ Objects.requireNonNullElseGet(getHttpClient(),
+ () -> RangerHttpUtils.httpClient(
+ clientConfig,
+ getMapper())));
+ }
- @Override
- protected ServiceFinderHub buildHub() {
- return new HttpServiceFinderHubBuilder()
- .withServiceDataSource(getServiceDataSource())
- .withServiceFinderFactory(getFinderFactory())
- .withRefreshFrequencyMs(getNodeRefreshTimeMs())
- .withHubRefreshDuration(getHubRefreshDurationMs())
- .withServiceRefreshDuration(getServiceRefreshDurationMs())
- .build();
- }
+ @Override
+ protected ServiceFinderHub buildHub() {
+ return new HttpServiceFinderHubBuilder()
+ .withServiceDataSource(getServiceDataSource())
+ .withServiceFinderFactory(getFinderFactory())
+ .withRefreshFrequencyMs(getNodeRefreshTimeMs())
+ .withHubRefreshDuration(getHubRefreshDurationMs())
+ .withServiceRefreshDuration(getServiceRefreshDurationMs())
+ .build();
+ }
}
diff --git a/ranger-http-client/src/main/java/io/appform/ranger/client/http/SimpleRangerHttpClient.java b/ranger-http-client/src/main/java/io/appform/ranger/client/http/SimpleRangerHttpClient.java
index f71e3d96..a8f307ec 100644
--- a/ranger-http-client/src/main/java/io/appform/ranger/client/http/SimpleRangerHttpClient.java
+++ b/ranger-http-client/src/main/java/io/appform/ranger/client/http/SimpleRangerHttpClient.java
@@ -25,11 +25,11 @@
import io.appform.ranger.http.HttpServiceFinderBuilders;
import io.appform.ranger.http.config.HttpClientConfig;
import io.appform.ranger.http.serde.HTTPResponseDataDeserializer;
+import io.appform.ranger.http.servicefinder.HttpCommunicator;
import lombok.Builder;
import lombok.Getter;
import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j;
-import okhttp3.OkHttpClient;
@Slf4j
@SuperBuilder
@@ -40,7 +40,7 @@ public class SimpleRangerHttpClient extends AbstractRangerClient httpClient;
private final HTTPResponseDataDeserializer deserializer;
@Builder.Default
private final ShardSelector> shardSelector = new ListShardSelector<>();
diff --git a/ranger-http-client/src/test/java/io/appform/ranger/client/http/ShardedRangerHttpClientTest.java b/ranger-http-client/src/test/java/io/appform/ranger/client/http/ShardedRangerHttpClientTest.java
index 90bea638..956c5ff6 100644
--- a/ranger-http-client/src/test/java/io/appform/ranger/client/http/ShardedRangerHttpClientTest.java
+++ b/ranger-http-client/src/test/java/io/appform/ranger/client/http/ShardedRangerHttpClientTest.java
@@ -29,7 +29,7 @@ void testShardedHttpHubClient(){
val httpClientConfig = getHttpClientConfig();
val client = ShardedRangerHttpHubClient.builder()
.clientConfig(httpClientConfig)
- .httpClient(RangerHttpUtils.httpClient(httpClientConfig))
+ .httpClient(RangerHttpUtils.httpClient(httpClientConfig, getObjectMapper()))
.namespace("test-n")
.deserializer(this::read)
.mapper(getObjectMapper())
diff --git a/ranger-http-client/src/test/java/io/appform/ranger/client/http/SimpleRangerHttpClientTest.java b/ranger-http-client/src/test/java/io/appform/ranger/client/http/SimpleRangerHttpClientTest.java
index c09b3193..64f6b3cc 100644
--- a/ranger-http-client/src/test/java/io/appform/ranger/client/http/SimpleRangerHttpClientTest.java
+++ b/ranger-http-client/src/test/java/io/appform/ranger/client/http/SimpleRangerHttpClientTest.java
@@ -30,7 +30,7 @@ void testSimpleHttpRangerClient(){
val client = SimpleRangerHttpClient.builder()
.clientConfig(httpClientConfig)
.mapper(getObjectMapper())
- .httpClient(RangerHttpUtils.httpClient(httpClientConfig))
+ .httpClient(RangerHttpUtils.httpClient(httpClientConfig, getObjectMapper()))
.deserializer(this::read)
.namespace("test-n")
.serviceName("test-s")
diff --git a/ranger-http-client/src/test/java/io/appform/ranger/client/http/UnshardedRangerHttpClientTest.java b/ranger-http-client/src/test/java/io/appform/ranger/client/http/UnshardedRangerHttpClientTest.java
index c9cfd533..b4b64325 100644
--- a/ranger-http-client/src/test/java/io/appform/ranger/client/http/UnshardedRangerHttpClientTest.java
+++ b/ranger-http-client/src/test/java/io/appform/ranger/client/http/UnshardedRangerHttpClientTest.java
@@ -29,7 +29,7 @@ void testUnshardedRangerHubClient(){
val httpClientConfig = getHttpClientConfig();
val client = UnshardedRangerHttpHubClient.builder()
.clientConfig(httpClientConfig)
- .httpClient(RangerHttpUtils.httpClient(httpClientConfig))
+ .httpClient(RangerHttpUtils.httpClient(httpClientConfig, getObjectMapper()))
.namespace("test-n")
.deserializer(this::read)
.mapper(getObjectMapper())
diff --git a/ranger-http/src/main/java/io/appform/ranger/http/common/HttpNodeDataStoreConnector.java b/ranger-http/src/main/java/io/appform/ranger/http/common/HttpNodeDataStoreConnector.java
index aaece10c..210a6e02 100644
--- a/ranger-http/src/main/java/io/appform/ranger/http/common/HttpNodeDataStoreConnector.java
+++ b/ranger-http/src/main/java/io/appform/ranger/http/common/HttpNodeDataStoreConnector.java
@@ -15,11 +15,10 @@
*/
package io.appform.ranger.http.common;
-import com.fasterxml.jackson.databind.ObjectMapper;
import io.appform.ranger.core.model.NodeDataStoreConnector;
import io.appform.ranger.http.config.HttpClientConfig;
+import io.appform.ranger.http.servicefinder.HttpCommunicator;
import lombok.extern.slf4j.Slf4j;
-import okhttp3.OkHttpClient;
/**
*
@@ -28,16 +27,13 @@
public class HttpNodeDataStoreConnector implements NodeDataStoreConnector {
protected final HttpClientConfig config;
- protected final ObjectMapper mapper;
- protected final OkHttpClient httpClient;
+ protected final HttpCommunicator httpCommunicator;
public HttpNodeDataStoreConnector(
final HttpClientConfig config,
- final ObjectMapper mapper,
- final OkHttpClient httpClient) {
- this.httpClient = httpClient;
+ final HttpCommunicator httpCommunicator) {
+ this.httpCommunicator = httpCommunicator;
this.config = config;
- this.mapper = mapper;
}
diff --git a/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpApiCommunicator.java b/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpApiCommunicator.java
new file mode 100644
index 00000000..4b6247e9
--- /dev/null
+++ b/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpApiCommunicator.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2024 Authors, Flipkart Internet Pvt. Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.appform.ranger.http.servicefinder;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.appform.ranger.core.model.Service;
+import io.appform.ranger.core.model.ServiceNode;
+import io.appform.ranger.http.config.HttpClientConfig;
+import io.appform.ranger.http.model.ServiceDataSourceResponse;
+import io.appform.ranger.http.serde.HTTPResponseDataDeserializer;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import lombok.val;
+import okhttp3.HttpUrl;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
+/**
+ * Direct api based communication
+ */
+@Slf4j
+public class HttpApiCommunicator implements HttpCommunicator {
+ private final AtomicBoolean upstreamAvailable = new AtomicBoolean(true);
+ private final ScheduledExecutorService resetter = Executors.newSingleThreadScheduledExecutor();
+
+ @Getter
+ private final OkHttpClient httpClient;
+ private final HttpClientConfig config;
+ private final ObjectMapper mapper;
+
+ public HttpApiCommunicator(OkHttpClient httpClient, HttpClientConfig config, ObjectMapper mapper) {
+ Objects.requireNonNull(mapper, "mapper has not been set for node data");
+ this.httpClient = httpClient;
+ this.config = config;
+ this.mapper = mapper;
+ resetter.scheduleWithFixedDelay(() -> upstreamAvailable.set(true), 0, 60, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public boolean healthy() {
+ return upstreamAvailable.get();
+ }
+
+ @Override
+ public Set services() {
+ return executeRemoteCall(() -> {
+ val httpUrl = new HttpUrl.Builder()
+ .scheme(config.isSecure() ? "https" : "http")
+ .host(config.getHost())
+ .port(config.getPort() == 0 ? defaultPort() : config.getPort())
+ .encodedPath("/ranger/services/v1")
+ .build();
+ val request = new Request.Builder()
+ .url(httpUrl)
+ .get()
+ .build();
+
+ try (val response = httpClient.newCall(request).execute()) {
+ if (response.isSuccessful()) {
+ return parseServices(response, httpUrl);
+ }
+ else {
+ throw new HttpCommunicationException(
+ "Http call to returned a failure response. Url:" + httpUrl + " status: " + response.code());
+ }
+ }
+ catch (Exception e) {
+ throw new HttpCommunicationException(
+ "Error parsing the response from server for url: " + httpUrl
+ + " with exception " + e.getClass().getSimpleName() + ": " + e.getMessage());
+ }
+ });
+ }
+
+ @Override
+ public List> listNodes(
+ Service service,
+ HTTPResponseDataDeserializer deserializer) {
+ return executeRemoteCall(() -> {
+ val url = String.format("/ranger/nodes/v1/%s/%s", service.getNamespace(), service.getServiceName());
+
+ log.debug("Refreshing the node list from url {}", url);
+ val httpUrl = new HttpUrl.Builder()
+ .scheme(config.isSecure() ? "https" : "http")
+ .host(config.getHost())
+ .port(config.getPort() == 0 ? defaultPort() : config.getPort())
+ .encodedPath(url)
+ .build();
+ val request = new Request.Builder()
+ .url(httpUrl)
+ .get()
+ .build();
+
+ try (val response = httpClient.newCall(request).execute()) {
+ if (response.isSuccessful()) {
+ return parseNodeList(deserializer, response, httpUrl);
+ }
+ else {
+ throw new HttpCommunicationException("HTTP call failed. url: " + httpUrl + " status: " + response.code());
+ }
+ }
+ catch (Exception e) {
+ throw new HttpCommunicationException("Error getting node data from the http endpoint: " + httpUrl +
+ ". Error: " + e.getMessage());
+ }
+ });
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+
+ private U executeRemoteCall(Supplier executor) {
+ upstreamAvailable.set(true);
+ try {
+ return executor.get();
+ }
+ catch (HttpCommunicationException e) {
+ upstreamAvailable.set(false);
+ throw e;
+ }
+ }
+
+ private int defaultPort() {
+ return config.isSecure()
+ ? 443
+ : 80;
+ }
+
+ private Set parseServices(Response response, HttpUrl httpUrl) {
+ try (val body = response.body()) {
+ if (null == body) {
+ throw new HttpCommunicationException("Empty response body from: " + httpUrl);
+ }
+ else {
+ val bytes = body.bytes();
+ val serviceDataSourceResponse = mapper.readValue(bytes, ServiceDataSourceResponse.class);
+ if (serviceDataSourceResponse.valid()) {
+ return serviceDataSourceResponse.getData();
+ }
+ else {
+ throw new HttpCommunicationException(
+ "Http call to returned a failure response. Url:" + httpUrl + " data: " + serviceDataSourceResponse);
+ }
+ }
+ }
+ catch (Exception e) {
+ throw new HttpCommunicationException(
+ "Error reading data from server. Url: " + httpUrl + "Error: " + e.getMessage());
+ }
+ }
+
+ private static List> parseNodeList(
+ HTTPResponseDataDeserializer deserializer,
+ Response response,
+ HttpUrl httpUrl) {
+ try (val body = response.body()) {
+ if (null == body) {
+ log.warn("HTTP call to {} returned empty body", httpUrl);
+ throw new HttpCommunicationException("Empty response received for call to " + httpUrl);
+ }
+ else {
+ val bytes = body.bytes();
+ val serviceNodesResponse = deserializer.deserialize(bytes);
+ if (serviceNodesResponse.valid()) {
+ return serviceNodesResponse.getData();
+ }
+ else {
+ throw new HttpCommunicationException(
+ "Http call returned null nodes for url: " + httpUrl + " response: " + serviceNodesResponse);
+ }
+ }
+ }
+ catch (Exception e) {
+ throw new HttpCommunicationException(
+ "Error parsing node data from server. Url: " + httpUrl + "Error: " + e.getMessage());
+ }
+ }
+}
diff --git a/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpCommunicationException.java b/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpCommunicationException.java
new file mode 100644
index 00000000..f8a2586e
--- /dev/null
+++ b/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpCommunicationException.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2024 Authors, Flipkart Internet Pvt. Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.appform.ranger.http.servicefinder;
+
+import io.appform.ranger.core.exceptions.CommunicationException;
+
+/**
+ * Thrown in case there is an issue communicating with the HTTP upstream.
+
+ */
+public class HttpCommunicationException extends CommunicationException {
+ public HttpCommunicationException(final String message) {
+ super(message);
+ }
+}
diff --git a/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpCommunicator.java b/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpCommunicator.java
new file mode 100644
index 00000000..22803e68
--- /dev/null
+++ b/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpCommunicator.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2024 Authors, Flipkart Internet Pvt. Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.appform.ranger.http.servicefinder;
+
+import io.appform.ranger.core.model.Service;
+import io.appform.ranger.core.model.ServiceNode;
+import io.appform.ranger.http.serde.HTTPResponseDataDeserializer;
+import okhttp3.OkHttpClient;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Interface for communicator to upstream
+ */
+public interface HttpCommunicator extends AutoCloseable {
+ boolean healthy();
+
+ Set services();
+
+ List> listNodes(final Service service,
+ HTTPResponseDataDeserializer deserializer);
+
+ OkHttpClient getHttpClient();
+}
diff --git a/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpNodeDataSource.java b/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpNodeDataSource.java
index 9ee8b155..d88f94a6 100644
--- a/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpNodeDataSource.java
+++ b/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpNodeDataSource.java
@@ -15,24 +15,21 @@
*/
package io.appform.ranger.http.servicefinder;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
import io.appform.ranger.core.model.NodeDataSource;
import io.appform.ranger.core.model.Service;
import io.appform.ranger.core.model.ServiceNode;
-import io.appform.ranger.core.util.FinderUtils;
import io.appform.ranger.http.common.HttpNodeDataStoreConnector;
import io.appform.ranger.http.config.HttpClientConfig;
import io.appform.ranger.http.serde.HTTPResponseDataDeserializer;
import lombok.extern.slf4j.Slf4j;
-import lombok.val;
-import okhttp3.HttpUrl;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import java.io.IOException;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
*
@@ -41,67 +38,27 @@
public class HttpNodeDataSource> extends HttpNodeDataStoreConnector implements NodeDataSource {
private final Service service;
+ private final AtomicBoolean upstreamAvailable = new AtomicBoolean(true);
+ private final ScheduledExecutorService resetter = Executors.newSingleThreadScheduledExecutor();
public HttpNodeDataSource(
final Service service,
final HttpClientConfig config,
- final ObjectMapper mapper,
- final OkHttpClient httpClient) {
- super(config, mapper, httpClient);
+ final HttpCommunicator httpCommunicator) {
+ super(config, httpCommunicator);
+ Objects.requireNonNull(config, "client config has not been set for node data");
+ Objects.requireNonNull(httpCommunicator, "http communicator has not been set for node data");
this.service = service;
+ resetter.scheduleWithFixedDelay(() -> upstreamAvailable.set(true), 0, 60, TimeUnit.SECONDS);
}
@Override
public Optional>> refresh(D deserializer) {
- Preconditions.checkNotNull(config, "client config has not been set for node data");
- Preconditions.checkNotNull(mapper, "mapper has not been set for node data");
- val url = String.format("/ranger/nodes/v1/%s/%s", service.getNamespace(), service.getServiceName());
-
- log.debug("Refreshing the node list from url {}", url);
- val httpUrl = new HttpUrl.Builder()
- .scheme(config.isSecure()
- ? "https"
- : "http")
- .host(config.getHost())
- .port(config.getPort() == 0
- ? defaultPort()
- : config.getPort())
- .encodedPath(url)
- .build();
- val request = new Request.Builder()
- .url(httpUrl)
- .get()
- .build();
-
- try (val response = httpClient.newCall(request).execute()) {
- if (response.isSuccessful()) {
- try (val body = response.body()) {
- if (null == body) {
- log.warn("HTTP call to {} returned empty body", httpUrl);
- } else {
- val bytes = body.bytes();
- val serviceNodesResponse = deserializer.deserialize(bytes);
- if(serviceNodesResponse.valid()){
- return Optional.of(FinderUtils.filterValidNodes(
- service,
- serviceNodesResponse.getData(),
- healthcheckZombieCheckThresholdTime(service)));
- } else{
- log.warn("Http call to {} returned a failure response with response {}", httpUrl, serviceNodesResponse);
- }
- }
- }
- } else {
- log.warn("HTTP call to {} returned: {}", httpUrl, response.code());
- }
- } catch (IOException e) {
- log.error("Error getting service data from the http endPoint: ", e);
- }
- return Optional.empty();
+ return Optional.of(httpCommunicator.listNodes(service, deserializer));
}
@Override
public boolean isActive() {
- return true;
+ return upstreamAvailable.get();
}
}
diff --git a/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpShardedServiceFinderBuilder.java b/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpShardedServiceFinderBuilder.java
index 2b6fe2f3..44d3cf6e 100644
--- a/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpShardedServiceFinderBuilder.java
+++ b/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpShardedServiceFinderBuilder.java
@@ -23,7 +23,6 @@
import io.appform.ranger.http.config.HttpClientConfig;
import io.appform.ranger.http.serde.HTTPResponseDataDeserializer;
import io.appform.ranger.http.utils.RangerHttpUtils;
-import okhttp3.OkHttpClient;
import java.util.Objects;
@@ -34,7 +33,7 @@ public class HttpShardedServiceFinderBuilder extends SimpleShardedServiceFind
private HttpClientConfig clientConfig;
private ObjectMapper mapper;
- private OkHttpClient httpClient;
+ private HttpCommunicator httpCommunicator;
public HttpShardedServiceFinderBuilder withClientConfig(final HttpClientConfig clientConfig) {
this.clientConfig = clientConfig;
@@ -46,8 +45,8 @@ public HttpShardedServiceFinderBuilder withObjectMapper(final ObjectMapper ma
return this;
}
- public HttpShardedServiceFinderBuilder withHttpClient(final OkHttpClient httpClient){
- this.httpClient = httpClient;
+ public HttpShardedServiceFinderBuilder withHttpCommunicator(final HttpCommunicator httpCommunicator){
+ this.httpCommunicator = httpCommunicator;
return this;
}
@@ -58,9 +57,9 @@ public SimpleShardedServiceFinder build() {
@Override
protected NodeDataSource> dataSource(Service service) {
- return new HttpNodeDataSource<>(service, clientConfig, mapper,
- Objects.requireNonNullElseGet(httpClient,
- () -> RangerHttpUtils.httpClient(clientConfig)));
+ return new HttpNodeDataSource<>(service, clientConfig,
+ Objects.requireNonNullElseGet(httpCommunicator,
+ () -> RangerHttpUtils.httpClient(clientConfig, mapper)));
}
}
diff --git a/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpUnshardedServiceFinderBuilider.java b/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpUnshardedServiceFinderBuilider.java
index 5ae1b73e..fae76ea2 100644
--- a/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpUnshardedServiceFinderBuilider.java
+++ b/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpUnshardedServiceFinderBuilider.java
@@ -23,7 +23,6 @@
import io.appform.ranger.http.config.HttpClientConfig;
import io.appform.ranger.http.serde.HTTPResponseDataDeserializer;
import io.appform.ranger.http.utils.RangerHttpUtils;
-import okhttp3.OkHttpClient;
import java.util.Objects;
@@ -32,7 +31,7 @@ public class HttpUnshardedServiceFinderBuilider
private HttpClientConfig clientConfig;
private ObjectMapper mapper;
- private OkHttpClient httpClient;
+ private HttpCommunicator httpClient;
public HttpUnshardedServiceFinderBuilider withClientConfig(final HttpClientConfig clientConfig) {
this.clientConfig = clientConfig;
@@ -44,7 +43,7 @@ public HttpUnshardedServiceFinderBuilider withObjectMapper(final ObjectMapper
return this;
}
- public HttpUnshardedServiceFinderBuilider withHttpClient(final OkHttpClient httpClient) {
+ public HttpUnshardedServiceFinderBuilider withHttpClient(final HttpCommunicator httpClient) {
this.httpClient = httpClient;
return this;
}
@@ -56,9 +55,9 @@ public SimpleUnshardedServiceFinder build() {
@Override
protected NodeDataSource> dataSource(Service service) {
- return new HttpNodeDataSource<>(service, clientConfig, mapper,
+ return new HttpNodeDataSource<>(service, clientConfig,
Objects.requireNonNullElseGet(httpClient,
- () -> RangerHttpUtils.httpClient(clientConfig)));
+ () -> RangerHttpUtils.httpClient(clientConfig, mapper)));
}
}
diff --git a/ranger-http/src/main/java/io/appform/ranger/http/servicefinderhub/HttpServiceDataSource.java b/ranger-http/src/main/java/io/appform/ranger/http/servicefinderhub/HttpServiceDataSource.java
index a48e0c10..a52cc214 100644
--- a/ranger-http/src/main/java/io/appform/ranger/http/servicefinderhub/HttpServiceDataSource.java
+++ b/ranger-http/src/main/java/io/appform/ranger/http/servicefinderhub/HttpServiceDataSource.java
@@ -15,79 +15,26 @@
*/
package io.appform.ranger.http.servicefinderhub;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
import io.appform.ranger.core.finderhub.ServiceDataSource;
import io.appform.ranger.core.model.Service;
import io.appform.ranger.http.common.HttpNodeDataStoreConnector;
import io.appform.ranger.http.config.HttpClientConfig;
-import io.appform.ranger.http.model.ServiceDataSourceResponse;
+import io.appform.ranger.http.servicefinder.HttpCommunicator;
import lombok.extern.slf4j.Slf4j;
-import lombok.val;
-import okhttp3.HttpUrl;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import java.io.IOException;
import java.util.Collection;
-import java.util.Collections;
+import java.util.Objects;
@Slf4j
public class HttpServiceDataSource extends HttpNodeDataStoreConnector implements ServiceDataSource {
- public HttpServiceDataSource(HttpClientConfig config, ObjectMapper mapper, OkHttpClient httpClient) {
- super(config, mapper, httpClient);
+ public HttpServiceDataSource(HttpClientConfig config, HttpCommunicator httpClient) {
+ super(config, httpClient);
}
@Override
public Collection services() {
- Preconditions.checkNotNull(config, "client config has not been set for node data");
- Preconditions.checkNotNull(mapper, "mapper has not been set for node data");
-
- val httpUrl = new HttpUrl.Builder()
- .scheme(config.isSecure()
- ? "https"
- : "http")
- .host(config.getHost())
- .port(config.getPort() == 0
- ? defaultPort()
- : config.getPort())
- .encodedPath("/ranger/services/v1")
- .build();
- val request = new Request.Builder()
- .url(httpUrl)
- .get()
- .build();
-
- try (val response = httpClient.newCall(request).execute()) {
- if (response.isSuccessful()) {
- try (val body = response.body()) {
- if (null == body) {
- log.warn("HTTP call to {} returned empty body", httpUrl);
- }
- else {
- val bytes = body.bytes();
- val serviceDataSourceResponse = mapper.readValue(bytes, ServiceDataSourceResponse.class);
- if (serviceDataSourceResponse.valid()) {
- return serviceDataSourceResponse.getData();
- }
- else {
- log.warn("Http call to {} returned a failure response with data {}",
- httpUrl,
- serviceDataSourceResponse);
- }
- }
- }
- }
- else {
- log.warn("HTTP call to {} returned code: {}", httpUrl, response.code());
- }
- }
- catch (IOException e) {
- log.info("Error parsing the response from server for : {} with exception {}", httpUrl, e);
- }
-
- log.error("No data returned from server: " + httpUrl);
- return Collections.emptySet();
+ Objects.requireNonNull(config, "client config has not been set for node data");
+ return httpCommunicator.services();
}
}
diff --git a/ranger-http/src/main/java/io/appform/ranger/http/servicefinderhub/HttpShardedServiceFinderFactory.java b/ranger-http/src/main/java/io/appform/ranger/http/servicefinderhub/HttpShardedServiceFinderFactory.java
index 557c43c7..b1b1f508 100644
--- a/ranger-http/src/main/java/io/appform/ranger/http/servicefinderhub/HttpShardedServiceFinderFactory.java
+++ b/ranger-http/src/main/java/io/appform/ranger/http/servicefinderhub/HttpShardedServiceFinderFactory.java
@@ -24,15 +24,15 @@
import io.appform.ranger.core.model.ShardSelector;
import io.appform.ranger.http.config.HttpClientConfig;
import io.appform.ranger.http.serde.HTTPResponseDataDeserializer;
+import io.appform.ranger.http.servicefinder.HttpCommunicator;
import io.appform.ranger.http.servicefinder.HttpShardedServiceFinderBuilder;
import lombok.Builder;
import lombok.val;
-import okhttp3.OkHttpClient;
public class HttpShardedServiceFinderFactory implements ServiceFinderFactory> {
private final HttpClientConfig clientConfig;
- private final OkHttpClient httpClient;
+ private final HttpCommunicator httpClient;
private final ObjectMapper mapper;
private final HTTPResponseDataDeserializer deserializer;
private final ShardSelector> shardSelector;
@@ -41,7 +41,8 @@ public class HttpShardedServiceFinderFactory implements ServiceFinderFactory
@Builder
public HttpShardedServiceFinderFactory(
- HttpClientConfig httpClientConfig, OkHttpClient httpClient,
+ HttpClientConfig httpClientConfig,
+ HttpCommunicator httpClient,
ObjectMapper mapper,
HTTPResponseDataDeserializer deserializer,
ShardSelector> shardSelector,
@@ -61,8 +62,8 @@ public HttpShardedServiceFinderFactory(
public ServiceFinder> buildFinder(Service service) {
val serviceFinder = new HttpShardedServiceFinderBuilder()
.withClientConfig(clientConfig)
- .withHttpClient(httpClient)
.withObjectMapper(mapper)
+ .withHttpCommunicator(httpClient)
.withDeserializer(deserializer)
.withNamespace(service.getNamespace())
.withServiceName(service.getServiceName())
diff --git a/ranger-http/src/main/java/io/appform/ranger/http/servicefinderhub/HttpUnshardedServiceFinderFactory.java b/ranger-http/src/main/java/io/appform/ranger/http/servicefinderhub/HttpUnshardedServiceFinderFactory.java
index 321e0757..51c8cb02 100644
--- a/ranger-http/src/main/java/io/appform/ranger/http/servicefinderhub/HttpUnshardedServiceFinderFactory.java
+++ b/ranger-http/src/main/java/io/appform/ranger/http/servicefinderhub/HttpUnshardedServiceFinderFactory.java
@@ -24,16 +24,16 @@
import io.appform.ranger.core.model.ShardSelector;
import io.appform.ranger.http.config.HttpClientConfig;
import io.appform.ranger.http.serde.HTTPResponseDataDeserializer;
+import io.appform.ranger.http.servicefinder.HttpCommunicator;
import io.appform.ranger.http.servicefinder.HttpUnshardedServiceFinderBuilider;
import lombok.Builder;
import lombok.val;
-import okhttp3.OkHttpClient;
public class HttpUnshardedServiceFinderFactory implements ServiceFinderFactory> {
private final HttpClientConfig clientConfig;
private final ObjectMapper mapper;
- private final OkHttpClient httpClient;
+ private final HttpCommunicator httpClient;
private final HTTPResponseDataDeserializer deserializer;
private final ShardSelector> shardSelector;
private final ServiceNodeSelector nodeSelector;
@@ -42,12 +42,12 @@ public class HttpUnshardedServiceFinderFactory implements ServiceFinderFactor
@Builder
public HttpUnshardedServiceFinderFactory(
HttpClientConfig httpClientConfig,
- ObjectMapper mapper, OkHttpClient httpClient,
+ ObjectMapper mapper,
+ HttpCommunicator httpClient,
HTTPResponseDataDeserializer deserializer,
ShardSelector> shardSelector,
ServiceNodeSelector nodeSelector,
- int nodeRefreshIntervalMs)
- {
+ int nodeRefreshIntervalMs) {
this.clientConfig = httpClientConfig;
this.mapper = mapper;
this.httpClient = httpClient;
diff --git a/ranger-http/src/main/java/io/appform/ranger/http/serviceprovider/HttpNodeDataSink.java b/ranger-http/src/main/java/io/appform/ranger/http/serviceprovider/HttpNodeDataSink.java
index 176fbb26..627d8676 100644
--- a/ranger-http/src/main/java/io/appform/ranger/http/serviceprovider/HttpNodeDataSink.java
+++ b/ranger-http/src/main/java/io/appform/ranger/http/serviceprovider/HttpNodeDataSink.java
@@ -26,10 +26,10 @@
import io.appform.ranger.http.config.HttpClientConfig;
import io.appform.ranger.http.model.ServiceRegistrationResponse;
import io.appform.ranger.http.serde.HttpRequestDataSerializer;
+import io.appform.ranger.http.servicefinder.HttpCommunicator;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import okhttp3.HttpUrl;
-import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
@@ -40,10 +40,12 @@
public class HttpNodeDataSink> extends HttpNodeDataStoreConnector implements NodeDataSink {
private final Service service;
+ private final ObjectMapper mapper;
- public HttpNodeDataSink(Service service, HttpClientConfig config, ObjectMapper mapper, OkHttpClient httpClient) {
- super(config, mapper, httpClient);
+ public HttpNodeDataSink(Service service, HttpClientConfig config, ObjectMapper mapper, HttpCommunicator httpClient) {
+ super(config, httpClient);
this.service = service;
+ this.mapper = mapper;
}
@Override
@@ -77,7 +79,7 @@ private Optional> registerService(HttpUrl httpUrl
.url(httpUrl)
.post(requestBody)
.build();
- try (val response = httpClient.newCall(request).execute()) {
+ try (val response = httpCommunicator.getHttpClient().newCall(request).execute()) {
if (response.isSuccessful()) {
try (val body = response.body()) {
if (null == body) {
diff --git a/ranger-http/src/main/java/io/appform/ranger/http/serviceprovider/HttpShardedServiceProviderBuilder.java b/ranger-http/src/main/java/io/appform/ranger/http/serviceprovider/HttpShardedServiceProviderBuilder.java
index 393a2934..4caec43d 100644
--- a/ranger-http/src/main/java/io/appform/ranger/http/serviceprovider/HttpShardedServiceProviderBuilder.java
+++ b/ranger-http/src/main/java/io/appform/ranger/http/serviceprovider/HttpShardedServiceProviderBuilder.java
@@ -22,9 +22,9 @@
import io.appform.ranger.core.serviceprovider.ServiceProvider;
import io.appform.ranger.http.config.HttpClientConfig;
import io.appform.ranger.http.serde.HttpRequestDataSerializer;
+import io.appform.ranger.http.servicefinder.HttpCommunicator;
import io.appform.ranger.http.utils.RangerHttpUtils;
import lombok.extern.slf4j.Slf4j;
-import okhttp3.OkHttpClient;
import java.util.Objects;
@@ -33,7 +33,7 @@ public class HttpShardedServiceProviderBuilder extends BaseServiceProviderBui
private HttpClientConfig clientConfig;
private ObjectMapper mapper;
- private OkHttpClient httpClient;
+ private HttpCommunicator httpClient;
public HttpShardedServiceProviderBuilder withClientConfiguration(final HttpClientConfig clientConfig) {
this.clientConfig = clientConfig;
@@ -45,7 +45,7 @@ public HttpShardedServiceProviderBuilder withObjectMapper(final ObjectMapper
return this;
}
- public HttpShardedServiceProviderBuilder withHttpClient(final OkHttpClient httpClient) {
+ public HttpShardedServiceProviderBuilder withHttpClient(final HttpCommunicator httpClient) {
this.httpClient = httpClient;
return this;
}
@@ -59,6 +59,6 @@ public ServiceProvider> build() {
protected NodeDataSink> dataSink(Service service) {
return new HttpNodeDataSink<>(service, clientConfig, mapper,
Objects.requireNonNullElseGet(httpClient,
- () -> RangerHttpUtils.httpClient(clientConfig)));
+ () -> RangerHttpUtils.httpClient(clientConfig, mapper)));
}
}
diff --git a/ranger-http/src/main/java/io/appform/ranger/http/utils/RangerHttpUtils.java b/ranger-http/src/main/java/io/appform/ranger/http/utils/RangerHttpUtils.java
index de7c85d2..c3aaf105 100644
--- a/ranger-http/src/main/java/io/appform/ranger/http/utils/RangerHttpUtils.java
+++ b/ranger-http/src/main/java/io/appform/ranger/http/utils/RangerHttpUtils.java
@@ -16,7 +16,10 @@
package io.appform.ranger.http.utils;
+import com.fasterxml.jackson.databind.ObjectMapper;
import io.appform.ranger.http.config.HttpClientConfig;
+import io.appform.ranger.http.servicefinder.HttpApiCommunicator;
+import io.appform.ranger.http.servicefinder.HttpCommunicator;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import okhttp3.ConnectionPool;
@@ -30,17 +33,22 @@
@UtilityClass
@Slf4j
public class RangerHttpUtils {
- public static OkHttpClient httpClient(final HttpClientConfig config) {
+ public static HttpCommunicator httpClient(
+ final HttpClientConfig config,
+ final ObjectMapper mapper) {
log.info("Creating http client for {}:{}", config.getHost(), config.getPort());
- return new OkHttpClient.Builder()
- .callTimeout(config.getOperationTimeoutMs() == 0
- ? 3000
- : config.getOperationTimeoutMs(), TimeUnit.MILLISECONDS)
- .connectTimeout(config.getConnectionTimeoutMs() == 0
- ? 3000
- : config.getConnectionTimeoutMs(), TimeUnit.MILLISECONDS)
- .followRedirects(true)
- .connectionPool(new ConnectionPool(1, 30, TimeUnit.SECONDS))
- .build();
+ return new HttpApiCommunicator<>(
+ new OkHttpClient.Builder()
+ .callTimeout(config.getOperationTimeoutMs() == 0
+ ? 3000
+ : config.getOperationTimeoutMs(), TimeUnit.MILLISECONDS)
+ .connectTimeout(config.getConnectionTimeoutMs() == 0
+ ? 3000
+ : config.getConnectionTimeoutMs(), TimeUnit.MILLISECONDS)
+ .followRedirects(true)
+ .connectionPool(new ConnectionPool(1, 30, TimeUnit.SECONDS))
+ .build(),
+ config,
+ mapper);
}
}
diff --git a/ranger-http/src/test/java/io/appform/ranger/http/common/HttpNodeDataStoreConnectorTest.java b/ranger-http/src/test/java/io/appform/ranger/http/common/HttpNodeDataStoreConnectorTest.java
index 58677990..41a67b0c 100644
--- a/ranger-http/src/test/java/io/appform/ranger/http/common/HttpNodeDataStoreConnectorTest.java
+++ b/ranger-http/src/test/java/io/appform/ranger/http/common/HttpNodeDataStoreConnectorTest.java
@@ -31,8 +31,8 @@ void testHttpNodeDataStoreConnector(){
.host("localhost-1")
.port(80)
.build();
- val httpNodeDataStoreConnector = new HttpNodeDataStoreConnector<>(httpClientConfig, objectMapper,
- RangerHttpUtils.httpClient(httpClientConfig));
+ val httpNodeDataStoreConnector = new HttpNodeDataStoreConnector<>(httpClientConfig,
+ RangerHttpUtils.httpClient(httpClientConfig, objectMapper));
Assertions.assertNotNull(httpNodeDataStoreConnector);
Assertions.assertTrue(httpNodeDataStoreConnector.isActive());
}
diff --git a/ranger-http/src/test/java/io/appform/ranger/http/servicefinderhub/HttpServiceDataSourceTest.java b/ranger-http/src/test/java/io/appform/ranger/http/servicefinderhub/HttpServiceDataSourceTest.java
index 909ed69d..7662a167 100644
--- a/ranger-http/src/test/java/io/appform/ranger/http/servicefinderhub/HttpServiceDataSourceTest.java
+++ b/ranger-http/src/test/java/io/appform/ranger/http/servicefinderhub/HttpServiceDataSourceTest.java
@@ -55,7 +55,7 @@ void testServiceDataSource(WireMockRuntimeInfo wireMockRuntimeInfo) throws IOExc
.connectionTimeoutMs(30_000)
.operationTimeoutMs(30_000)
.build();
- val httpServiceDataSource = new HttpServiceDataSource<>(clientConfig, MAPPER, RangerHttpUtils.httpClient(clientConfig));
+ val httpServiceDataSource = new HttpServiceDataSource<>(clientConfig, RangerHttpUtils.httpClient(clientConfig, MAPPER));
val services = httpServiceDataSource.services();
Assertions.assertNotNull(services);
Assertions.assertFalse(services.isEmpty());
diff --git a/ranger-hub-server-bundle/pom.xml b/ranger-hub-server-bundle/pom.xml
index 0bd5b103..c6e94acc 100644
--- a/ranger-hub-server-bundle/pom.xml
+++ b/ranger-hub-server-bundle/pom.xml
@@ -39,10 +39,6 @@
ranger-hub-server-bundle
-
- 2.0.23
-
-
io.appform.ranger
diff --git a/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/RangerHubServerBundle.java b/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/RangerHubServerBundle.java
index 162140a5..b6013c1d 100644
--- a/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/RangerHubServerBundle.java
+++ b/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/RangerHubServerBundle.java
@@ -107,6 +107,7 @@ private RangerHubClient> addCurat
.disablePushUpdaters(zkConfiguration.isDisablePushUpdaters())
.mapper(getMapper())
.serviceRefreshDurationMs(zkConfiguration.getServiceRefreshDurationMs())
+ .hubRefreshDurationMs(zkConfiguration.getServiceRefreshDurationMs())
.nodeRefreshTimeMs(zkConfiguration.getNodeRefreshTimeMs())
.deserializer(data -> {
try {
@@ -127,8 +128,9 @@ private RangerHubClient> getHttpH
.namespace(namespace)
.mapper(getMapper())
.clientConfig(httpClientConfig)
- .httpClient(RangerHttpUtils.httpClient(httpClientConfig))
+ .httpClient(RangerHttpUtils.httpClient(httpClientConfig, getMapper()))
.serviceRefreshDurationMs(httpConfiguration.getServiceRefreshDurationMs())
+ .hubRefreshDurationMs(httpConfiguration.getServiceRefreshDurationMs())
.nodeRefreshTimeMs(httpConfiguration.getNodeRefreshTimeMs())
.deserializer(data -> {
try {
@@ -155,6 +157,7 @@ private RangerHubClient> getDrove
.clientConfig(droveConfig)
.droveCommunicator(droveCommunicator)
.serviceRefreshDurationMs(droveUpstreamConfiguration.getServiceRefreshDurationMs())
+ .hubRefreshDurationMs(droveUpstreamConfiguration.getServiceRefreshDurationMs())
.nodeRefreshTimeMs(droveUpstreamConfiguration.getNodeRefreshTimeMs())
.deserializer(new DroveResponseDataDeserializer<>() {
@Override
diff --git a/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/healthcheck/RangerHealthCheck.java b/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/healthcheck/RangerHealthCheck.java
index 3f4b52d0..69d4ce95 100644
--- a/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/healthcheck/RangerHealthCheck.java
+++ b/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/healthcheck/RangerHealthCheck.java
@@ -16,11 +16,9 @@
package io.appform.ranger.hub.server.bundle.healthcheck;
import com.codahale.metrics.health.HealthCheck;
-import javax.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
-@Singleton
@Slf4j
public class RangerHealthCheck extends HealthCheck {
diff --git a/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/lifecycle/CuratorLifecycle.java b/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/lifecycle/CuratorLifecycle.java
index 820d2727..cebf856b 100644
--- a/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/lifecycle/CuratorLifecycle.java
+++ b/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/lifecycle/CuratorLifecycle.java
@@ -17,13 +17,12 @@
import io.appform.ranger.common.server.ShardInfo;
import io.appform.ranger.core.signals.Signal;
-import java.util.Collections;
-import javax.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
+import java.util.Collections;
+
@Slf4j
-@Singleton
public class CuratorLifecycle extends Signal {
private final CuratorFramework curatorFramework;
diff --git a/ranger-server/pom.xml b/ranger-server/pom.xml
index 0064ed39..7c8afa44 100644
--- a/ranger-server/pom.xml
+++ b/ranger-server/pom.xml
@@ -27,6 +27,11 @@
ranger-server
+
+ 2.0.1
+ 5.10.2
+
+
io.appform.ranger
@@ -37,6 +42,16 @@
io.dropwizard
dropwizard-core
+
+ jakarta.inject
+ jakarta.inject-api
+ ${jakarta.inject-api.version}
+
+
+ ru.vyarus
+ dropwizard-guicey
+ ${dw-guicey.version}
+
diff --git a/ranger-zookeeper/src/main/java/io/appform/ranger/zookeeper/servicefinder/ZkNodeDataSource.java b/ranger-zookeeper/src/main/java/io/appform/ranger/zookeeper/servicefinder/ZkNodeDataSource.java
index 7d4249c7..f2fea6fa 100644
--- a/ranger-zookeeper/src/main/java/io/appform/ranger/zookeeper/servicefinder/ZkNodeDataSource.java
+++ b/ranger-zookeeper/src/main/java/io/appform/ranger/zookeeper/servicefinder/ZkNodeDataSource.java
@@ -20,7 +20,6 @@
import io.appform.ranger.core.model.NodeDataSource;
import io.appform.ranger.core.model.Service;
import io.appform.ranger.core.model.ServiceNode;
-import io.appform.ranger.core.util.FinderUtils;
import io.appform.ranger.zookeeper.common.ZkNodeDataStoreConnector;
import io.appform.ranger.zookeeper.common.ZkStoreType;
import io.appform.ranger.zookeeper.serde.ZkNodeDataDeserializer;
@@ -29,11 +28,11 @@
import lombok.val;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
-import org.apache.zookeeper.KeeperException.NoNodeException;
/**
*
@@ -65,7 +64,6 @@ private Optional>> checkForUpdateOnZookeeper(D deserializer)
}
Preconditions.checkNotNull(deserializer, "Deserializer has not been set for node data");
try {
- val healthcheckZombieCheckThresholdTime = healthcheckZombieCheckThresholdTime(service); //1 Minute
val serviceName = service.getServiceName();
if (!isActive()) {
log.warn("ZK connection is not active. Ignoring refresh request for service: {}",
@@ -79,13 +77,11 @@ private Optional>> checkForUpdateOnZookeeper(D deserializer)
log.debug("Found {} nodes for [{}]", children.size(), serviceName);
for (val child : children) {
byte[] data = readChild(parentPath, child).orElse(null);
- if (data == null || data.length <= 0) {
+ if (data == null || data.length == 0) {
continue;
}
val node = deserializer.deserialize(data);
- if(FinderUtils.isValidNode(service, healthcheckZombieCheckThresholdTime, node)) {
- nodes.add(node);
- }
+ nodes.add(node);
}
return Optional.of(nodes);
}