Skip to content

Commit

Permalink
Merge pull request #228 from ryanjbaxter/bulkhead-configuration-fix
Browse files Browse the repository at this point in the history
Allows the ability to configure bulkheads using config properties
  • Loading branch information
ryanjbaxter authored Dec 17, 2024
2 parents b504609 + b5bfcf7 commit 9144b3a
Show file tree
Hide file tree
Showing 5 changed files with 318 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,31 @@ resilience4j.bulkhead:
maxConcurrentCalls: 10
----

For more inforamtion on the Resilience4j property configuration, see https://resilience4j.readme.io/docs/getting-started-3#configuration[Resilience4J Spring Boot 2 Configuration].
You can also provide common configuration via `resilience4j.bulkhead.configs.*` and
`resilience4j.thread-pool-bulkhead.configs.*` properties. This allows you to specify configuration
once and reuse it across multiple bulkheads.
[source,yaml]
----
resilience4j.bulkhead:
configs:
default:
queueCapacity: 5
someShared:
queueCapacity: 10
instances:
backendA:
baseConfig: default
maxConcurrentCalls: 10
backendB:
baseConfig: someShared
----

If you configure your bulkhead this way it will take lowest
priority. The priority order is:

1. `resilience4j.thread-pool-bulkhead.instances.*` or `resilience4j.bulkhead.instances.*`
2. `Customizer` configuration using `Resilience4JBulkheadProvider`.
3. `resilience4j.thread-pool-bulkhead.configs.*` or `resilience4j.bulkhead.configs.*`

For more information on the Resilience4j property configuration, see https://resilience4j.readme.io/docs/getting-started-3#configuration[Resilience4J Spring Boot 2 Configuration].

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.cloud.circuitbreaker.resilience4j;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand All @@ -27,13 +28,17 @@
import java.util.function.Supplier;

import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadConfig;
import io.github.resilience4j.bulkhead.BulkheadRegistry;
import io.github.resilience4j.bulkhead.ThreadPoolBulkhead;
import io.github.resilience4j.bulkhead.ThreadPoolBulkheadConfig;
import io.github.resilience4j.bulkhead.ThreadPoolBulkheadRegistry;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.timelimiter.TimeLimiter;

import org.springframework.cloud.client.circuitbreaker.Customizer;
import org.springframework.lang.NonNull;
import org.springframework.util.Assert;

