Skip to content

Commit

Permalink
Merge pull request #40 from koushikr/main
Browse files Browse the repository at this point in the history
Boundary for ServiceAddition and ServiceFinderHubRefresh
  • Loading branch information
santanusinha authored Aug 20, 2024
2 parents 93c978e + 762561e commit 493f0b4
Show file tree
Hide file tree
Showing 23 changed files with 147 additions and 56 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# Changelog
All notable changes to this project will be documented in this file.

## [1.1-RC2]

- Pertaining to PR : https://github.com/appform-io/ranger/pull/22/, building of a ServiceFinderHub and a ServiceFinder are bounded.

## [1.0-RC18]
- Version bump to release lexicographically higher version than 1.0-dw3-RC17

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<groupId>io.appform.ranger</groupId>
<artifactId>ranger</artifactId>
<packaging>pom</packaging>
<version>1.1-RC1</version>
<version>1.1-RC2</version>
<name>Ranger</name>
<url>https://github.com/appform-io/ranger</url>
<description>Service Discovery for Java</description>
Expand Down
2 changes: 1 addition & 1 deletion ranger-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>ranger</artifactId>
<groupId>io.appform.ranger</groupId>
<version>1.1-RC1</version>
<version>1.1-RC2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,8 @@
import io.appform.ranger.core.finderhub.ServiceDataSource;
import io.appform.ranger.core.finderhub.ServiceFinderFactory;
import io.appform.ranger.core.finderhub.ServiceFinderHub;
import io.appform.ranger.core.model.Deserializer;
import io.appform.ranger.core.model.Service;
import io.appform.ranger.core.model.ServiceNode;
import io.appform.ranger.core.model.ServiceNodeSelector;
import io.appform.ranger.core.model.ServiceRegistry;
import io.appform.ranger.core.model.ShardSelector;
import io.appform.ranger.core.model.*;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand All @@ -40,7 +36,7 @@
@Slf4j
@Getter
@SuperBuilder
public abstract class AbstractRangerHubClient<T, R extends ServiceRegistry<T>, D extends Deserializer<T>> implements RangerHubClient<T, R> {
public abstract class AbstractRangerHubClient<T, R extends ServiceRegistry<T>, D extends Deserializer<T>> implements RangerHubClient<T,R> {

private final String namespace;
private final ObjectMapper mapper;
Expand All @@ -50,19 +46,36 @@ public abstract class AbstractRangerHubClient<T, R extends ServiceRegistry<T>, D
private int nodeRefreshTimeMs;
private ServiceFinderHub<T, R> hub;
private ServiceDataSource serviceDataSource;
private long serviceRefreshDurationMs;
private long hubRefreshDurationMs;

@Override
public void start() {
Preconditions.checkNotNull(mapper, "Mapper can't be null");
Preconditions.checkNotNull(namespace, "namespace can't be null");
Preconditions.checkNotNull(deserializer, "deserializer can't be null");

if (this.nodeRefreshTimeMs < RangerClientConstants.MINIMUM_REFRESH_TIME) {
if (this.nodeRefreshTimeMs < HubConstants.MINIMUM_REFRESH_TIME_MS) {
log.warn("Node info update interval too low: {} ms. Has been upgraded to {} ms ",
this.nodeRefreshTimeMs,
RangerClientConstants.MINIMUM_REFRESH_TIME);
HubConstants.MINIMUM_REFRESH_TIME_MS);
}
this.nodeRefreshTimeMs = Math.max(HubConstants.MINIMUM_REFRESH_TIME_MS, this.nodeRefreshTimeMs);

if (this.serviceRefreshDurationMs <= 0) {
log.warn("Service Refresh interval too low: {} ms. Has been upgraded to {} ms ",
this.serviceRefreshDurationMs,
HubConstants.SERVICE_REFRESH_DURATION_MS);
this.serviceRefreshDurationMs = HubConstants.SERVICE_REFRESH_DURATION_MS;
}
this.nodeRefreshTimeMs = Math.max(RangerClientConstants.MINIMUM_REFRESH_TIME, this.nodeRefreshTimeMs);

if (this.hubRefreshDurationMs <= 0) {
log.warn("Hub Refresh interval too low: {} ms. Has been upgraded to {} ms ",
this.hubRefreshDurationMs,
HubConstants.HUB_REFRESH_DURATION_MS);
this.hubRefreshDurationMs = HubConstants.HUB_REFRESH_DURATION_MS;
}

if(null == this.serviceDataSource){
this.serviceDataSource = getDefaultDataSource();
}
Expand Down
2 changes: 1 addition & 1 deletion ranger-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>ranger</artifactId>
<groupId>io.appform.ranger</groupId>
<version>1.1-RC1</version>
<version>1.1-RC2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
package io.appform.ranger.core.finderhub;

import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import io.appform.ranger.core.finder.ServiceFinder;
import io.appform.ranger.core.model.HubConstants;
import io.appform.ranger.core.model.Service;
import io.appform.ranger.core.model.ServiceRegistry;
import io.appform.ranger.core.signals.ExternalTriggeredSignal;
Expand Down Expand Up @@ -66,11 +69,26 @@ public class ServiceFinderHub<T, R extends ServiceRegistry<T>> {
private final AtomicBoolean alreadyUpdating = new AtomicBoolean(false);
private Future<?> monitorFuture = null;

private final long serviceRefreshDurationMs;
private final long hubRefreshDurationMs;

public ServiceFinderHub(
ServiceDataSource serviceDataSource,
ServiceFinderFactory<T, R> finderFactory
) {
this(serviceDataSource, finderFactory,
HubConstants.SERVICE_REFRESH_DURATION_MS, HubConstants.HUB_REFRESH_DURATION_MS);
}

public ServiceFinderHub(
ServiceDataSource serviceDataSource,
ServiceFinderFactory<T, R> finderFactory) {
ServiceFinderFactory<T, R> finderFactory,
long serviceRefreshDurationMs,
long hubRefreshDurationMs) {
this.serviceDataSource = serviceDataSource;
this.finderFactory = finderFactory;
this.serviceRefreshDurationMs = serviceRefreshDurationMs;
this.hubRefreshDurationMs = hubRefreshDurationMs;
this.refreshSignals.add(new ScheduledSignal<>("service-hub-updater",
() -> null,
Collections.emptyList(),
Expand Down Expand Up @@ -190,13 +208,31 @@ private void updateRegistry() {
}

private void waitTillHubIsReady() {
serviceDataSource.services().forEach(this::waitTillServiceIsReady);
val hubRefresher = CompletableFuture.allOf(
serviceDataSource.services()
.stream()
.map(service -> CompletableFuture.supplyAsync((Supplier<Void>) () -> {
waitTillServiceIsReady(service);
return null;
})).toArray(CompletableFuture[]::new)
);
try {
hubRefresher.get(hubRefreshDurationMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
Exceptions
.illegalState("Couldn't perform hub refresh at this time. Refresh exceeded the start up time specified");
} catch (Exception e) {
Exceptions
.illegalState("Couldn't perform hub refresh at this time", e);
}
}

private void waitTillServiceIsReady(Service service) {
try {
RetryerBuilder.<Boolean>newBuilder()
.retryIfResult(r -> !r)
.withStopStrategy(StopStrategies.stopAfterDelay(serviceRefreshDurationMs, TimeUnit.MILLISECONDS))
.build()
.call(() -> Optional.ofNullable(getFinders().get().get(service))
.map(ServiceFinder::getServiceRegistry)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.appform.ranger.core.finderhub;

import com.google.common.base.Preconditions;
import io.appform.ranger.core.model.HubConstants;
import io.appform.ranger.core.model.ServiceRegistry;
import io.appform.ranger.core.signals.ScheduledSignal;
import io.appform.ranger.core.signals.Signal;
Expand All @@ -33,10 +34,12 @@
public abstract class ServiceFinderHubBuilder<T, R extends ServiceRegistry<T>> {
private ServiceDataSource serviceDataSource;
private ServiceFinderFactory<T, R> serviceFinderFactory;
private long refreshFrequencyMs = 10_000;
private long refreshFrequencyMs = HubConstants.REFRESH_FREQUENCY_MS;
private final List<Consumer<Void>> extraStartSignalConsumers = new ArrayList<>();
private final List<Consumer<Void>> extraStopSignalConsumers = new ArrayList<>();
private final List<Signal<Void>> extraRefreshSignals = new ArrayList<>();
private long serviceRefreshDurationMs = HubConstants.SERVICE_REFRESH_DURATION_MS;
private long hubRefreshDurationMs = HubConstants.HUB_REFRESH_DURATION_MS;

public ServiceFinderHubBuilder<T, R> withServiceDataSource(ServiceDataSource serviceDataSource) {
this.serviceDataSource = serviceDataSource;
Expand Down Expand Up @@ -68,12 +71,23 @@ public ServiceFinderHubBuilder<T, R> withExtraRefreshSignal(Signal<Void> extraRe
return this;
}

public ServiceFinderHubBuilder<T, R> withServiceRefreshDuration(long serviceRefreshDurationMs) {
this.serviceRefreshDurationMs = serviceRefreshDurationMs;
return this;
}

public ServiceFinderHubBuilder<T, R> withHubRefreshDuration(long hubRefreshDurationMs) {
this.hubRefreshDurationMs = hubRefreshDurationMs;
return this;
}

public ServiceFinderHub<T, R> build() {
preBuild();
Preconditions.checkNotNull(serviceDataSource, "Provide a non-null service data source");
Preconditions.checkNotNull(serviceFinderFactory, "Provide a non-null service finder factory");

val hub = new ServiceFinderHub<>(serviceDataSource, serviceFinderFactory);
val hub = new ServiceFinderHub<>(serviceDataSource, serviceFinderFactory,
serviceRefreshDurationMs, hubRefreshDurationMs);
final ScheduledSignal<Void> refreshSignal = new ScheduledSignal<>("service-hub-refresh-timer",
() -> null,
Collections.emptyList(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
/*
* Copyright 2015 Flipkart Internet Pvt. Ltd.
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.appform.ranger.client;
package io.appform.ranger.core.model;

import lombok.experimental.UtilityClass;

@UtilityClass
public class RangerClientConstants {
public static final int CONNECTION_RETRY_TIME = 5000;
public static final int MINIMUM_REFRESH_TIME = 5000;
public class HubConstants {
public static long SERVICE_REFRESH_DURATION_MS = 10_000;
public static long HUB_REFRESH_DURATION_MS = 30_000;
public static long REFRESH_FREQUENCY_MS = 10_000;
public static final int CONNECTION_RETRY_TIME_MS = 5_000;
public static final int MINIMUM_REFRESH_TIME_MS = 5_000;
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import io.appform.ranger.core.model.*;
import io.appform.ranger.core.units.TestNodeData;
import java.util.Optional;

import io.appform.ranger.core.utils.RangerTestUtils;
import lombok.val;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -41,30 +43,39 @@ void testDynamicServiceAddition() {
Assertions.assertEquals("HOST", node.get().getHost());
Assertions.assertEquals(0, node.get().getPort());

serviceFinderHub.buildFinder(new Service("NS", "SERVICE")).join();
val dynamicServiceFinder = serviceFinderHub.finder(new Service("NS", "SERVICE"))
.orElseThrow(() -> new IllegalStateException("Finder should be present"));
val dynamicServiceFinder = serviceFinderHub.buildFinder(new Service("NS", "SERVICE")).join();
val dynamicServiceNode = dynamicServiceFinder.get(null, (criteria, serviceRegistry) -> serviceRegistry.nodeList());
Assertions.assertTrue(dynamicServiceNode.isPresent());
Assertions.assertEquals("HOST", dynamicServiceNode.get().getHost());
Assertions.assertEquals(0, dynamicServiceNode.get().getPort());
}

@Test
void testDynamicServiceAdditionAsync() throws InterruptedException {
void testDelayedServiceAddition() {
val delayedHub = new ServiceFinderHub<>(new DynamicDataSource(Lists.newArrayList(new Service("NS", "SERVICE"))),
service -> new TestServiceFinderBuilder()
.withNamespace(service.getNamespace())
.withServiceName(service.getServiceName())
.withDeserializer(new Deserializer<TestNodeData>() {})
.withSleepDuration(5)
.build(), 1_000, 5_000
);
Assertions.assertThrows(IllegalStateException.class, delayedHub::start);
val serviceFinderHub = new ServiceFinderHub<>(new DynamicDataSource(Lists.newArrayList(new Service("NS", "SERVICE"))),
service -> new TestServiceFinderBuilder()
.withNamespace(service.getNamespace())
.withServiceName(service.getServiceName())
.withDeserializer(new Deserializer<TestNodeData>() {})
.withSleepDuration(1)
.build(), 2_000, 5_000
);
serviceFinderHub.start();
serviceFinderHub.buildFinder(new Service("NS", "SERVICE_NAME"));
val finderOpt = serviceFinderHub.finder(new Service("NS", "SERVICE_NAME"));
Assertions.assertFalse(finderOpt.isPresent(), "Finders will not be availbale immediately");
Thread.sleep(1000);
val finderAfterWaitOpt = serviceFinderHub.finder(new Service("NS", "SERVICE_NAME"));
Assertions.assertTrue(finderAfterWaitOpt.isPresent(), "Finders should be availble after some time");
Assertions.assertTrue(serviceFinderHub.finder(new Service("NS", "SERVICE")).isPresent());
}


@Test
void testDynamicServiceAdditionWithNonDynamicDataSource() {

val serviceFinderHub = new ServiceFinderHub<>(new StaticDataSource(new HashSet<>()), service -> new TestServiceFinderBuilder()
.withNamespace(service.getNamespace())
.withServiceName(service.getServiceName())
Expand All @@ -73,8 +84,7 @@ void testDynamicServiceAdditionWithNonDynamicDataSource() {
.build());
serviceFinderHub.start();
try {
val future = serviceFinderHub.buildFinder(new Service("NS", "SERVICE_NAME"));
future.join();
serviceFinderHub.buildFinder(new Service("NS", "SERVICE_NAME")).join();
Assertions.fail("Exception should have been thrown");
} catch (Exception exception) {
Assertions.assertTrue(exception instanceof UnsupportedOperationException, "Unsupported exception should be thrown");
Expand All @@ -83,6 +93,8 @@ void testDynamicServiceAdditionWithNonDynamicDataSource() {

private static class TestServiceFinderBuilder extends BaseServiceFinderBuilder<TestNodeData, MapBasedServiceRegistry<TestNodeData>, ServiceFinder<TestNodeData, MapBasedServiceRegistry<TestNodeData>>, TestServiceFinderBuilder, Deserializer<TestNodeData>> {

private int finderSleepDurationSeconds = 0;

@Override
public ServiceFinder<TestNodeData, MapBasedServiceRegistry<TestNodeData>> build() {
val bf = buildFinder();
Expand All @@ -97,11 +109,16 @@ protected NodeDataSource<TestNodeData, Deserializer<TestNodeData>> dataSource(Se

@Override
protected ServiceFinder<TestNodeData, MapBasedServiceRegistry<TestNodeData>> buildFinder(Service service, ShardSelector<TestNodeData, MapBasedServiceRegistry<TestNodeData>> shardSelector, ServiceNodeSelector<TestNodeData> nodeSelector) {
RangerTestUtils.sleepUntil(finderSleepDurationSeconds);
if (null == shardSelector) {
shardSelector = new MatchingShardSelector<>();
}
return new SimpleShardedServiceFinder<>(new MapBasedServiceRegistry<>(service), shardSelector, nodeSelector);
}

public TestServiceFinderBuilder withSleepDuration(final int finderSleepDurationSeconds) {
this.finderSleepDurationSeconds = finderSleepDurationSeconds;
return this;
}

private static class TestNodeDataSource implements NodeDataSource<TestNodeData, Deserializer<TestNodeData>> {
Expand Down
2 changes: 1 addition & 1 deletion ranger-http-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>ranger</artifactId>
<groupId>io.appform.ranger</groupId>
<version>1.1-RC1</version>
<version>1.1-RC2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ protected ServiceFinderHub<T, R> buildHub() {
.withServiceDataSource(getServiceDataSource())
.withServiceFinderFactory(getFinderFactory())
.withRefreshFrequencyMs(getNodeRefreshTimeMs())
.withHubRefreshDuration(getHubRefreshDurationMs())
.withServiceRefreshDuration(getServiceRefreshDurationMs())
.build();
}
}
2 changes: 1 addition & 1 deletion ranger-http-model/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>ranger</artifactId>
<groupId>io.appform.ranger</groupId>
<version>1.1-RC1</version>
<version>1.1-RC2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion ranger-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>ranger</artifactId>
<groupId>io.appform.ranger</groupId>
<version>1.1-RC1</version>
<version>1.1-RC2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion ranger-hub-server-bundle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>ranger</artifactId>
<groupId>io.appform.ranger</groupId>
<version>1.1-RC1</version>
<version>1.1-RC2</version>
</parent>
<build>
<plugins>
Expand Down
Loading

0 comments on commit 493f0b4

Please sign in to comment.