Skip to content

Commit

Permalink
Add thread pool to command manager
Browse files Browse the repository at this point in the history
  • Loading branch information
gregmeldrum committed Dec 13, 2023
1 parent e84cbef commit 3e477b3
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
import com.solace.maas.ep.event.management.agent.plugin.solace.processor.semp.SolaceHttpSemp;
import com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformManager;
import com.solace.maas.ep.event.management.agent.publisher.CommandPublisher;
import com.solace.maas.ep.event.management.agent.util.MdcTaskDecorator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import static com.solace.maas.ep.event.management.agent.constants.Command.COMMAND_CORRELATION_ID;
import static com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformManager.LOG_LEVEL_ERROR;
Expand All @@ -34,6 +37,7 @@ public class CommandManager {
private final CommandPublisher commandPublisher;
private final MessagingServiceDelegateService messagingServiceDelegateService;
private final EventPortalProperties eventPortalProperties;
private final ThreadPoolTaskExecutor configPushPool;

public CommandManager(TerraformManager terraformManager, CommandMapper commandMapper,
CommandPublisher commandPublisher, MessagingServiceDelegateService messagingServiceDelegateService,
Expand All @@ -43,9 +47,28 @@ public CommandManager(TerraformManager terraformManager, CommandMapper commandMa
this.commandPublisher = commandPublisher;
this.messagingServiceDelegateService = messagingServiceDelegateService;
this.eventPortalProperties = eventPortalProperties;
configPushPool = new ThreadPoolTaskExecutor();
configPushPool.setCorePoolSize(eventPortalProperties.getCommandThreadPoolMinSize());
configPushPool.setMaxPoolSize(eventPortalProperties.getCommandThreadPoolMaxSize());
configPushPool.setQueueCapacity(eventPortalProperties.getCommandThreadPoolQueueSize());
configPushPool.setThreadNamePrefix("config-push-pool-");
configPushPool.setTaskDecorator(new MdcTaskDecorator());
configPushPool.initialize();
}

public void execute(CommandMessage request) {

CompletableFuture.runAsync(() -> configPush(request), configPushPool)
.exceptionally(e -> {
log.error("Error getting terraform variables", e);
Command firstCommand = request.getCommandBundles().get(0).getCommands().get(0);
setCommandError(firstCommand, (Exception) e);
sendResponse(request);
return null;
});
}

public void configPush(CommandMessage request) {
Map<String, String> envVars;
try {
envVars = setBrokerSpecificEnvVars(request.getServiceId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ public class EventPortalProperties {

private String topicPrefix;

private int commandThreadPoolMinSize = 5;
private int commandThreadPoolMaxSize = 10;
private int commandThreadPoolQueueSize = 1_000;

private GatewayProperties gateway
= new GatewayProperties("standalone", "standalone", new GatewayMessagingProperties(true, false, List.of()));
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.solace.maas.ep.event.management.agent.util;

import org.slf4j.MDC;
import org.springframework.core.task.TaskDecorator;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class MdcTaskDecorator implements TaskDecorator {

@Override
public Runnable decorate(Runnable runnable) {
Map<String, String> contextMap = (Map) Optional.ofNullable(MDC.getCopyOfContextMap()).orElse(new HashMap());
return () -> {
try {
MDC.setContextMap(contextMap);
runnable.run();
} finally {
MDC.clear();
}

};
}
}
3 changes: 3 additions & 0 deletions service/application/src/main/resources/application-TEST.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ eventPortal:
runtimeAgentId: ${EP_RUNTIME_AGENT_ID:1234567}
organizationId: ${EP_ORGANIZATION_ID:myOrg123}
topicPrefix: ${EP_TOPIC_PREFIX:sc/ep/runtime}
commandThreadPoolMinSize: 5
commandThreadPoolMaxSize: 5
commandThreadPoolQueueSize: 10
gateway:
id: decal5
name: evmr1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,32 @@
import com.solace.maas.ep.event.management.agent.plugin.solace.processor.semp.SolaceHttpSemp;
import com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformManager;
import com.solace.maas.ep.event.management.agent.publisher.CommandPublisher;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.invocation.Invocation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.test.context.ActiveProfiles;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import static com.solace.maas.ep.event.management.agent.constants.Command.COMMAND_CORRELATION_ID;
import static com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessageType.generic;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -38,42 +50,99 @@
public class CommandManagerTests {

@Autowired
CommandManager commandManager;
private CommandManager commandManager;

@Autowired
TerraformManager terraformManager;
private TerraformManager terraformManager;

@Autowired
CommandPublisher commandPublisher;
private CommandPublisher commandPublisher;

@Autowired
MessagingServiceDelegateService messagingServiceDelegateService;
private MessagingServiceDelegateService messagingServiceDelegateService;

@Autowired
EventPortalProperties eventPortalProperties;
private EventPortalProperties eventPortalProperties;

private final static ThreadPoolTaskExecutor testThreadPool = new ThreadPoolTaskExecutor();

@BeforeEach
public void cleanup() {
reset(terraformManager);
reset(commandPublisher);
}

@Test
public void testMultiThreadedCommandManager() throws InterruptedException {

// Set up the thread pool
int commandThreadPoolQueueSize = eventPortalProperties.getCommandThreadPoolQueueSize();
testThreadPool.setCorePoolSize(commandThreadPoolQueueSize);
testThreadPool.initialize();


// Build enough requests to fill the command thread pool queue
List<CommandMessage> messageList = new ArrayList<>();
for (int i = 0; i < commandThreadPoolQueueSize; i++) {
messageList.add(getCommandMessage(Integer.toString(i)));
}

doNothing().when(commandPublisher).sendCommandResponse(any(), any());
doAnswer(invocation -> {
// Simulate the time spent for a SEMP command to complete
TimeUnit.SECONDS.sleep(1);
return null;
}).when(terraformManager).execute(any(), any(), any());

ArgumentCaptor<Map<String, String>> topicArgCaptor = ArgumentCaptor.forClass(Map.class);

when(messagingServiceDelegateService.getMessagingServiceClient(any())).thenReturn(
new SolaceHttpSemp(SempClient.builder()
.username("myUsername")
.password("myPassword")
.connectionUrl("myConnectionUrl")
.build()));

// Execute all the commands in parallel to fill the command thread pool queue
IntStream.rangeClosed(1, commandThreadPoolQueueSize).parallel().forEach(i ->
CompletableFuture.runAsync(() -> commandManager.execute(messageList.get(i - 1)), testThreadPool));

// Wait for all the threads to complete (add a timeout just in case)
Collection<Invocation> invocations = Mockito.mockingDetails(commandPublisher).getInvocations();
long timeout = System.currentTimeMillis() + 10_000;
while (invocations.size() < commandThreadPoolQueueSize && System.currentTimeMillis() < timeout) {
TimeUnit.MILLISECONDS.sleep(500);
invocations = Mockito.mockingDetails(commandPublisher).getInvocations();
}

// Verify terraform manager is called
ArgumentCaptor<Map<String, String>> envArgCaptor = ArgumentCaptor.forClass(Map.class);
verify(terraformManager, times(commandThreadPoolQueueSize)).execute(any(), any(), envArgCaptor.capture());

// Verify the env vars are set with the terraform manager is called
Map<String, String> envVars = envArgCaptor.getValue();
assert envVars.get("TF_VAR_password").equals("myPassword");
assert envVars.get("TF_VAR_username").equals("myUsername");
assert envVars.get("TF_VAR_url").equals("myConnectionUrl");

ArgumentCaptor<CommandMessage> messageArgCaptor = ArgumentCaptor.forClass(CommandMessage.class);
verify(commandPublisher, times(commandThreadPoolQueueSize)).sendCommandResponse(messageArgCaptor.capture(), topicArgCaptor.capture());

Map<String, String> topicVars = topicArgCaptor.getValue();
assert topicVars.get("orgId").equals(eventPortalProperties.getOrganizationId());
assert topicVars.get("runtimeAgentId").equals(eventPortalProperties.getRuntimeAgentId());

// Make sure we get all 10 correlation ids in the response messages
List<String> receivedCorrelationIds = messageArgCaptor.getAllValues().stream().map(CommandMessage::getCommandCorrelationId).toList();
List<String> expectedCorrelationIds = IntStream.range(0, commandThreadPoolQueueSize).mapToObj(i -> "myCorrelationId" + i).toList();
assertTrue(receivedCorrelationIds.size() == expectedCorrelationIds.size() &&
receivedCorrelationIds.containsAll(expectedCorrelationIds) && expectedCorrelationIds.containsAll(receivedCorrelationIds));
}

@Test
public void testCommandManager() {
// Create a command request message
CommandMessage message = new CommandMessage();
message.setOrigType(MOPSvcType.maasEventMgmt);
message.withMessageType(generic);
message.setContext("abc");
message.setActorId("myActorId");
message.setOrgId(eventPortalProperties.getOrganizationId());
message.setTraceId("myTraceId");
message.setCommandCorrelationId("myCorrelationId");
message.setCommandBundles(List.of(
CommandBundle.builder()
.executionType(ExecutionType.serial)
.exitOnFailure(false)
.commands(List.of(
Command.builder()
.commandType(CommandType.terraform)
.body("asdfasdfadsf")
.command("apply")
.build()))
.build()));
CommandMessage message = getCommandMessage("1");

doNothing().when(terraformManager).execute(any(), any(), any());

Expand All @@ -85,7 +154,20 @@ public void testCommandManager() {
.password("myPassword")
.connectionUrl("myConnectionUrl")
.build()));

commandManager.execute(message);
// Wait for all the threads to complete (add a timeout just in case)
Collection<Invocation> invocations = Mockito.mockingDetails(commandPublisher).getInvocations();
long timeout = System.currentTimeMillis() + 10_000;
while (invocations.isEmpty() && System.currentTimeMillis() < timeout) {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
invocations = Mockito.mockingDetails(commandPublisher).getInvocations();
}


// Verify terraform manager is called
ArgumentCaptor<Map<String, String>> envArgCaptor = ArgumentCaptor.forClass(Map.class);
Expand All @@ -104,4 +186,27 @@ public void testCommandManager() {
assert topicVars.get("runtimeAgentId").equals(eventPortalProperties.getRuntimeAgentId());
assert topicVars.get(COMMAND_CORRELATION_ID).equals(message.getCommandCorrelationId());
}

private CommandMessage getCommandMessage(String suffix) {
CommandMessage message = new CommandMessage();
message.setOrigType(MOPSvcType.maasEventMgmt);
message.withMessageType(generic);
message.setContext("abc");
message.setActorId("myActorId");
message.setOrgId(eventPortalProperties.getOrganizationId());
message.setTraceId("myTraceId");
message.setCommandCorrelationId("myCorrelationId" + suffix);
message.setCommandBundles(List.of(
CommandBundle.builder()
.executionType(ExecutionType.serial)
.exitOnFailure(false)
.commands(List.of(
Command.builder()
.commandType(CommandType.terraform)
.body("asdfasdfadsf")
.command("apply")
.build()))
.build()));
return message;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import java.util.concurrent.Future;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.fail;

@Slf4j
@ActiveProfiles("TEST")
Expand All @@ -48,7 +50,7 @@ public class TerraformClientRealTests {

@Test
public void planCreateNewQueue() {
executeTerraformCommand("addQueue.tf", "plan");
executeTerraformCommand("addQueue.tf", "plan ");
}

@Test
Expand All @@ -57,7 +59,7 @@ public void createNewQueue() {
}

@Test
public void create2Queues() {
public void create2DifferentQueues() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<List<CommandBundle>> future1 = executorService.submit(() ->
executeTerraformCommand("addQueue.tf", "apply"));
Expand All @@ -72,6 +74,26 @@ public void create2Queues() {
}
}

@Test
public void create2OfTheSameQueue() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<List<CommandBundle>> future1 = executorService.submit(() ->
executeTerraformCommand("addQueue.tf", "apply", "app123-consumer", JobStatus.success));
Future<List<CommandBundle>> future2 = executorService.submit(() ->
executeTerraformCommand("addQueue.tf", "apply", "app123-consumer", JobStatus.error));
// wait for the futures to complete
try {
List<CommandBundle> command1Bundles = future1.get();
List<CommandBundle> command2Bundles = future2.get();
// We expect the first one to succeed and the second one to fail
assertEquals(JobStatus.success, command1Bundles.get(0).getCommands().get(0).getResult().getStatus());
assertEquals(JobStatus.error, command2Bundles.get(0).getCommands().get(0).getResult().getStatus());
} catch (Exception e) {
log.error("Error waiting for futures to complete", e);
fail();
}
}

@Test
public void delete2Queues() {
executeTerraformCommand("deleteQueue.tf", "apply");
Expand Down Expand Up @@ -141,6 +163,10 @@ private List<CommandBundle> executeTerraformCommand(String hclFileName, String t
}

private List<CommandBundle> executeTerraformCommand(String hclFileName, String tfVerb, String context) {
return executeTerraformCommand(hclFileName, tfVerb, context, JobStatus.success);
}

private List<CommandBundle> executeTerraformCommand(String hclFileName, String tfVerb, String context, JobStatus expectedJobStatus) {
String terraformString = asString(resourceLoader.getResource("classpath:realTfFiles" + File.separator + hclFileName));

Command commandRequest = Command.builder()
Expand Down Expand Up @@ -168,7 +194,7 @@ private List<CommandBundle> executeTerraformCommand(String hclFileName, String t
for (Command command : commandBundle.getCommands()) {
CommandResult result = command.getResult();
System.out.println("Logs " + result.getLogs());
assertNotSame(JobStatus.error, result.getStatus());
assertEquals(expectedJobStatus, result.getStatus());
}
}

Expand Down

0 comments on commit 3e477b3

Please sign in to comment.