Skip to content

Commit

Permalink
Merge pull request #48 from ToOnlyGaurav/race_condition_fix
Browse files Browse the repository at this point in the history
Fixing race condition while starting service finder hub.
  • Loading branch information
santanusinha authored Dec 5, 2024
2 parents 1b353aa + 0595350 commit 836955c
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.base.Stopwatch;
import io.appform.ranger.core.finder.ServiceFinder;
import io.appform.ranger.core.model.HubConstants;
Expand Down Expand Up @@ -54,7 +55,7 @@ public class ServiceFinderHub<T, R extends ServiceRegistry<T>> {
private final Lock updateLock = new ReentrantLock();
private final Condition updateCond = updateLock.newCondition();
private final AtomicBoolean updateAvailable = new AtomicBoolean(false);
private final ExecutorService executorService = Executors.newFixedThreadPool(1);
private final ExecutorService executorService = Executors.newFixedThreadPool(2);

@Getter
private final ExternalTriggeredSignal<Void> startSignal
Expand Down Expand Up @@ -187,7 +188,7 @@ private void monitor() {
while (!updateAvailable.get()) {
updateCond.await();
}
updateRegistry();
executorService.submit(this::updateRegistry);
}
catch (InterruptedException e) {
log.info("Updater thread interrupted");
Expand Down Expand Up @@ -279,6 +280,7 @@ private void waitTillServiceIsReady(Service service) {
RetryerBuilder.<Boolean>newBuilder()
.retryIfResult(r -> !r)
.withStopStrategy(StopStrategies.stopAfterDelay(serviceRefreshTimeoutMs, TimeUnit.MILLISECONDS))
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))
.build()
.call(() -> Optional.ofNullable(getFinders().get().get(service))
.map(ServiceFinder::getServiceRegistry)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,25 @@ void testDynamicServiceAddition() {
Assertions.assertEquals(0, dynamicServiceNode.get().getPort());
}

@Test
void testTimeoutOnHubStartup() {
var testServiceFinderHub = new TestServiceFinderHubBuilder()
.withServiceDataSource(new DynamicDataSource(Lists.newArrayList(new Service("NS", "SERVICE"))))
.withServiceFinderFactory(new TestServiceFinderFactory())
.withRefreshFrequencyMs(5_000)
.withHubStartTimeout(1_000)
.withServiceRefreshTimeout(10_000)
.build();

try {
Exception exception = Assertions.assertThrows(IllegalStateException.class, testServiceFinderHub::start);
Assertions.assertTrue(exception.getMessage()
.contains("Couldn't perform service hub refresh at this time. Refresh exceeded the start up time specified"));
} finally {
testServiceFinderHub.stop();
}
}

@Test
void testDelayedServiceAddition() {
val delayedHub = new ServiceFinderHub<>(new DynamicDataSource(Lists.newArrayList(new Service("NS", "SERVICE"))),
Expand Down Expand Up @@ -107,6 +126,34 @@ void testDynamicServiceAdditionWithNonDynamicDataSource() {
}
}

public class TestServiceFinderFactory implements ServiceFinderFactory<TestNodeData, MapBasedServiceRegistry<TestNodeData>> {

@Override
public ServiceFinder<TestNodeData, MapBasedServiceRegistry<TestNodeData>> buildFinder(Service service) {
val finder = new TestServiceFinderBuilder()
.withNamespace(service.getNamespace())
.withServiceName(service.getServiceName())
.withDeserializer(new Deserializer<TestNodeData>() {})
.withSleepDuration(60)
.build();

finder.start();
return finder;
}
}

private static class TestServiceFinderHubBuilder extends ServiceFinderHubBuilder<TestNodeData, MapBasedServiceRegistry<TestNodeData>> {

@Override
protected void preBuild() {

}

@Override
protected void postBuild(ServiceFinderHub<TestNodeData, MapBasedServiceRegistry<TestNodeData>> serviceFinderHub) {

}
}
private static class TestServiceFinderBuilder extends BaseServiceFinderBuilder<TestNodeData, MapBasedServiceRegistry<TestNodeData>, ServiceFinder<TestNodeData, MapBasedServiceRegistry<TestNodeData>>, TestServiceFinderBuilder, Deserializer<TestNodeData>> {

private int finderSleepDurationSeconds = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ public static <T> HttpCommunicator<T> httpClient(
.connectTimeout(config.getConnectionTimeoutMs() == 0
? 3000
: config.getConnectionTimeoutMs(), TimeUnit.MILLISECONDS)
.readTimeout(config.getOperationTimeoutMs() == 0
? 3000
: config.getOperationTimeoutMs(), TimeUnit.MILLISECONDS)
.writeTimeout(config.getOperationTimeoutMs() == 0
? 3000
: config.getOperationTimeoutMs(), TimeUnit.MILLISECONDS)
.followRedirects(true)
.connectionPool(new ConnectionPool(1, 30, TimeUnit.SECONDS))
.build(),
Expand Down
41 changes: 41 additions & 0 deletions ranger-server/src/main/resources/local.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
ranger:
namespace: mynamespace
upstreams:
- type: HTTP
nodeRefreshTimeMs: 5000
serviceRefreshTimeoutMs: 300000
hubStartTimeoutMs: 210000
httpClientConfigs:
- host: localhost
port: 80
- type: ZK
nodeRefreshTimeMs: 5000
serviceRefreshTimeoutMs: 3000
hubStartTimeoutMs: 5000
zookeepers: [ "localhost:2181" ]
disablePushUpdaters: true


server:
maxThreads: 1024
minThreads: 1024
applicationConnectors:
- type: http
port: 18080
adminConnectors:
- type: http
port: 18081
applicationContextPath: /
requestLog:
appenders:
- type: console
timeZone: IST

logging:
level: INFO

appenders:
- type: console
threshold: INFO
timeZone: IST
logFormat: "%(%-5level) [%date] [%thread] [%logger{0}]: %message%n"

0 comments on commit 836955c

Please sign in to comment.