diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/command/CommandManager.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/command/CommandManager.java index 8d111c927..ea5e9dc23 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/command/CommandManager.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/command/CommandManager.java @@ -12,14 +12,17 @@ import com.solace.maas.ep.event.management.agent.plugin.solace.processor.semp.SolaceHttpSemp; import com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformManager; import com.solace.maas.ep.event.management.agent.publisher.CommandPublisher; +import com.solace.maas.ep.event.management.agent.util.MdcTaskDecorator; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import java.time.OffsetDateTime; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import static com.solace.maas.ep.event.management.agent.constants.Command.COMMAND_CORRELATION_ID; import static com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformManager.LOG_LEVEL_ERROR; @@ -34,6 +37,7 @@ public class CommandManager { private final CommandPublisher commandPublisher; private final MessagingServiceDelegateService messagingServiceDelegateService; private final EventPortalProperties eventPortalProperties; + private final ThreadPoolTaskExecutor configPushPool; public CommandManager(TerraformManager terraformManager, CommandMapper commandMapper, CommandPublisher commandPublisher, MessagingServiceDelegateService messagingServiceDelegateService, @@ -43,9 +47,28 @@ public CommandManager(TerraformManager terraformManager, CommandMapper commandMa this.commandPublisher = commandPublisher; this.messagingServiceDelegateService = messagingServiceDelegateService; this.eventPortalProperties = eventPortalProperties; + configPushPool = new ThreadPoolTaskExecutor(); + configPushPool.setCorePoolSize(eventPortalProperties.getCommandThreadPoolMinSize()); + configPushPool.setMaxPoolSize(eventPortalProperties.getCommandThreadPoolMaxSize()); + configPushPool.setQueueCapacity(eventPortalProperties.getCommandThreadPoolQueueSize()); + configPushPool.setThreadNamePrefix("config-push-pool-"); + configPushPool.setTaskDecorator(new MdcTaskDecorator()); + configPushPool.initialize(); } public void execute(CommandMessage request) { + + CompletableFuture.runAsync(() -> configPush(request), configPushPool) + .exceptionally(e -> { + log.error("Error getting terraform variables", e); + Command firstCommand = request.getCommandBundles().get(0).getCommands().get(0); + setCommandError(firstCommand, (Exception) e); + sendResponse(request); + return null; + }); + } + + public void configPush(CommandMessage request) { Map envVars; try { envVars = setBrokerSpecificEnvVars(request.getServiceId()); diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/eventPortal/EventPortalProperties.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/eventPortal/EventPortalProperties.java index a2976b96b..e7cba43da 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/eventPortal/EventPortalProperties.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/eventPortal/EventPortalProperties.java @@ -18,6 +18,10 @@ public class EventPortalProperties { private String topicPrefix; + private int commandThreadPoolMinSize = 5; + private int commandThreadPoolMaxSize = 10; + private int commandThreadPoolQueueSize = 1_000; + private GatewayProperties gateway = new GatewayProperties("standalone", "standalone", new GatewayMessagingProperties(true, false, List.of())); } diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/util/MdcTaskDecorator.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/util/MdcTaskDecorator.java new file mode 100644 index 000000000..334f453fc --- /dev/null +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/util/MdcTaskDecorator.java @@ -0,0 +1,25 @@ +package com.solace.maas.ep.event.management.agent.util; + +import org.slf4j.MDC; +import org.springframework.core.task.TaskDecorator; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +public class MdcTaskDecorator implements TaskDecorator { + + @Override + public Runnable decorate(Runnable runnable) { + Map contextMap = (Map) Optional.ofNullable(MDC.getCopyOfContextMap()).orElse(new HashMap()); + return () -> { + try { + MDC.setContextMap(contextMap); + runnable.run(); + } finally { + MDC.clear(); + } + + }; + } +} diff --git a/service/application/src/main/resources/application-TEST.yml b/service/application/src/main/resources/application-TEST.yml index 55e3ed2ca..df723660c 100644 --- a/service/application/src/main/resources/application-TEST.yml +++ b/service/application/src/main/resources/application-TEST.yml @@ -24,6 +24,9 @@ eventPortal: runtimeAgentId: ${EP_RUNTIME_AGENT_ID:1234567} organizationId: ${EP_ORGANIZATION_ID:myOrg123} topicPrefix: ${EP_TOPIC_PREFIX:sc/ep/runtime} + commandThreadPoolMinSize: 5 + commandThreadPoolMaxSize: 5 + commandThreadPoolQueueSize: 10 gateway: id: decal5 name: evmr1 diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTests.java index 68d003ba6..c8bea2e91 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTests.java @@ -14,20 +14,32 @@ import com.solace.maas.ep.event.management.agent.plugin.solace.processor.semp.SolaceHttpSemp; import com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformManager; import com.solace.maas.ep.event.management.agent.publisher.CommandPublisher; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.mockito.invocation.Invocation; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.test.context.ActiveProfiles; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; import static com.solace.maas.ep.event.management.agent.constants.Command.COMMAND_CORRELATION_ID; import static com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessageType.generic; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -38,42 +50,99 @@ public class CommandManagerTests { @Autowired - CommandManager commandManager; + private CommandManager commandManager; @Autowired - TerraformManager terraformManager; + private TerraformManager terraformManager; @Autowired - CommandPublisher commandPublisher; + private CommandPublisher commandPublisher; @Autowired - MessagingServiceDelegateService messagingServiceDelegateService; + private MessagingServiceDelegateService messagingServiceDelegateService; @Autowired - EventPortalProperties eventPortalProperties; + private EventPortalProperties eventPortalProperties; + + private final static ThreadPoolTaskExecutor testThreadPool = new ThreadPoolTaskExecutor(); + + @BeforeEach + public void cleanup() { + reset(terraformManager); + reset(commandPublisher); + } + + @Test + public void testMultiThreadedCommandManager() throws InterruptedException { + + // Set up the thread pool + int commandThreadPoolQueueSize = eventPortalProperties.getCommandThreadPoolQueueSize(); + testThreadPool.setCorePoolSize(commandThreadPoolQueueSize); + testThreadPool.initialize(); + + + // Build enough requests to fill the command thread pool queue + List messageList = new ArrayList<>(); + for (int i = 0; i < commandThreadPoolQueueSize; i++) { + messageList.add(getCommandMessage(Integer.toString(i))); + } + + doNothing().when(commandPublisher).sendCommandResponse(any(), any()); + doAnswer(invocation -> { + // Simulate the time spent for a SEMP command to complete + TimeUnit.SECONDS.sleep(1); + return null; + }).when(terraformManager).execute(any(), any(), any()); + + ArgumentCaptor> topicArgCaptor = ArgumentCaptor.forClass(Map.class); + + when(messagingServiceDelegateService.getMessagingServiceClient(any())).thenReturn( + new SolaceHttpSemp(SempClient.builder() + .username("myUsername") + .password("myPassword") + .connectionUrl("myConnectionUrl") + .build())); + + // Execute all the commands in parallel to fill the command thread pool queue + IntStream.rangeClosed(1, commandThreadPoolQueueSize).parallel().forEach(i -> + CompletableFuture.runAsync(() -> commandManager.execute(messageList.get(i - 1)), testThreadPool)); + + // Wait for all the threads to complete (add a timeout just in case) + Collection invocations = Mockito.mockingDetails(commandPublisher).getInvocations(); + long timeout = System.currentTimeMillis() + 10_000; + while (invocations.size() < commandThreadPoolQueueSize && System.currentTimeMillis() < timeout) { + TimeUnit.MILLISECONDS.sleep(500); + invocations = Mockito.mockingDetails(commandPublisher).getInvocations(); + } + + // Verify terraform manager is called + ArgumentCaptor> envArgCaptor = ArgumentCaptor.forClass(Map.class); + verify(terraformManager, times(commandThreadPoolQueueSize)).execute(any(), any(), envArgCaptor.capture()); + + // Verify the env vars are set with the terraform manager is called + Map envVars = envArgCaptor.getValue(); + assert envVars.get("TF_VAR_password").equals("myPassword"); + assert envVars.get("TF_VAR_username").equals("myUsername"); + assert envVars.get("TF_VAR_url").equals("myConnectionUrl"); + + ArgumentCaptor messageArgCaptor = ArgumentCaptor.forClass(CommandMessage.class); + verify(commandPublisher, times(commandThreadPoolQueueSize)).sendCommandResponse(messageArgCaptor.capture(), topicArgCaptor.capture()); + + Map topicVars = topicArgCaptor.getValue(); + assert topicVars.get("orgId").equals(eventPortalProperties.getOrganizationId()); + assert topicVars.get("runtimeAgentId").equals(eventPortalProperties.getRuntimeAgentId()); + + // Make sure we get all 10 correlation ids in the response messages + List receivedCorrelationIds = messageArgCaptor.getAllValues().stream().map(CommandMessage::getCommandCorrelationId).toList(); + List expectedCorrelationIds = IntStream.range(0, commandThreadPoolQueueSize).mapToObj(i -> "myCorrelationId" + i).toList(); + assertTrue(receivedCorrelationIds.size() == expectedCorrelationIds.size() && + receivedCorrelationIds.containsAll(expectedCorrelationIds) && expectedCorrelationIds.containsAll(receivedCorrelationIds)); + } @Test public void testCommandManager() { // Create a command request message - CommandMessage message = new CommandMessage(); - message.setOrigType(MOPSvcType.maasEventMgmt); - message.withMessageType(generic); - message.setContext("abc"); - message.setActorId("myActorId"); - message.setOrgId(eventPortalProperties.getOrganizationId()); - message.setTraceId("myTraceId"); - message.setCommandCorrelationId("myCorrelationId"); - message.setCommandBundles(List.of( - CommandBundle.builder() - .executionType(ExecutionType.serial) - .exitOnFailure(false) - .commands(List.of( - Command.builder() - .commandType(CommandType.terraform) - .body("asdfasdfadsf") - .command("apply") - .build())) - .build())); + CommandMessage message = getCommandMessage("1"); doNothing().when(terraformManager).execute(any(), any(), any()); @@ -85,7 +154,20 @@ public void testCommandManager() { .password("myPassword") .connectionUrl("myConnectionUrl") .build())); + commandManager.execute(message); + // Wait for all the threads to complete (add a timeout just in case) + Collection invocations = Mockito.mockingDetails(commandPublisher).getInvocations(); + long timeout = System.currentTimeMillis() + 10_000; + while (invocations.isEmpty() && System.currentTimeMillis() < timeout) { + try { + TimeUnit.MILLISECONDS.sleep(500); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + invocations = Mockito.mockingDetails(commandPublisher).getInvocations(); + } + // Verify terraform manager is called ArgumentCaptor> envArgCaptor = ArgumentCaptor.forClass(Map.class); @@ -104,4 +186,27 @@ public void testCommandManager() { assert topicVars.get("runtimeAgentId").equals(eventPortalProperties.getRuntimeAgentId()); assert topicVars.get(COMMAND_CORRELATION_ID).equals(message.getCommandCorrelationId()); } + + private CommandMessage getCommandMessage(String suffix) { + CommandMessage message = new CommandMessage(); + message.setOrigType(MOPSvcType.maasEventMgmt); + message.withMessageType(generic); + message.setContext("abc"); + message.setActorId("myActorId"); + message.setOrgId(eventPortalProperties.getOrganizationId()); + message.setTraceId("myTraceId"); + message.setCommandCorrelationId("myCorrelationId" + suffix); + message.setCommandBundles(List.of( + CommandBundle.builder() + .executionType(ExecutionType.serial) + .exitOnFailure(false) + .commands(List.of( + Command.builder() + .commandType(CommandType.terraform) + .body("asdfasdfadsf") + .command("apply") + .build())) + .build())); + return message; + } } diff --git a/service/terraform-plugin/src/test/java/com/solace/maas/ep/event/management/agent/plugin/terraform/real/TerraformClientRealTests.java b/service/terraform-plugin/src/test/java/com/solace/maas/ep/event/management/agent/plugin/terraform/real/TerraformClientRealTests.java index f46ebfc98..13cc44254 100644 --- a/service/terraform-plugin/src/test/java/com/solace/maas/ep/event/management/agent/plugin/terraform/real/TerraformClientRealTests.java +++ b/service/terraform-plugin/src/test/java/com/solace/maas/ep/event/management/agent/plugin/terraform/real/TerraformClientRealTests.java @@ -28,7 +28,9 @@ import java.util.concurrent.Future; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.fail; @Slf4j @ActiveProfiles("TEST") @@ -48,7 +50,7 @@ public class TerraformClientRealTests { @Test public void planCreateNewQueue() { - executeTerraformCommand("addQueue.tf", "plan"); + executeTerraformCommand("addQueue.tf", "plan "); } @Test @@ -57,7 +59,7 @@ public void createNewQueue() { } @Test - public void create2Queues() { + public void create2DifferentQueues() { ExecutorService executorService = Executors.newFixedThreadPool(10); Future> future1 = executorService.submit(() -> executeTerraformCommand("addQueue.tf", "apply")); @@ -72,6 +74,26 @@ public void create2Queues() { } } + @Test + public void create2OfTheSameQueue() { + ExecutorService executorService = Executors.newFixedThreadPool(10); + Future> future1 = executorService.submit(() -> + executeTerraformCommand("addQueue.tf", "apply", "app123-consumer", JobStatus.success)); + Future> future2 = executorService.submit(() -> + executeTerraformCommand("addQueue.tf", "apply", "app123-consumer", JobStatus.error)); + // wait for the futures to complete + try { + List command1Bundles = future1.get(); + List command2Bundles = future2.get(); + // We expect the first one to succeed and the second one to fail + assertEquals(JobStatus.success, command1Bundles.get(0).getCommands().get(0).getResult().getStatus()); + assertEquals(JobStatus.error, command2Bundles.get(0).getCommands().get(0).getResult().getStatus()); + } catch (Exception e) { + log.error("Error waiting for futures to complete", e); + fail(); + } + } + @Test public void delete2Queues() { executeTerraformCommand("deleteQueue.tf", "apply"); @@ -141,6 +163,10 @@ private List executeTerraformCommand(String hclFileName, String t } private List executeTerraformCommand(String hclFileName, String tfVerb, String context) { + return executeTerraformCommand(hclFileName, tfVerb, context, JobStatus.success); + } + + private List executeTerraformCommand(String hclFileName, String tfVerb, String context, JobStatus expectedJobStatus) { String terraformString = asString(resourceLoader.getResource("classpath:realTfFiles" + File.separator + hclFileName)); Command commandRequest = Command.builder() @@ -168,7 +194,7 @@ private List executeTerraformCommand(String hclFileName, String t for (Command command : commandBundle.getCommands()) { CommandResult result = command.getResult(); System.out.println("Logs " + result.getLogs()); - assertNotSame(JobStatus.error, result.getStatus()); + assertEquals(expectedJobStatus, result.getStatus()); } }