Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DCJ-457] Register a ThreadPoolTaskExecutor for Stairway as a Bean #187

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,15 @@ build.gradle file (e.g. before `mavenCentral()`.

```
# terra-workspace-manager/build.gradle


// If true, search local repository (~/.m2/repository/) first for dependencies.
def useMavenLocal = true
repositories {
mavenLocal()
mavenCentral()
...
}
if (useMavenLocal) {
mavenLocal() // must be listed first to take effect
}
mavenCentral()
...
```

That's it! Your service should pick up locally-published changes. If your changes involved bumping
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ dependencies {

// Terra libraries
implementation group: 'org.broadinstitute.dsde.workbench', name: 'sam-client_2.13', version: '0.1-0c4b377'
var stairwayVersion= '1.1.1-SNAPSHOT'
var stairwayVersion= '1.1.7-SNAPSHOT'
api "bio.terra:stairway-gcp:${stairwayVersion}"
implementation "bio.terra:stairway-azure:${stairwayVersion}"

Expand Down
10 changes: 8 additions & 2 deletions src/main/java/bio/terra/common/stairway/StairwayComponent.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

/** A Spring Component for exposing an initialized {@link Stairway}. */
Expand All @@ -33,17 +35,20 @@ public class StairwayComponent {
private final KubeService kubeService;
private final KubeProperties kubeProperties;
private final StairwayProperties stairwayProperties;
private final ThreadPoolTaskExecutor executor;
private final AtomicReference<Status> status = new AtomicReference<>(Status.INITIALIZING);
private Stairway stairway;

@Autowired
public StairwayComponent(
KubeService kubeService,
KubeProperties kubeProperties,
StairwayProperties stairwayProperties) {
StairwayProperties stairwayProperties,
@Qualifier(StairwayProperties.STAIRWAY_EXECUTOR_BEAN_NAME) ThreadPoolTaskExecutor executor) {
this.kubeService = kubeService;
this.kubeProperties = kubeProperties;
this.stairwayProperties = stairwayProperties;
this.executor = executor;
logger.info("Creating Stairway: name: [{}]", kubeService.getPodName());
}

Expand Down Expand Up @@ -144,7 +149,8 @@ public void initialize(StairwayOptionsBuilder initializeBuilder) {
initializeBuilder.getContext()) // not necessarily a Spring ApplicationContext
.stairwayName(kubeProperties.getPodName())
.workQueue(queue)
.exceptionSerializer(initializeBuilder.getExceptionSerializer());
.exceptionSerializer(initializeBuilder.getExceptionSerializer())
.executor(executor);
initializeBuilder.getHooks().forEach(builder::stairwayHook);
try {
this.stairway = builder.build();
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/bio/terra/common/stairway/StairwayProperties.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package bio.terra.common.stairway;

import bio.terra.stairway.DefaultThreadPoolTaskExecutor;
import java.time.Duration;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
* Properties for configuring a Stairway instance.
Expand Down Expand Up @@ -232,4 +235,11 @@ public boolean isAzureQueueEnabled() {
public void setAzureQueueEnabled(boolean azureQueueEnabled) {
this.azureQueueEnabled = azureQueueEnabled;
}

public static final String STAIRWAY_EXECUTOR_BEAN_NAME = "stairwayExecutor";

@Bean(STAIRWAY_EXECUTOR_BEAN_NAME)
public ThreadPoolTaskExecutor stairwayExecutor() {
return new DefaultThreadPoolTaskExecutor(maxParallelFlights);
}
}
17 changes: 12 additions & 5 deletions src/test/java/bio/terra/common/stairway/StairwayComponentTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@ExtendWith(MockitoExtension.class)
class StairwayComponentTest {
Expand All @@ -20,6 +21,7 @@ class StairwayComponentTest {
@Mock private KubeProperties kubeProperties;
private StairwayProperties stairwayProperties;
@Mock private KubeService kubeService;
@Mock private ThreadPoolTaskExecutor executor;

@Test
void setupAzureWorkQueueTest() {
Expand All @@ -28,7 +30,8 @@ void setupAzureWorkQueueTest() {
"Endpoint=sb://azure-xxxx.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxx";
setProperties(connectionString, "topicName", "subscriptionName");

stairwayComponent = new StairwayComponent(kubeService, kubeProperties, stairwayProperties);
stairwayComponent =
new StairwayComponent(kubeService, kubeProperties, stairwayProperties, executor);
QueueInterface queue = stairwayComponent.setupAzureWorkQueue();
assertTrue(queue instanceof AzureServiceBusQueue);
}
Expand All @@ -37,7 +40,8 @@ void setupAzureWorkQueueTest() {
void setupAzureWorkQueueTestConnectionStringRequired() {
setProperties("", "topicName", "subscriptionName");

stairwayComponent = new StairwayComponent(kubeService, kubeProperties, stairwayProperties);
stairwayComponent =
new StairwayComponent(kubeService, kubeProperties, stairwayProperties, executor);
assertThrows(IllegalArgumentException.class, () -> stairwayComponent.setupAzureWorkQueue());
}

Expand All @@ -47,7 +51,8 @@ void setupAzureWorkQueueTestTopicNameRequired() {
"Endpoint=sb://azure-xxxx.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxx";
setProperties(connectionString, "", "subscriptionName");

stairwayComponent = new StairwayComponent(kubeService, kubeProperties, stairwayProperties);
stairwayComponent =
new StairwayComponent(kubeService, kubeProperties, stairwayProperties, executor);
assertThrows(IllegalArgumentException.class, () -> stairwayComponent.setupAzureWorkQueue());
}

Expand All @@ -57,13 +62,15 @@ void setupAzureWorkQueueTestSubscriptionNameRequired() {
"Endpoint=sb://azure-xxxx.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxx";
setProperties(connectionString, "topicName", "");

stairwayComponent = new StairwayComponent(kubeService, kubeProperties, stairwayProperties);
stairwayComponent =
new StairwayComponent(kubeService, kubeProperties, stairwayProperties, executor);
assertThrows(IllegalArgumentException.class, () -> stairwayComponent.setupAzureWorkQueue());
}

@Test
void setupAzureWorkQueueTestThrowsNPE() {
stairwayComponent = new StairwayComponent(kubeService, kubeProperties, stairwayProperties);
stairwayComponent =
new StairwayComponent(kubeService, kubeProperties, stairwayProperties, executor);
assertThrows(NullPointerException.class, () -> stairwayComponent.setupAzureWorkQueue());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package bio.terra.common.stairway;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertTrue;

import bio.terra.stairway.DefaultThreadPoolTaskExecutor;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Tag("unit")
public class StairwayPropertiesTest {

private StairwayProperties properties;

@BeforeEach
void beforeEach() {
this.properties = new StairwayProperties();
}

@Test
void testStairwayProperties_executor_maxParallelFlightsUnspecified() {
assertThat(
"maxParallelFlights are 0 when unspecified",
properties.getMaxParallelFlights(),
equalTo(0));

ThreadPoolTaskExecutor executor = properties.stairwayExecutor();
assertThat(executor, instanceOf(DefaultThreadPoolTaskExecutor.class));
assertThat(
"DefaultThreadPoolTaskExecutor overrides invalid pool size",
executor.getMaxPoolSize(),
greaterThan(0));
assertTrue(executor.isRunning());
}

@Test
void testStairwayProperties_executor_maxParallelFlightsSpecified() {
int maxParallelFlights = 24;
properties.setMaxParallelFlights(maxParallelFlights);
assertThat(properties.getMaxParallelFlights(), equalTo(maxParallelFlights));

ThreadPoolTaskExecutor executor = properties.stairwayExecutor();
assertThat(executor, instanceOf(DefaultThreadPoolTaskExecutor.class));
assertThat(
"DefaultThreadPoolTaskExecutor honors valid pool size",
executor.getMaxPoolSize(),
equalTo(maxParallelFlights));
assertTrue(executor.isRunning());
}
}
Loading