Skip to content

Commit

Permalink
introduce disable-time-limiter property
Browse files Browse the repository at this point in the history
  • Loading branch information
Martin Siegemund committed Nov 12, 2023
1 parent 7d4b499 commit 9892b11
Show file tree
Hide file tree
Showing 11 changed files with 255 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ public class ReactiveResilience4JAutoConfiguration {
@Bean
@ConditionalOnMissingBean(ReactiveCircuitBreakerFactory.class)
public ReactiveResilience4JCircuitBreakerFactory reactiveResilience4JCircuitBreakerFactory(
CircuitBreakerRegistry circuitBreakerRegistry, TimeLimiterRegistry timeLimiterRegistry) {
CircuitBreakerRegistry circuitBreakerRegistry, TimeLimiterRegistry timeLimiterRegistry,
Resilience4JConfigurationProperties resilience4JConfigurationProperties) {
ReactiveResilience4JCircuitBreakerFactory factory = new ReactiveResilience4JCircuitBreakerFactory(
circuitBreakerRegistry, timeLimiterRegistry);
circuitBreakerRegistry, timeLimiterRegistry, resilience4JConfigurationProperties);
customizers.forEach(customizer -> customizer.customize(factory));
return factory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.cloud.circuitbreaker.resilience4j;

import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -59,29 +60,38 @@ public class ReactiveResilience4JCircuitBreaker implements ReactiveCircuitBreake

private final Optional<Customizer<CircuitBreaker>> circuitBreakerCustomizer;

private final boolean disableTimeLimiter;

public ReactiveResilience4JCircuitBreaker(String id, String groupName,
Resilience4JConfigBuilder.Resilience4JCircuitBreakerConfiguration config,
CircuitBreakerRegistry circuitBreakerRegistry, TimeLimiterRegistry timeLimiterRegistry,
Optional<Customizer<CircuitBreaker>> circuitBreakerCustomizer) {
Optional<Customizer<CircuitBreaker>> circuitBreakerCustomizer,
boolean disableTimeLimiter) {
this.id = id;
this.groupName = groupName;
this.circuitBreakerConfig = config.getCircuitBreakerConfig();
this.circuitBreakerRegistry = circuitBreakerRegistry;
this.circuitBreakerCustomizer = circuitBreakerCustomizer;
this.timeLimiterConfig = config.getTimeLimiterConfig();
this.timeLimiterRegistry = timeLimiterRegistry;
this.disableTimeLimiter = disableTimeLimiter;
}

@Override
public <T> Mono<T> run(Mono<T> toRun, Function<Throwable, Mono<T>> fallback) {
Tuple2<CircuitBreaker, TimeLimiter> tuple = buildCircuitBreakerAndTimeLimiter();
Mono<T> toReturn = toRun.transform(CircuitBreakerOperator.of(tuple.getT1()))
.timeout(tuple.getT2().getTimeLimiterConfig().getTimeoutDuration())
// Since we are using the Mono timeout we need to tell the circuit breaker
// about the error
.doOnError(TimeoutException.class,
t -> tuple.getT1().onError(tuple.getT2().getTimeLimiterConfig().getTimeoutDuration().toMillis(),
TimeUnit.MILLISECONDS, t));
Tuple2<CircuitBreaker, Optional<TimeLimiter>> tuple = buildCircuitBreakerAndTimeLimiter();
Mono<T> toReturn = toRun.transform(CircuitBreakerOperator.of(tuple.getT1()));
if (tuple.getT2().isPresent()) {
final Duration timeoutDuration = tuple.getT2().get().getTimeLimiterConfig().getTimeoutDuration();
toReturn = toReturn
.timeout(timeoutDuration)
// Since we are using the Mono timeout we need to tell the circuit breaker
// about the error
.doOnError(TimeoutException.class,
t -> tuple.getT1()
.onError(timeoutDuration.toMillis(),
TimeUnit.MILLISECONDS, t));
}
if (fallback != null) {
toReturn = toReturn.onErrorResume(fallback);
}
Expand All @@ -90,28 +100,37 @@ public <T> Mono<T> run(Mono<T> toRun, Function<Throwable, Mono<T>> fallback) {

@Override
public <T> Flux<T> run(Flux<T> toRun, Function<Throwable, Flux<T>> fallback) {
Tuple2<CircuitBreaker, TimeLimiter> tuple = buildCircuitBreakerAndTimeLimiter();
Flux<T> toReturn = toRun.transform(CircuitBreakerOperator.of(tuple.getT1()))
.timeout(tuple.getT2().getTimeLimiterConfig().getTimeoutDuration())
// Since we are using the Flux timeout we need to tell the circuit breaker
// about the error
.doOnError(TimeoutException.class,
t -> tuple.getT1().onError(tuple.getT2().getTimeLimiterConfig().getTimeoutDuration().toMillis(),
TimeUnit.MILLISECONDS, t));
Tuple2<CircuitBreaker, Optional<TimeLimiter>> tuple = buildCircuitBreakerAndTimeLimiter();
Flux<T> toReturn = toRun.transform(CircuitBreakerOperator.of(tuple.getT1()));
if (tuple.getT2().isPresent()) {
final Duration timeoutDuration = tuple.getT2().get().getTimeLimiterConfig().getTimeoutDuration();
toReturn = toReturn.timeout(timeoutDuration)
// Since we are using the Flux timeout we need to tell the circuit breaker
// about the error
.doOnError(TimeoutException.class,
t -> tuple.getT1()
.onError(timeoutDuration.toMillis(),
TimeUnit.MILLISECONDS, t));
}
if (fallback != null) {
toReturn = toReturn.onErrorResume(fallback);
}
return toReturn;
}

private Tuple2<CircuitBreaker, TimeLimiter> buildCircuitBreakerAndTimeLimiter() {
private Tuple2<CircuitBreaker, Optional<TimeLimiter>> buildCircuitBreakerAndTimeLimiter() {
final Map<String, String> tags = Map.of(CIRCUIT_BREAKER_GROUP_TAG, this.groupName);
CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker(id, circuitBreakerConfig, tags);
circuitBreakerCustomizer.ifPresent(customizer -> customizer.customize(circuitBreaker));
if (disableTimeLimiter) {
/* do not provide/load time-limiter */
return Tuples.of(circuitBreaker, Optional.empty());
}
/* provide time-limiter */
TimeLimiter timeLimiter = this.timeLimiterRegistry.find(this.id)
.orElseGet(() -> this.timeLimiterRegistry.find(this.groupName)
.orElseGet(() -> this.timeLimiterRegistry.timeLimiter(this.id, this.timeLimiterConfig, tags)));
return Tuples.of(circuitBreaker, timeLimiter);
return Tuples.of(circuitBreaker, Optional.of(timeLimiter));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,19 @@ public class ReactiveResilience4JCircuitBreakerFactory extends

private TimeLimiterRegistry timeLimiterRegistry = TimeLimiterRegistry.ofDefaults();

private Map<String, Customizer<CircuitBreaker>> circuitBreakerCustomizers = new HashMap<>();
private final Map<String, Customizer<CircuitBreaker>> circuitBreakerCustomizers = new HashMap<>();

private final Resilience4JConfigurationProperties resilience4JConfigurationProperties;

public ReactiveResilience4JCircuitBreakerFactory(CircuitBreakerRegistry circuitBreakerRegistry,
TimeLimiterRegistry timeLimiterRegistry) {
TimeLimiterRegistry timeLimiterRegistry,
Resilience4JConfigurationProperties resilience4JConfigurationProperties) {
this.circuitBreakerRegistry = circuitBreakerRegistry;
this.timeLimiterRegistry = timeLimiterRegistry;
this.defaultConfiguration = id -> new Resilience4JConfigBuilder(id)
.circuitBreakerConfig(this.circuitBreakerRegistry.getDefaultConfig())
.timeLimiterConfig(this.timeLimiterRegistry.getDefaultConfig()).build();
this.resilience4JConfigurationProperties = resilience4JConfigurationProperties;
}

@Override
Expand Down Expand Up @@ -91,7 +95,8 @@ public ReactiveCircuitBreaker create(String id, String groupName) {
Resilience4JConfigBuilder.Resilience4JCircuitBreakerConfiguration config = new Resilience4JConfigBuilder(id)
.circuitBreakerConfig(circuitBreakerConfig).timeLimiterConfig(timeLimiterConfig).build();
return new ReactiveResilience4JCircuitBreaker(id, groupName, config, circuitBreakerRegistry,
timeLimiterRegistry, Optional.ofNullable(circuitBreakerCustomizers.get(id)));
timeLimiterRegistry, Optional.ofNullable(circuitBreakerCustomizers.get(id)),
resilience4JConfigurationProperties.isDisableTimeLimiter());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class Resilience4JCircuitBreaker implements CircuitBreaker {
private final String id;

private final String groupName;
private final Map<String, String> tags;

private Resilience4jBulkheadProvider bulkheadProvider;

Expand All @@ -59,12 +60,15 @@ public class Resilience4JCircuitBreaker implements CircuitBreaker {

private final Optional<Customizer<io.github.resilience4j.circuitbreaker.CircuitBreaker>> circuitBreakerCustomizer;

private final boolean disableTimeLimiter;

public Resilience4JCircuitBreaker(String id, String groupName,
io.github.resilience4j.circuitbreaker.CircuitBreakerConfig circuitBreakerConfig,
TimeLimiterConfig timeLimiterConfig, CircuitBreakerRegistry circuitBreakerRegistry,
TimeLimiterRegistry timeLimiterRegistry, ExecutorService executorService,
Optional<Customizer<io.github.resilience4j.circuitbreaker.CircuitBreaker>> circuitBreakerCustomizer,
Resilience4jBulkheadProvider bulkheadProvider) {
Resilience4jBulkheadProvider bulkheadProvider,
boolean disableTimeLimiter) {
this.id = id;
this.groupName = groupName;
this.circuitBreakerConfig = circuitBreakerConfig;
Expand All @@ -74,6 +78,8 @@ public Resilience4JCircuitBreaker(String id, String groupName,
this.executorService = executorService;
this.circuitBreakerCustomizer = circuitBreakerCustomizer;
this.bulkheadProvider = bulkheadProvider;
this.disableTimeLimiter = disableTimeLimiter;
tags = Map.of(CIRCUIT_BREAKER_GROUP_TAG, this.groupName);
}

public Resilience4JCircuitBreaker(String id, String groupName,
Expand All @@ -83,25 +89,24 @@ public Resilience4JCircuitBreaker(String id, String groupName,
Optional<Customizer<io.github.resilience4j.circuitbreaker.CircuitBreaker>> circuitBreakerCustomizer,
Resilience4jBulkheadProvider bulkheadProvider) {
this(id, groupName, circuitBreakerConfig, timeLimiterConfig, circuitBreakerRegistry, timeLimiterRegistry, null,
circuitBreakerCustomizer, bulkheadProvider);
circuitBreakerCustomizer, bulkheadProvider, false);
}

@Override
public <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback) {
final Map<String, String> tags = Map.of(CIRCUIT_BREAKER_GROUP_TAG, this.groupName);
TimeLimiter timeLimiter = this.timeLimiterRegistry.find(this.id)
.orElseGet(() -> this.timeLimiterRegistry.find(this.groupName)
.orElseGet(() -> this.timeLimiterRegistry.timeLimiter(this.id, this.timeLimiterConfig, tags)));
Optional<TimeLimiter> timeLimiter = loadTimeLimiter();
io.github.resilience4j.circuitbreaker.CircuitBreaker defaultCircuitBreaker = registry.circuitBreaker(this.id,
this.circuitBreakerConfig, tags);
circuitBreakerCustomizer.ifPresent(customizer -> customizer.customize(defaultCircuitBreaker));
if (bulkheadProvider != null) {
return bulkheadProvider.run(this.groupName, toRun, fallback, defaultCircuitBreaker, timeLimiter, tags);
return bulkheadProvider.run(this.groupName, toRun, fallback, defaultCircuitBreaker, timeLimiter.orElse(null), tags);
}
else {
if (executorService != null) {
Supplier<Future<T>> futureSupplier = () -> executorService.submit(toRun::get);
Callable restrictedCall = TimeLimiter.decorateFutureSupplier(timeLimiter, futureSupplier);
/* conditionally wrap in time-limiter */
Callable<T> restrictedCall = timeLimiter.map(tl -> TimeLimiter.decorateFutureSupplier(tl, futureSupplier))
.orElse(() -> futureSupplier.get().get());
Callable<T> callable = io.github.resilience4j.circuitbreaker.CircuitBreaker
.decorateCallable(defaultCircuitBreaker, restrictedCall);
try {
Expand All @@ -125,4 +130,12 @@ public <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback) {
}
}

private Optional<TimeLimiter> loadTimeLimiter() {
if (disableTimeLimiter) {
return Optional.empty();
}
return Optional.of(this.timeLimiterRegistry.find(this.id)
.orElseGet(() -> this.timeLimiterRegistry.find(this.groupName)
.orElseGet(() -> this.timeLimiterRegistry.timeLimiter(this.id, this.timeLimiterConfig, tags))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ private Resilience4JCircuitBreaker create(String id, String groupName,
else {
return new Resilience4JCircuitBreaker(id, groupName, circuitBreakerConfig, timeLimiterConfig,
circuitBreakerRegistry, timeLimiterRegistry, circuitBreakerExecutorService,
Optional.ofNullable(circuitBreakerCustomizers.get(id)), bulkheadProvider);
Optional.ofNullable(circuitBreakerCustomizers.get(id)), bulkheadProvider,
this.resilience4JConfigurationProperties.isDisableTimeLimiter());
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class Resilience4JConfigurationProperties {

private boolean disableThreadPool = false;

private boolean disableTimeLimiter = false;

public boolean isEnableGroupMeterFilter() {
return enableGroupMeterFilter;
}
Expand Down Expand Up @@ -64,4 +66,12 @@ public void setDisableThreadPool(boolean disableThreadPool) {
this.disableThreadPool = disableThreadPool;
}


boolean isDisableTimeLimiter() {
return disableTimeLimiter;
}

void setDisableTimeLimiter(boolean disableTimeLimiter) {
this.disableTimeLimiter = disableTimeLimiter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,12 @@ private <T> Supplier<CompletionStage<T>> decorateBulkhead(final String id, final
}
}

private <T> Callable<T> decorateTimeLimiter(final Supplier<CompletionStage<T>> supplier, TimeLimiter timeLimiter) {
private static <T> Callable<T> decorateTimeLimiter(final Supplier<? extends CompletionStage<T>> supplier, TimeLimiter timeLimiter) {
final Supplier<Future<T>> futureSupplier = () -> supplier.get().toCompletableFuture();
if (timeLimiter == null) {
/* execute without time-limiter */
return () -> futureSupplier.get().get();
}
return timeLimiter.decorateFutureSupplier(futureSupplier);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class ReactiveResilience4JAutoConfigurationWithoutMetricsTest {

static ReactiveResilience4JCircuitBreakerFactory circuitBreakerFactory = spy(
new ReactiveResilience4JCircuitBreakerFactory(CircuitBreakerRegistry.ofDefaults(),
TimeLimiterRegistry.ofDefaults()));
TimeLimiterRegistry.ofDefaults(), new Resilience4JConfigurationProperties()));

@Test
public void testWithoutMetrics() {
Expand Down
Loading

0 comments on commit 9892b11

Please sign in to comment.