/**
* @author Andrii Bohutskyi
Expand Down Expand Up @@ -64,7 +69,8 @@ public Resilience4jBulkheadProvider(ThreadPoolBulkheadRegistry threadPoolBulkhea
}

public void configureDefault(
Function<String, Resilience4jBulkheadConfigurationBuilder.BulkheadConfiguration> defaultConfiguration) {
@NonNull Function<String, Resilience4jBulkheadConfigurationBuilder.BulkheadConfiguration> defaultConfiguration) {
Assert.notNull(defaultConfiguration, "Default configuration must not be null");
this.defaultConfiguration = defaultConfiguration;
}

Expand Down Expand Up @@ -119,8 +125,12 @@ public <T> T run(String id, Supplier<T> toRun, Function<Throwable, T> fallback,

private <T> Supplier<CompletionStage<T>> decorateBulkhead(final String id, final Map<String, String> tags,
final Supplier<T> supplier) {
// If the configuration was supplied via a customizer use that configuration, else
// check if the configuration is present
// in the registries, and if its not present in either place, use the default
// configuration
Resilience4jBulkheadConfigurationBuilder.BulkheadConfiguration configuration = configurations
.computeIfAbsent(id, defaultConfiguration);
.computeIfAbsent(id, this::getConfiguration);

if (useSemaphoreBulkhead(id)) {
Bulkhead bulkhead = bulkheadRegistry.bulkhead(id, configuration.getBulkheadConfig(), tags);
Expand All @@ -134,10 +144,22 @@ private <T> Supplier<CompletionStage<T>> decorateBulkhead(final String id, final
}
}

private Resilience4jBulkheadConfigurationBuilder.BulkheadConfiguration getConfiguration(String id) {
Resilience4jBulkheadConfigurationBuilder builder = new Resilience4jBulkheadConfigurationBuilder();
Resilience4jBulkheadConfigurationBuilder.BulkheadConfiguration defaultConfiguration = this.defaultConfiguration
.apply(id);
Optional<BulkheadConfig> bulkheadConfiguration = bulkheadRegistry.getConfiguration(id);
Optional<ThreadPoolBulkheadConfig> threadPoolBulkheadConfig = threadPoolBulkheadRegistry.getConfiguration(id);
builder.bulkheadConfig(bulkheadConfiguration.orElse(defaultConfiguration.getBulkheadConfig()));
builder.threadPoolBulkheadConfig(
threadPoolBulkheadConfig.orElse(defaultConfiguration.getThreadPoolBulkheadConfig()));
return builder.build();
}

public <T> Callable<T> decorateCallable(final String id, final Map<String, String> tags,
final Callable<T> callable) {
Resilience4jBulkheadConfigurationBuilder.BulkheadConfiguration configuration = configurations
.computeIfAbsent(id, defaultConfiguration);
.computeIfAbsent(id, this::getConfiguration);

if (useSemaphoreBulkhead(id)) {
Bulkhead bulkhead = bulkheadRegistry.bulkhead(id, configuration.getBulkheadConfig(), tags);
Expand All @@ -151,8 +173,25 @@ public <T> Callable<T> decorateCallable(final String id, final Map<String, Strin
}

private boolean useSemaphoreBulkhead(String id) {
return semaphoreDefaultBulkhead
|| (bulkheadRegistry.find(id).isPresent() && threadPoolBulkheadRegistry.find(id).isEmpty());
// If we find a configuration in the threadPoolBulkheadRegistry, we assume the
// user configured the bulkhead specifically to
// use a threadpool so regardless of what
// spring.cloud.circuitbreaker.resilience4j.enableSemaphoreDefaultBulkhead is set
// to
// we will use a threadpool for the bulkhead
if (threadPoolBulkheadRegistry.find(id).isPresent()) {
return false;
}
// If we did not find a configuration in the threadPoolBulkheadRegistry, then use
// a semaphore bulkhead if
// spring.cloud.circuitbreaker.resilience4j.enableSemaphoreDefaultBulkhead is set
// to true or if we find a configuration in the
// bulkheadRegistry. We will return false if
// spring.cloud.circuitbreaker.resilience4j.enableSemaphoreDefaultBulkhead is
// false and we don't
// find a configuration in the bulkheadRegistry or the threadPoolBulkheadRegistry
// and this will result in the bulkhead using a threadpool
return semaphoreDefaultBulkhead || bulkheadRegistry.find(id).isPresent();
}

private static <T> Callable<T> decorateTimeLimiter(final Supplier<? extends CompletionStage<T>> supplier,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
/*
* Copyright 2013-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.circuitbreaker.resilience4j;

import io.github.resilience4j.bulkhead.BulkheadConfig;
import io.github.resilience4j.bulkhead.BulkheadRegistry;
import io.github.resilience4j.bulkhead.ThreadPoolBulkhead;
import io.github.resilience4j.bulkhead.ThreadPoolBulkheadConfig;
import io.github.resilience4j.bulkhead.ThreadPoolBulkheadRegistry;
import org.junit.jupiter.api.Test;

import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.runner.WebApplicationContextRunner;
import org.springframework.context.annotation.Configuration;

import static org.assertj.core.api.Assertions.assertThat;

/**
* @author Ryan Baxter
*/
public class BulkheadConfigurationTests {

@Test
void testConfigurationWithConfigProperties() {
new WebApplicationContextRunner().withUserConfiguration(Application.class)
.withPropertyValues("resilience4j.threadPoolBulkHead.configs.testme.queueCapacity=30")
.run(context -> {
final String id = "testme";
Resilience4JCircuitBreakerFactory resilience4JCircuitBreakerFactory = context
.getBean(Resilience4JCircuitBreakerFactory.class);
ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry = context
.getBean(ThreadPoolBulkheadRegistry.class);
final var circuitBreaker = resilience4JCircuitBreakerFactory.create(id);
circuitBreaker.run(() -> "run-to-populate-registry");
final var threadPoolBulkheadConfigOptional = threadPoolBulkheadRegistry.find(id)
.map(ThreadPoolBulkhead::getBulkheadConfig);
assertThat(threadPoolBulkheadConfigOptional.map(ThreadPoolBulkheadConfig::getQueueCapacity))
.contains(30);
});

}

@Test
void testInstanceConfigurationOverridesConfigProperties() {
new WebApplicationContextRunner().withUserConfiguration(Application.class)
.withPropertyValues("resilience4j.threadPoolBulkHead.configs.testme.queueCapacity=30",
"resilience4j.threadPoolBulkHead.instances.testme.queueCapacity=40")
.run(context -> {
final String id = "testme";
Resilience4JCircuitBreakerFactory resilience4JCircuitBreakerFactory = context
.getBean(Resilience4JCircuitBreakerFactory.class);
ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry = context
.getBean(ThreadPoolBulkheadRegistry.class);
final var circuitBreaker = resilience4JCircuitBreakerFactory.create(id);
circuitBreaker.run(() -> "run-to-populate-registry");
final var threadPoolBulkheadConfigOptional = threadPoolBulkheadRegistry.find(id)
.map(ThreadPoolBulkhead::getBulkheadConfig);
assertThat(threadPoolBulkheadConfigOptional.map(ThreadPoolBulkheadConfig::getQueueCapacity))
.contains(40);
});

}

@Test
void testInstanceConfigurationOverridesConfigAndCustomizerProperties() {
new WebApplicationContextRunner().withUserConfiguration(Application.class)
.withPropertyValues("resilience4j.threadPoolBulkHead.configs.testme.queueCapacity=30",
"resilience4j.threadPoolBulkHead.instances.testme.queueCapacity=40")
.run(context -> {
final String id = "testme";
Resilience4JCircuitBreakerFactory resilience4JCircuitBreakerFactory = context
.getBean(Resilience4JCircuitBreakerFactory.class);
Resilience4jBulkheadProvider bulkheadProvider = context.getBean(Resilience4jBulkheadProvider.class);
bulkheadProvider.configure(builder -> {
ThreadPoolBulkheadConfig threadPoolBulkheadConfig = ThreadPoolBulkheadConfig.custom()
.queueCapacity(50)
.build();
builder.threadPoolBulkheadConfig(threadPoolBulkheadConfig);
}, id);
ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry = context
.getBean(ThreadPoolBulkheadRegistry.class);
final var circuitBreaker = resilience4JCircuitBreakerFactory.create(id);
circuitBreaker.run(() -> "run-to-populate-registry");
final var threadPoolBulkheadConfigOptional = threadPoolBulkheadRegistry.find(id)
.map(ThreadPoolBulkhead::getBulkheadConfig);
assertThat(threadPoolBulkheadConfigOptional.map(ThreadPoolBulkheadConfig::getQueueCapacity))
.contains(40);
});

}

@Test
void testCustomizerConfigurationOverridesConfigProperties() {
new WebApplicationContextRunner().withUserConfiguration(Application.class)
.withPropertyValues("resilience4j.threadPoolBulkHead.configs.testme.queueCapacity=30")
.run(context -> {
final String id = "testme";
Resilience4JCircuitBreakerFactory resilience4JCircuitBreakerFactory = context
.getBean(Resilience4JCircuitBreakerFactory.class);
Resilience4jBulkheadProvider bulkheadProvider = context.getBean(Resilience4jBulkheadProvider.class);
bulkheadProvider.configure(builder -> {
ThreadPoolBulkheadConfig threadPoolBulkheadConfig = ThreadPoolBulkheadConfig.custom()
.queueCapacity(50)
.build();
builder.threadPoolBulkheadConfig(threadPoolBulkheadConfig);
}, id);
ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry = context
.getBean(ThreadPoolBulkheadRegistry.class);
final var circuitBreaker = resilience4JCircuitBreakerFactory.create(id);
circuitBreaker.run(() -> "run-to-populate-registry");
final var threadPoolBulkheadConfigOptional = threadPoolBulkheadRegistry.find(id)
.map(ThreadPoolBulkhead::getBulkheadConfig);
assertThat(threadPoolBulkheadConfigOptional.map(ThreadPoolBulkheadConfig::getQueueCapacity))
.contains(50);
});

}

@Test
void useThreadPoolWithSemaphorePropertySet() {
new WebApplicationContextRunner().withUserConfiguration(Application.class)
.withPropertyValues("resilience4j.threadPoolBulkHead.instances.testme.queueCapacity=30",
"spring.cloud.circuitbreaker.resilience4j.enableSemaphoreDefaultBulkhead=true")
.run(context -> {
final String id = "testme";
Resilience4JCircuitBreakerFactory resilience4JCircuitBreakerFactory = context
.getBean(Resilience4JCircuitBreakerFactory.class);
ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry = context
.getBean(ThreadPoolBulkheadRegistry.class);
BulkheadRegistry bulkheadRegistry = context.getBean(BulkheadRegistry.class);
final var circuitBreaker = resilience4JCircuitBreakerFactory.create(id);
circuitBreaker.run(() -> "run-to-populate-registry");
final var threadPoolBulkheadConfigOptional = threadPoolBulkheadRegistry.find(id);
final var semaphoreBulkheadConfig = bulkheadRegistry.find(id);
assertThat(threadPoolBulkheadConfigOptional.isEmpty()).isFalse();
assertThat(semaphoreBulkheadConfig.isEmpty()).isTrue();
});

}

@Test
void useSemaphoreWithoutPropertySet() {
new WebApplicationContextRunner().withUserConfiguration(Application.class)
.withPropertyValues("resilience4j.bulkhead.instances.testme.max-concurrent-calls=30")
.run(context -> {
final String id = "testme";
Resilience4JCircuitBreakerFactory resilience4JCircuitBreakerFactory = context
.getBean(Resilience4JCircuitBreakerFactory.class);
ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry = context
.getBean(ThreadPoolBulkheadRegistry.class);
BulkheadRegistry bulkheadRegistry = context.getBean(BulkheadRegistry.class);
final var circuitBreaker = resilience4JCircuitBreakerFactory.create(id);
circuitBreaker.run(() -> "run-to-populate-registry");
final var threadPoolBulkheadConfigOptional = threadPoolBulkheadRegistry.find(id);
final var semaphoreBulkheadConfig = bulkheadRegistry.find(id);
assertThat(threadPoolBulkheadConfigOptional.isEmpty()).isTrue();
assertThat(semaphoreBulkheadConfig.isEmpty()).isFalse();
});

}

@Test
void configureDefaultOverridesPropertyDefaultForThreadpool() {
new WebApplicationContextRunner().withUserConfiguration(Application.class)
.withPropertyValues("resilience4j.bulkhead.config.default.max-concurrent-calls=30",
"resilience4j.threadpool.config.default.queueCapacity=30"/*
* ,
* "resilience4j.bulkhead.instances.testme.max-wait-duration=30s",
* "resilience4j.threadPoolBulkHead.instances.testme.core-threadpool-size=1"
*/)
.run(context -> {
final String id = "testme";
Resilience4JCircuitBreakerFactory resilience4JCircuitBreakerFactory = context
.getBean(Resilience4JCircuitBreakerFactory.class);
Resilience4jBulkheadProvider bulkheadProvider = context.getBean(Resilience4jBulkheadProvider.class);
bulkheadProvider.configureDefault(bulkheadId -> new Resilience4jBulkheadConfigurationBuilder()
.bulkheadConfig(BulkheadConfig.custom().maxConcurrentCalls(40).build())
.threadPoolBulkheadConfig(ThreadPoolBulkheadConfig.custom().queueCapacity(40).build())
.build());
ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry = context
.getBean(ThreadPoolBulkheadRegistry.class);
final var circuitBreaker = resilience4JCircuitBreakerFactory.create(id);
circuitBreaker.run(() -> "run-to-populate-registry");
final var threadPoolBulkheadConfigOptional = threadPoolBulkheadRegistry.find(id);
assertThat(threadPoolBulkheadConfigOptional.isEmpty()).isFalse();
assertThat(threadPoolBulkheadConfigOptional.get().getBulkheadConfig().getQueueCapacity()).isEqualTo(40);
});

}

@Test
void configureDefaultOverridesPropertyDefaultForSemaphore() {
new WebApplicationContextRunner().withUserConfiguration(Application.class)
.withPropertyValues("resilience4j.bulkhead.config.default.max-concurrent-calls=30",
"spring.cloud.circuitbreaker.resilience4j.enableSemaphoreDefaultBulkhead=true"/*
* ,
* "resilience4j.bulkhead.instances.testme.max-wait-duration=30s",
* "resilience4j.threadPoolBulkHead.instances.testme.core-threadpool-size=1"
*/)
.run(context -> {
final String id = "testme";
Resilience4JCircuitBreakerFactory resilience4JCircuitBreakerFactory = context
.getBean(Resilience4JCircuitBreakerFactory.class);
Resilience4jBulkheadProvider bulkheadProvider = context.getBean(Resilience4jBulkheadProvider.class);
bulkheadProvider.configureDefault(bulkheadId -> new Resilience4jBulkheadConfigurationBuilder()
.bulkheadConfig(BulkheadConfig.custom().maxConcurrentCalls(40).build())
.threadPoolBulkheadConfig(ThreadPoolBulkheadConfig.custom().queueCapacity(40).build())
.build());
BulkheadRegistry bulkheadRegistry = context.getBean(BulkheadRegistry.class);
final var circuitBreaker = resilience4JCircuitBreakerFactory.create(id);
circuitBreaker.run(() -> "run-to-populate-registry");
final var semaphoreBulkheadConfig = bulkheadRegistry.find(id);
assertThat(semaphoreBulkheadConfig.isEmpty()).isFalse();
assertThat(semaphoreBulkheadConfig.get().getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(40);
});

}

@Configuration(proxyBeanMethods = false)
@EnableAutoConfiguration
protected static class Application {

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import static org.assertj.core.api.Assertions.assertThat;

/**
* Tests that time limiter threads are interrupted correctly when used with a bulkhead
* Tests that time limiter threads are interrupted correctly when used with a bulkhead.
*
* @author Renette Ros
*/
Expand All @@ -51,8 +51,14 @@
@DirtiesContext
public class Resilience4JBulkheadAndTimeLimiterIntegrationTest {

/**
* Slow bulkghead name.
*/
public static final String SLOW_BULKHEAD = "slowBulkhead";

/**
* Slow thread pool bulkhead name.
*/
public static final String SLOW_THREAD_POOL_BULKHEAD = "slowThreadPoolBulkhead";

@Autowired
Expand Down
Loading

0 comments on commit 9144b3a

Please sign in to comment.