Skip to content

Commit

Permalink
Merge pull request #46 from appform-io/failure_grace
Browse files Browse the repository at this point in the history
    Stability fixes and optimizations
  • Loading branch information
koushikr authored Oct 23, 2024
2 parents 8caea77 + 2761ccb commit bc5b586
Show file tree
Hide file tree
Showing 44 changed files with 573 additions and 272 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<guava.version>33.2.0-jre</guava.version>
<guava.version>33.3.0-jre</guava.version>
<curator.version>5.5.0</curator.version>
<slf4j.version>1.7.32</slf4j.version>

Expand All @@ -116,7 +116,7 @@
<wiremock.version>3.3.1</wiremock.version>
<mockito.version>4.2.0</mockito.version>

<dropwizard.version>2.1.10</dropwizard.version>
<dropwizard.version>2.1.12</dropwizard.version>
<logback.version>1.2.12</logback.version>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import io.appform.ranger.core.model.Service;
import io.appform.ranger.core.model.ServiceNode;
import io.appform.ranger.core.units.TestNodeData;
import java.util.Optional;
import lombok.Builder;

import java.util.Collections;
import java.util.List;
import java.util.Optional;

@Builder
public class TestSimpleUnshardedServiceFinder <T>
Expand All @@ -51,6 +51,7 @@ public Optional<List<ServiceNode<TestNodeData>>> refresh(Deserializer<TestNodeDa
.host("localhost")
.port(9200)
.nodeData(TestNodeData.builder().shardId(1).build())
.lastUpdatedTimeStamp(Long.MAX_VALUE)
.build())
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2024 Authors, Flipkart Internet Pvt. Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.appform.ranger.core.exceptions;

