diff --git a/README.md b/README.md index 06e821c..a12e968 100644 --- a/README.md +++ b/README.md @@ -59,12 +59,15 @@ build.gradle file (e.g. before `mavenCentral()`. ``` # terra-workspace-manager/build.gradle - + + // If true, search local repository (~/.m2/repository/) first for dependencies. + def useMavenLocal = true repositories { - mavenLocal() - mavenCentral() - ... - } + if (useMavenLocal) { + mavenLocal() // must be listed first to take effect + } + mavenCentral() + ... ``` That's it! Your service should pick up locally-published changes. If your changes involved bumping diff --git a/build.gradle b/build.gradle index 1f8d78b..2e3607d 100644 --- a/build.gradle +++ b/build.gradle @@ -78,7 +78,7 @@ dependencies { // Terra libraries implementation group: 'org.broadinstitute.dsde.workbench', name: 'sam-client_2.13', version: '0.1-0c4b377' - var stairwayVersion= '1.1.1-SNAPSHOT' + var stairwayVersion= '1.1.7-SNAPSHOT' api "bio.terra:stairway-gcp:${stairwayVersion}" implementation "bio.terra:stairway-azure:${stairwayVersion}" diff --git a/src/main/java/bio/terra/common/stairway/StairwayComponent.java b/src/main/java/bio/terra/common/stairway/StairwayComponent.java index e3b588b..8059f02 100644 --- a/src/main/java/bio/terra/common/stairway/StairwayComponent.java +++ b/src/main/java/bio/terra/common/stairway/StairwayComponent.java @@ -23,6 +23,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; /** A Spring Component for exposing an initialized {@link Stairway}. */ @@ -33,6 +35,7 @@ public class StairwayComponent { private final KubeService kubeService; private final KubeProperties kubeProperties; private final StairwayProperties stairwayProperties; + private final ThreadPoolTaskExecutor executor; private final AtomicReference status = new AtomicReference<>(Status.INITIALIZING); private Stairway stairway; @@ -40,10 +43,12 @@ public class StairwayComponent { public StairwayComponent( KubeService kubeService, KubeProperties kubeProperties, - StairwayProperties stairwayProperties) { + StairwayProperties stairwayProperties, + @Qualifier(StairwayProperties.STAIRWAY_EXECUTOR_BEAN_NAME) ThreadPoolTaskExecutor executor) { this.kubeService = kubeService; this.kubeProperties = kubeProperties; this.stairwayProperties = stairwayProperties; + this.executor = executor; logger.info("Creating Stairway: name: [{}]", kubeService.getPodName()); } @@ -144,7 +149,8 @@ public void initialize(StairwayOptionsBuilder initializeBuilder) { initializeBuilder.getContext()) // not necessarily a Spring ApplicationContext .stairwayName(kubeProperties.getPodName()) .workQueue(queue) - .exceptionSerializer(initializeBuilder.getExceptionSerializer()); + .exceptionSerializer(initializeBuilder.getExceptionSerializer()) + .executor(executor); initializeBuilder.getHooks().forEach(builder::stairwayHook); try { this.stairway = builder.build(); diff --git a/src/main/java/bio/terra/common/stairway/StairwayProperties.java b/src/main/java/bio/terra/common/stairway/StairwayProperties.java index cc9069d..a58ee07 100644 --- a/src/main/java/bio/terra/common/stairway/StairwayProperties.java +++ b/src/main/java/bio/terra/common/stairway/StairwayProperties.java @@ -1,7 +1,10 @@ package bio.terra.common.stairway; +import bio.terra.stairway.DefaultThreadPoolTaskExecutor; import java.time.Duration; import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; /** * Properties for configuring a Stairway instance. @@ -232,4 +235,11 @@ public boolean isAzureQueueEnabled() { public void setAzureQueueEnabled(boolean azureQueueEnabled) { this.azureQueueEnabled = azureQueueEnabled; } + + public static final String STAIRWAY_EXECUTOR_BEAN_NAME = "stairwayExecutor"; + + @Bean(STAIRWAY_EXECUTOR_BEAN_NAME) + public ThreadPoolTaskExecutor stairwayExecutor() { + return new DefaultThreadPoolTaskExecutor(maxParallelFlights); + } } diff --git a/src/test/java/bio/terra/common/stairway/StairwayComponentTest.java b/src/test/java/bio/terra/common/stairway/StairwayComponentTest.java index f743888..8abe06a 100644 --- a/src/test/java/bio/terra/common/stairway/StairwayComponentTest.java +++ b/src/test/java/bio/terra/common/stairway/StairwayComponentTest.java @@ -12,6 +12,7 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @ExtendWith(MockitoExtension.class) class StairwayComponentTest { @@ -20,6 +21,7 @@ class StairwayComponentTest { @Mock private KubeProperties kubeProperties; private StairwayProperties stairwayProperties; @Mock private KubeService kubeService; + @Mock private ThreadPoolTaskExecutor executor; @Test void setupAzureWorkQueueTest() { @@ -28,7 +30,8 @@ void setupAzureWorkQueueTest() { "Endpoint=sb://azure-xxxx.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxx"; setProperties(connectionString, "topicName", "subscriptionName"); - stairwayComponent = new StairwayComponent(kubeService, kubeProperties, stairwayProperties); + stairwayComponent = + new StairwayComponent(kubeService, kubeProperties, stairwayProperties, executor); QueueInterface queue = stairwayComponent.setupAzureWorkQueue(); assertTrue(queue instanceof AzureServiceBusQueue); } @@ -37,7 +40,8 @@ void setupAzureWorkQueueTest() { void setupAzureWorkQueueTestConnectionStringRequired() { setProperties("", "topicName", "subscriptionName"); - stairwayComponent = new StairwayComponent(kubeService, kubeProperties, stairwayProperties); + stairwayComponent = + new StairwayComponent(kubeService, kubeProperties, stairwayProperties, executor); assertThrows(IllegalArgumentException.class, () -> stairwayComponent.setupAzureWorkQueue()); } @@ -47,7 +51,8 @@ void setupAzureWorkQueueTestTopicNameRequired() { "Endpoint=sb://azure-xxxx.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxx"; setProperties(connectionString, "", "subscriptionName"); - stairwayComponent = new StairwayComponent(kubeService, kubeProperties, stairwayProperties); + stairwayComponent = + new StairwayComponent(kubeService, kubeProperties, stairwayProperties, executor); assertThrows(IllegalArgumentException.class, () -> stairwayComponent.setupAzureWorkQueue()); } @@ -57,13 +62,15 @@ void setupAzureWorkQueueTestSubscriptionNameRequired() { "Endpoint=sb://azure-xxxx.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxx"; setProperties(connectionString, "topicName", ""); - stairwayComponent = new StairwayComponent(kubeService, kubeProperties, stairwayProperties); + stairwayComponent = + new StairwayComponent(kubeService, kubeProperties, stairwayProperties, executor); assertThrows(IllegalArgumentException.class, () -> stairwayComponent.setupAzureWorkQueue()); } @Test void setupAzureWorkQueueTestThrowsNPE() { - stairwayComponent = new StairwayComponent(kubeService, kubeProperties, stairwayProperties); + stairwayComponent = + new StairwayComponent(kubeService, kubeProperties, stairwayProperties, executor); assertThrows(NullPointerException.class, () -> stairwayComponent.setupAzureWorkQueue()); } diff --git a/src/test/java/bio/terra/common/stairway/StairwayPropertiesTest.java b/src/test/java/bio/terra/common/stairway/StairwayPropertiesTest.java new file mode 100644 index 0000000..e350972 --- /dev/null +++ b/src/test/java/bio/terra/common/stairway/StairwayPropertiesTest.java @@ -0,0 +1,55 @@ +package bio.terra.common.stairway; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import bio.terra.stairway.DefaultThreadPoolTaskExecutor; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +@Tag("unit") +public class StairwayPropertiesTest { + + private StairwayProperties properties; + + @BeforeEach + void beforeEach() { + this.properties = new StairwayProperties(); + } + + @Test + void testStairwayProperties_executor_maxParallelFlightsUnspecified() { + assertThat( + "maxParallelFlights are 0 when unspecified", + properties.getMaxParallelFlights(), + equalTo(0)); + + ThreadPoolTaskExecutor executor = properties.stairwayExecutor(); + assertThat(executor, instanceOf(DefaultThreadPoolTaskExecutor.class)); + assertThat( + "DefaultThreadPoolTaskExecutor overrides invalid pool size", + executor.getMaxPoolSize(), + greaterThan(0)); + assertTrue(executor.isRunning()); + } + + @Test + void testStairwayProperties_executor_maxParallelFlightsSpecified() { + int maxParallelFlights = 24; + properties.setMaxParallelFlights(maxParallelFlights); + assertThat(properties.getMaxParallelFlights(), equalTo(maxParallelFlights)); + + ThreadPoolTaskExecutor executor = properties.stairwayExecutor(); + assertThat(executor, instanceOf(DefaultThreadPoolTaskExecutor.class)); + assertThat( + "DefaultThreadPoolTaskExecutor honors valid pool size", + executor.getMaxPoolSize(), + equalTo(maxParallelFlights)); + assertTrue(executor.isRunning()); + } +}