Skip to content

Commit

Permalink
a) Making ServiceFinderHub Start and ServiceFinderAddition bounded. P…
Browse files Browse the repository at this point in the history
…ertaining to #22

b) Introduced hubStartDurationMs and finderRefreshDurationMs for defining the boundary. If this boundary were to breach, throw with an IllegalStateException
c) Added tests, upgraded versions and added CHANGELOG
  • Loading branch information
koushikr committed Jul 9, 2024
1 parent 936c214 commit 06ffbee
Show file tree
Hide file tree
Showing 18 changed files with 113 additions and 44 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# Changelog
All notable changes to this project will be documented in this file.

## [1.1-RC2]

- Pertaining to PR : https://github.com/appform-io/ranger/pull/22/, building of a ServiceFinderHub and a ServiceFinder are bounded.

## [1.0-RC18]
- Version bump to release lexicographically higher version than 1.0-dw3-RC17

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<groupId>io.appform.ranger</groupId>
<artifactId>ranger</artifactId>
<packaging>pom</packaging>
<version>1.1-RC1</version>
<version>1.1-RC2</version>
<name>Ranger</name>
<url>https://github.com/appform-io/ranger</url>
<description>Service Discovery for Java</description>
Expand Down
2 changes: 1 addition & 1 deletion ranger-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>ranger</artifactId>
<groupId>io.appform.ranger</groupId>
<version>1.1-RC1</version>
<version>1.1-RC2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import io.appform.ranger.client.utils.CriteriaUtils;
import io.appform.ranger.core.finder.ServiceFinder;
import io.appform.ranger.core.finderhub.ServiceDataSource;
import io.appform.ranger.core.finderhub.ServiceFinderFactory;
import io.appform.ranger.core.finderhub.ServiceFinderHub;
Expand All @@ -31,7 +32,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import lombok.Getter;
import lombok.experimental.SuperBuilder;
Expand Down Expand Up @@ -160,7 +160,7 @@ public Collection<Service> getRegisteredServices() {
* @return CompletableFuture which waits for hub to be ready for discovering the new service
*/
@Override
public CompletableFuture<?> addService(Service service) {
public ServiceFinder<T, R> addService(Service service) {
if(hub == null) {
throw new IllegalStateException("Hub not started yet. Call .start()");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.appform.ranger.client;

import io.appform.ranger.core.finder.ServiceFinder;
import io.appform.ranger.core.model.Service;
import io.appform.ranger.core.model.ServiceNode;
import io.appform.ranger.core.model.ServiceNodeSelector;
Expand All @@ -33,7 +34,7 @@ public interface RangerHubClient<T, R extends ServiceRegistry<T>> {

Collection<Service> getRegisteredServices();

CompletableFuture addService(Service service);
ServiceFinder<T, R> addService(Service service);

Optional<ServiceNode<T>> getNode(final Service service);

Expand Down
2 changes: 1 addition & 1 deletion ranger-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>ranger</artifactId>
<groupId>io.appform.ranger</groupId>
<version>1.1-RC1</version>
<version>1.1-RC2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
package io.appform.ranger.core.finderhub;

import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import io.appform.ranger.core.finder.ServiceFinder;
import io.appform.ranger.core.model.Service;
import io.appform.ranger.core.model.ServiceRegistry;
Expand Down Expand Up @@ -66,11 +68,26 @@ public class ServiceFinderHub<T, R extends ServiceRegistry<T>> {
private final AtomicBoolean alreadyUpdating = new AtomicBoolean(false);
private Future<?> monitorFuture = null;

private final long serviceRefreshDurationMs;
private final long hubRefreshDurationMs;

public ServiceFinderHub(
ServiceDataSource serviceDataSource,
ServiceFinderFactory<T, R> finderFactory) {
ServiceFinderFactory<T, R> finderFactory
) {
this(serviceDataSource, finderFactory,
10_000, 30_000);
}

public ServiceFinderHub(
ServiceDataSource serviceDataSource,
ServiceFinderFactory<T, R> finderFactory,
long serviceRefreshDurationMs,
long hubRefreshDurationMs) {
this.serviceDataSource = serviceDataSource;
this.finderFactory = finderFactory;
this.serviceRefreshDurationMs = serviceRefreshDurationMs;
this.hubRefreshDurationMs = hubRefreshDurationMs;
this.refreshSignals.add(new ScheduledSignal<>("service-hub-updater",
() -> null,
Collections.emptyList(),
Expand All @@ -81,22 +98,20 @@ public Optional<ServiceFinder<T, R>> finder(final Service service) {
return Optional.ofNullable(finders.get().get(service));
}

public CompletableFuture<ServiceFinder<T, R>> buildFinder(final Service service) {
public ServiceFinder<T, R> buildFinder(final Service service) {
val finder = finders.get().get(service);
if (finder != null) {
return CompletableFuture.completedFuture(finder);
if (null != finder) {
return finder;
}
serviceDataSource.add(service);
return CompletableFuture.supplyAsync(() -> {
try {
updateAvailable();
waitTillServiceIsReady(service);
return finders.get().get(service);
} catch(Exception e) {
log.warn("Exception whiling building finder", e);
throw e;
}
});
try {
updateAvailable();
waitTillServiceIsReady(service);
return finders.get().get(service);
} catch(Exception e) {
log.warn("Exception whiling building finder", e);
throw e;
}
}

public void start() {
Expand Down Expand Up @@ -190,13 +205,31 @@ private void updateRegistry() {
}

private void waitTillHubIsReady() {
serviceDataSource.services().forEach(this::waitTillServiceIsReady);
val hubRefresher = CompletableFuture.allOf(
serviceDataSource.services()
.stream()
.map(service -> CompletableFuture.supplyAsync((Supplier<Void>) () -> {
waitTillServiceIsReady(service);
return null;
})).toArray(CompletableFuture[]::new)
);
try {
hubRefresher.get(hubRefreshDurationMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
Exceptions
.illegalState("Couldn't perform hub refresh at this time. Refresh exceeded the start up time specified");
} catch (Exception e) {
Exceptions
.illegalState("Couldn't perform hub refresh at this time", e);
}
}

private void waitTillServiceIsReady(Service service) {
try {
RetryerBuilder.<Boolean>newBuilder()
.retryIfResult(r -> !r)
.withStopStrategy(StopStrategies.stopAfterDelay(serviceRefreshDurationMs, TimeUnit.MILLISECONDS))
.build()
.call(() -> Optional.ofNullable(getFinders().get().get(service))
.map(ServiceFinder::getServiceRegistry)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public abstract class ServiceFinderHubBuilder<T, R extends ServiceRegistry<T>> {
private final List<Consumer<Void>> extraStartSignalConsumers = new ArrayList<>();
private final List<Consumer<Void>> extraStopSignalConsumers = new ArrayList<>();
private final List<Signal<Void>> extraRefreshSignals = new ArrayList<>();
private long serviceRefreshDurationMs = 10_000;
private long hubRefreshDurationMs = 30_000;

public ServiceFinderHubBuilder<T, R> withServiceDataSource(ServiceDataSource serviceDataSource) {
this.serviceDataSource = serviceDataSource;
Expand Down Expand Up @@ -68,12 +70,23 @@ public ServiceFinderHubBuilder<T, R> withExtraRefreshSignal(Signal<Void> extraRe
return this;
}

public ServiceFinderHubBuilder<T, R> withServiceRefreshDuration(long serviceRefreshDurationMs) {
this.serviceRefreshDurationMs = serviceRefreshDurationMs;
return this;
}

public ServiceFinderHubBuilder<T, R> withHubRefreshDuration(long hubRefreshDurationMs) {
this.hubRefreshDurationMs = hubRefreshDurationMs;
return this;
}

public ServiceFinderHub<T, R> build() {
preBuild();
Preconditions.checkNotNull(serviceDataSource, "Provide a non-null service data source");
Preconditions.checkNotNull(serviceFinderFactory, "Provide a non-null service finder factory");

val hub = new ServiceFinderHub<>(serviceDataSource, serviceFinderFactory);
val hub = new ServiceFinderHub<>(serviceDataSource, serviceFinderFactory,
serviceRefreshDurationMs, hubRefreshDurationMs);
final ScheduledSignal<Void> refreshSignal = new ScheduledSignal<>("service-hub-refresh-timer",
() -> null,
Collections.emptyList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@
import io.appform.ranger.core.model.*;
import io.appform.ranger.core.units.TestNodeData;
import java.util.Optional;

import io.appform.ranger.core.utils.RangerTestUtils;
import lombok.val;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;

import java.util.ArrayList;
import java.util.HashSet;
Expand Down Expand Up @@ -41,30 +44,39 @@ void testDynamicServiceAddition() {
Assertions.assertEquals("HOST", node.get().getHost());
Assertions.assertEquals(0, node.get().getPort());

serviceFinderHub.buildFinder(new Service("NS", "SERVICE")).join();
val dynamicServiceFinder = serviceFinderHub.finder(new Service("NS", "SERVICE"))
.orElseThrow(() -> new IllegalStateException("Finder should be present"));
val dynamicServiceFinder = serviceFinderHub.buildFinder(new Service("NS", "SERVICE"));
val dynamicServiceNode = dynamicServiceFinder.get(null, (criteria, serviceRegistry) -> serviceRegistry.nodeList());
Assertions.assertTrue(dynamicServiceNode.isPresent());
Assertions.assertEquals("HOST", dynamicServiceNode.get().getHost());
Assertions.assertEquals(0, dynamicServiceNode.get().getPort());
}

@Test
void testDynamicServiceAdditionAsync() throws InterruptedException {
void testDelayedServiceAddition() {
val delayedHub = new ServiceFinderHub<>(new DynamicDataSource(Lists.newArrayList(new Service("NS", "SERVICE"))),
service -> new TestServiceFinderBuilder()
.withNamespace(service.getNamespace())
.withServiceName(service.getServiceName())
.withDeserializer(new Deserializer<TestNodeData>() {})
.withSleepDuration(5)
.build(), 1_000, 5_000
);
Assertions.assertThrows(IllegalStateException.class, delayedHub::start);
val serviceFinderHub = new ServiceFinderHub<>(new DynamicDataSource(Lists.newArrayList(new Service("NS", "SERVICE"))),
service -> new TestServiceFinderBuilder()
.withNamespace(service.getNamespace())
.withServiceName(service.getServiceName())
.withDeserializer(new Deserializer<TestNodeData>() {})
.withSleepDuration(1)
.build(), 2_000, 5_000
);
serviceFinderHub.start();
serviceFinderHub.buildFinder(new Service("NS", "SERVICE_NAME"));
val finderOpt = serviceFinderHub.finder(new Service("NS", "SERVICE_NAME"));
Assertions.assertFalse(finderOpt.isPresent(), "Finders will not be availbale immediately");
Thread.sleep(1000);
val finderAfterWaitOpt = serviceFinderHub.finder(new Service("NS", "SERVICE_NAME"));
Assertions.assertTrue(finderAfterWaitOpt.isPresent(), "Finders should be availble after some time");
Assertions.assertTrue(serviceFinderHub.finder(new Service("NS", "SERVICE")).isPresent());
}


@Test
void testDynamicServiceAdditionWithNonDynamicDataSource() {

val serviceFinderHub = new ServiceFinderHub<>(new StaticDataSource(new HashSet<>()), service -> new TestServiceFinderBuilder()
.withNamespace(service.getNamespace())
.withServiceName(service.getServiceName())
Expand All @@ -73,8 +85,7 @@ void testDynamicServiceAdditionWithNonDynamicDataSource() {
.build());
serviceFinderHub.start();
try {
val future = serviceFinderHub.buildFinder(new Service("NS", "SERVICE_NAME"));
future.join();
serviceFinderHub.buildFinder(new Service("NS", "SERVICE_NAME"));
Assertions.fail("Exception should have been thrown");
} catch (Exception exception) {
Assertions.assertTrue(exception instanceof UnsupportedOperationException, "Unsupported exception should be thrown");
Expand All @@ -83,6 +94,8 @@ void testDynamicServiceAdditionWithNonDynamicDataSource() {

private static class TestServiceFinderBuilder extends BaseServiceFinderBuilder<TestNodeData, MapBasedServiceRegistry<TestNodeData>, ServiceFinder<TestNodeData, MapBasedServiceRegistry<TestNodeData>>, TestServiceFinderBuilder, Deserializer<TestNodeData>> {

private int finderSleepDurationSeconds = 0;

@Override
public ServiceFinder<TestNodeData, MapBasedServiceRegistry<TestNodeData>> build() {
val bf = buildFinder();
Expand All @@ -97,11 +110,16 @@ protected NodeDataSource<TestNodeData, Deserializer<TestNodeData>> dataSource(Se

@Override
protected ServiceFinder<TestNodeData, MapBasedServiceRegistry<TestNodeData>> buildFinder(Service service, ShardSelector<TestNodeData, MapBasedServiceRegistry<TestNodeData>> shardSelector, ServiceNodeSelector<TestNodeData> nodeSelector) {
RangerTestUtils.sleepUntil(finderSleepDurationSeconds);
if (null == shardSelector) {
shardSelector = new MatchingShardSelector<>();
}
return new SimpleShardedServiceFinder<>(new MapBasedServiceRegistry<>(service), shardSelector, nodeSelector);
}

public TestServiceFinderBuilder withSleepDuration(final int finderSleepDurationSeconds) {
this.finderSleepDurationSeconds = finderSleepDurationSeconds;
return this;
}

private static class TestNodeDataSource implements NodeDataSource<TestNodeData, Deserializer<TestNodeData>> {
Expand Down
2 changes: 1 addition & 1 deletion ranger-http-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>ranger</artifactId>
<groupId>io.appform.ranger</groupId>
<version>1.1-RC1</version>
<version>1.1-RC2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion ranger-http-model/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>ranger</artifactId>
<groupId>io.appform.ranger</groupId>
<version>1.1-RC1</version>
<version>1.1-RC2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion ranger-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>ranger</artifactId>
<groupId>io.appform.ranger</groupId>
<version>1.1-RC1</version>
<version>1.1-RC2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion ranger-hub-server-bundle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>ranger</artifactId>
<groupId>io.appform.ranger</groupId>
<version>1.0-RC18</version>
<version>1.1-RC2</version>
</parent>
<build>
<plugins>
Expand Down
2 changes: 1 addition & 1 deletion ranger-server-bundle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>ranger</artifactId>
<groupId>io.appform.ranger</groupId>
<version>1.1-RC1</version>
<version>1.1-RC2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion ranger-server-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>ranger</artifactId>
<groupId>io.appform.ranger</groupId>
<version>1.1-RC1</version>
<version>1.1-RC2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion ranger-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>io.appform.ranger</groupId>
<artifactId>ranger</artifactId>
<version>1.0-RC18</version>
<version>1.1-RC2</version>
</parent>

<artifactId>ranger-server</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion ranger-zk-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>ranger</artifactId>
<groupId>io.appform.ranger</groupId>
<version>1.1-RC1</version>
<version>1.1-RC2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion ranger-zookeeper/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>ranger</artifactId>
<groupId>io.appform.ranger</groupId>
<version>1.1-RC1</version>
<version>1.1-RC2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down

0 comments on commit 06ffbee

Please sign in to comment.