/**
* Base for communication exception
*/
public abstract class CommunicationException extends RuntimeException {
protected CommunicationException(final String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
package io.appform.ranger.core.finder.serviceregistry;

import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import io.appform.ranger.core.healthcheck.HealthcheckStatus;
import io.appform.ranger.core.model.Deserializer;
import io.appform.ranger.core.model.NodeDataSource;
import io.appform.ranger.core.model.ServiceRegistry;
import io.appform.ranger.core.signals.Signal;
import io.appform.ranger.core.util.Exceptions;
import io.appform.ranger.core.util.FinderUtils;
import lombok.extern.slf4j.Slf4j;
import lombok.val;

Expand All @@ -39,7 +42,7 @@
public class ServiceRegistryUpdater<T, D extends Deserializer<T>> {

private final ServiceRegistry<T> serviceRegistry;
private final NodeDataSource<T,D> nodeDataSource;
private final NodeDataSource<T, D> nodeDataSource;
private final D deserializer;

private final Lock checkLock = new ReentrantLock();
Expand All @@ -51,7 +54,7 @@ public class ServiceRegistryUpdater<T, D extends Deserializer<T>> {

public ServiceRegistryUpdater(
ServiceRegistry<T> serviceRegistry,
NodeDataSource<T,D> nodeDataSource,
NodeDataSource<T, D> nodeDataSource,
List<Signal<T>> signalGenerators,
D deserializer) {
this.serviceRegistry = serviceRegistry;
Expand All @@ -70,6 +73,8 @@ public void start() {
try {
RetryerBuilder.<Boolean>newBuilder()
.retryIfResult(r -> null == r || !r)
.retryIfException()
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))
.build()
.call(serviceRegistry::isRefreshed);
}
Expand All @@ -81,7 +86,7 @@ public void start() {
}

public void stop() {
if(null != queryThreadFuture) {
if (null != queryThreadFuture) {
executorService.shutdownNow();
}
}
Expand Down Expand Up @@ -125,21 +130,42 @@ private Void queryExecutor() {

private void updateRegistry() throws InterruptedException {
log.debug("Checking for updates on data source for service: {}",
serviceRegistry.getService().getServiceName());
if(!nodeDataSource.isActive()) {
log.warn("Node data source seems to be down. Keeping old list for {}",
serviceRegistry.getService().getServiceName());
return;
}
val nodeList = nodeDataSource.refresh(deserializer).orElse(null);
if (null != nodeList) {
log.debug("Updating nodeList of size: {} for [{}]", nodeList.size(),
serviceRegistry.getService().getServiceName());
serviceRegistry.updateNodes(nodeList);
serviceRegistry.getService().getServiceName());
var callFailed = false;
if (nodeDataSource.isActive()) { //Source should implement circuit breaker to fail fast and reopen after some
// time
try {
val nodeList = nodeDataSource.refresh(deserializer).orElse(null);
if (null != nodeList) {
log.debug("Updating nodeList of size: {} for [{}]", nodeList.size(),
serviceRegistry.getService().getServiceName());
val livenessCheckMaxAge = nodeDataSource.healthcheckZombieCheckThresholdTime(serviceRegistry.getService());
//Remove all stale nodes before updating. This is done centrally to ensure some data sources
//don't skip this check. Some control is still provided so that they can overload.
serviceRegistry.updateNodes(FinderUtils.filterValidNodes(serviceRegistry.getService(), nodeList, livenessCheckMaxAge));
}
else {
log.warn("Empty list returned from node data source. We are in a weird state. Keeping old list for {}",
serviceRegistry.getService().getServiceName());
}
}
catch (Exception e) {
log.error("Error updating data from registry. Error: [{}] {}",
e.getClass().getSimpleName(),
e.getMessage());
callFailed = true;
}
}
else {
log.warn("Empty list returned from node data source. We are in a weird state. Keeping old list for {}",
if (!nodeDataSource.isActive() || callFailed) {
val currTime = System.currentTimeMillis();
log.warn("Node data source seems to be down. Keeping old list for {}." +
" Will update timestamp to keep stale date relevant.",
serviceRegistry.getService().getServiceName());
serviceRegistry.updateNodes(serviceRegistry.nodeList()
.stream()
.filter(node -> HealthcheckStatus.healthy == node.getHealthcheckStatus())
.map(node -> node.setLastUpdatedTimeStamp(currTime))
.toList());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import io.appform.ranger.core.finder.ServiceFinder;
import io.appform.ranger.core.model.HubConstants;
import io.appform.ranger.core.model.Service;
Expand All @@ -34,11 +33,13 @@
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -68,11 +69,14 @@ public class ServiceFinderHub<T, R extends ServiceRegistry<T>> {
private final ServiceFinderFactory<T, R> finderFactory;

private final AtomicBoolean alreadyUpdating = new AtomicBoolean(false);
private final AtomicInteger poolThreadIndex = new AtomicInteger(0);
private Future<?> monitorFuture = null;

private final long serviceRefreshDurationMs;
private final long hubRefreshDurationMs;

private final ForkJoinPool refresherPool;

public ServiceFinderHub(
ServiceDataSource serviceDataSource,
ServiceFinderFactory<T, R> finderFactory
Expand All @@ -88,12 +92,13 @@ public ServiceFinderHub(
long hubRefreshDurationMs) {
this.serviceDataSource = serviceDataSource;
this.finderFactory = finderFactory;
this.serviceRefreshDurationMs = serviceRefreshDurationMs;
this.hubRefreshDurationMs = hubRefreshDurationMs;
this.serviceRefreshDurationMs = serviceRefreshDurationMs == 0 ? HubConstants.SERVICE_REFRESH_DURATION_MS : serviceRefreshDurationMs;
this.hubRefreshDurationMs = hubRefreshDurationMs == 0 ? HubConstants.HUB_REFRESH_DURATION_MS : hubRefreshDurationMs;
this.refreshSignals.add(new ScheduledSignal<>("service-hub-updater",
() -> null,
Collections.emptyList(),
10_000));
this.refresherPool = createRefresherPool();
}

public Optional<ServiceFinder<T, R>> finder(final Service service) {
Expand Down Expand Up @@ -158,6 +163,18 @@ public void updateAvailable() {
}
}

private ForkJoinPool createRefresherPool() {
return new ForkJoinPool(
Math.max(20, Runtime.getRuntime().availableProcessors()),
pool -> {
val thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
thread.setName("hub-refresher-" + poolThreadIndex.getAndIncrement());
return thread;
},
null,
false);
}

private void monitor() {
while (true) {
try {
Expand Down Expand Up @@ -185,17 +202,18 @@ private void updateRegistry() {
return;
}
alreadyUpdating.set(true);
final Map<Service, ServiceFinder<T, R>> updatedFinders = new HashMap<>();
val updatedFinders = new ConcurrentHashMap<Service, ServiceFinder<T, R>>();
try {
val services = serviceDataSource.services();
if (services.isEmpty()) {
log.debug("No services found for the service data source. Skipping update on the registry");
return;
}
val knownServiceFinders = finders.get();
val newFinders = services.stream()
.filter(service -> !knownServiceFinders.containsKey(service))
.collect(Collectors.toMap(Function.identity(), finderFactory::buildFinder));
val newFinders = refresherPool.submit(() -> services.parallelStream()
.filter(service -> !knownServiceFinders.containsKey(service))
.collect(Collectors.toMap(Function.identity(), finderFactory::buildFinder)))
.get();
val matchingServices = knownServiceFinders.entrySet()
.stream()
.filter(entry -> services.contains(entry.getKey()))
Expand All @@ -208,6 +226,10 @@ private void updateRegistry() {
updatedFinders.putAll(matchingServices);
finders.set(updatedFinders);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Refresh interrupted.");
}
catch (Exception e) {
log.error("Error updating service list. Will maintain older list", e);
}
Expand All @@ -217,29 +239,33 @@ private void updateRegistry() {
}

private void waitTillHubIsReady() {
val services = serviceDataSource.services();
val timeToRefresh = Math.max(hubRefreshDurationMs,
(serviceRefreshDurationMs * services.size()) / refresherPool.getParallelism());
if (timeToRefresh != hubRefreshDurationMs) {
log.warn("Max hub refresh time has been dynamically adjusted to {} ms from the provided {} ms as the " +
"provided time would have been insufficient to refresh {} services.",
timeToRefresh, hubRefreshDurationMs, services.size());
}
val hubRefresher = CompletableFuture.allOf(
serviceDataSource.services()
.stream()
services.stream()
.map(service -> CompletableFuture.supplyAsync((Supplier<Void>) () -> {
waitTillServiceIsReady(service);
return null;
})).toArray(CompletableFuture[]::new)
);
})).toArray(CompletableFuture[]::new));
try {
hubRefresher.get(hubRefreshDurationMs, TimeUnit.MILLISECONDS);
hubRefresher.get(timeToRefresh, TimeUnit.MILLISECONDS);
}
catch (InterruptedException ie) {
Thread.currentThread().interrupt();
Exceptions.illegalState("Refresh interrupted");
}
catch (TimeoutException e) {
Exceptions
.illegalState("Couldn't perform service hub refresh at this time. " +
Exceptions.illegalState("Couldn't perform service hub refresh at this time. " +
"Refresh exceeded the start up time specified");
}
catch (Exception e) {
Exceptions
.illegalState("Couldn't perform hub refresh at this time", e);
Exceptions.illegalState("Couldn't perform hub refresh at this time", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.appform.ranger.core.model;

import io.appform.ranger.core.exceptions.CommunicationException;

import java.util.List;
import java.util.Optional;

Expand All @@ -24,9 +26,9 @@
@SuppressWarnings("unused")
public interface NodeDataSource<T, D extends Deserializer<T>> extends NodeDataStoreConnector<T> {

Optional<List<ServiceNode<T>>> refresh(D deserializer);
Optional<List<ServiceNode<T>>> refresh(D deserializer) throws CommunicationException;

default long healthcheckZombieCheckThresholdTime(Service service) {
return System.currentTimeMillis() - 60000; //1 Minute
return isActive() ? (System.currentTimeMillis() - 60000) : 0; //1 Minute
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class ScheduledSignal<T> extends Signal<T> {
private final String name;
private final long refreshIntervalMillis;

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

private ScheduledFuture<?> scheduledFuture = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import io.appform.ranger.core.healthcheck.HealthcheckStatus;
import io.appform.ranger.core.model.*;
import io.appform.ranger.core.units.TestNodeData;
import java.util.Optional;

import io.appform.ranger.core.utils.RangerTestUtils;
import lombok.val;
import org.junit.jupiter.api.Assertions;
Expand All @@ -36,6 +34,7 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;

class ServiceFinderHubTest {

Expand Down Expand Up @@ -83,7 +82,7 @@ void testDelayedServiceAddition() {
.withServiceName(service.getServiceName())
.withDeserializer(new Deserializer<TestNodeData>() {})
.withSleepDuration(1)
.build(), 2_000, 5_000
.build(), 5_000, 5_000
);
serviceFinderHub.start();
Assertions.assertTrue(serviceFinderHub.finder(new Service("NS", "SERVICE")).isPresent());
Expand Down Expand Up @@ -141,7 +140,7 @@ private static class TestNodeDataSource implements NodeDataSource<TestNodeData,
@Override
public Optional<List<ServiceNode<TestNodeData>>> refresh(Deserializer<TestNodeData> deserializer) {
val list = new ArrayList<ServiceNode<TestNodeData>>();
list.add(new ServiceNode<>("HOST", 0, TestNodeData.builder().shardId(1).build(), HealthcheckStatus.healthy, 10L, "HTTP"));
list.add(new ServiceNode<>("HOST", 0, TestNodeData.builder().shardId(1).build(), HealthcheckStatus.healthy, Long.MAX_VALUE, "HTTP"));
return Optional.of(list);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
"iterations" : 4,
"threads" : 1,
"forks" : 3,
"mean_ops" : 787544.107063881
"mean_ops" : 812476.3197574528
}
Loading

0 comments on commit bc5b586

Please sign in to comment.