From 8a41b5a507e42909fef4193a5f93871a6b4fe9fc Mon Sep 17 00:00:00 2001 From: rudraneel-chakraborty Date: Thu, 4 Jul 2024 16:42:27 -0400 Subject: [PATCH 1/7] DATAGO-79772 wip --- .../agent/command/CommandManager.java | 46 +++++- .../CommandLogStreamingProcessor.java | 27 +-- .../ep/event/management/agent/TestConfig.java | 20 +-- .../ManagedAgentMessageHandlerBeansTests.java | 11 ++ ...fManagedAgentMessageHandlerBeansTests.java | 10 ++ .../CloudManagedEMACommandManagerTests.java | 119 ++++++++++++++ .../CommandManagerTestHelper.java | 97 +++++++++++ .../commandManager/CommandManagerTests.java | 154 +++++++++--------- .../CommandLogStreamProcessorTest.java | 60 ------- .../terraform/manager/TerraformManager.java | 8 +- 10 files changed, 365 insertions(+), 187 deletions(-) create mode 100644 service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CloudManagedEMACommandManagerTests.java create mode 100644 service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTestHelper.java diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/command/CommandManager.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/command/CommandManager.java index e312728cd..6aef42e3a 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/command/CommandManager.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/command/CommandManager.java @@ -21,12 +21,15 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; +import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import static com.solace.maas.ep.event.management.agent.constants.Command.COMMAND_CORRELATION_ID; @@ -45,14 +48,14 @@ public class CommandManager { private final MessagingServiceDelegateService messagingServiceDelegateService; private final EventPortalProperties eventPortalProperties; private final ThreadPoolTaskExecutor configPushPool; - private final CommandLogStreamingProcessor commandLogStreamingProcessor; + private final Optional commandLogStreamingProcessorOpt; public CommandManager(TerraformManager terraformManager, CommandMapper commandMapper, CommandPublisher commandPublisher, MessagingServiceDelegateService messagingServiceDelegateService, EventPortalProperties eventPortalProperties, - CommandLogStreamingProcessor commandLogStreamingProcessor) { + Optional commandLogStreamingProcessorOpt) { this.terraformManager = terraformManager; this.commandMapper = commandMapper; this.commandPublisher = commandPublisher; @@ -65,7 +68,7 @@ public CommandManager(TerraformManager terraformManager, configPushPool.setThreadNamePrefix("config-push-pool-"); configPushPool.setTaskDecorator(new MdcTaskDecorator()); configPushPool.initialize(); - this.commandLogStreamingProcessor = commandLogStreamingProcessor; + this.commandLogStreamingProcessorOpt = commandLogStreamingProcessorOpt; } public void execute(CommandMessage request) { @@ -106,7 +109,10 @@ public void configPush(CommandRequest request) { for (Command command : bundle.getCommands()) { Path executionLog = executeCommand(request, command, envVars); if (executionLog != null) { - streamCommandExecutionLogToEpCore(request, command, executionLog); + if (commandLogStreamingProcessorOpt.isPresent()) { + streamCommandExecutionLogToEpCore(request, command, executionLog); + } + executionLogFilesToClean.add(executionLog); } if (exitEarlyOnFailedCommand(bundle, command)) { @@ -153,17 +159,41 @@ private Path executeCommand(CommandRequest request, private void cleanup(List listOfExecutionLogFiles) { try { - commandLogStreamingProcessor.deleteExecutionLogFiles(listOfExecutionLogFiles); + deleteExecutionLogFiles(listOfExecutionLogFiles); } catch (Exception e) { log.error("Error while deleting execution log.", e); } } - private void streamCommandExecutionLogToEpCore(CommandRequest request, Command command, Path executionLog) { + public void deleteExecutionLogFiles(List listOfExecutionLogFiles) { + boolean allFilesDeleted = listOfExecutionLogFiles + .stream() + .allMatch(this::deleteExecutionLogFile); + if (!allFilesDeleted) { + throw new IllegalArgumentException("Some of the execution log files were not deleted. Please check the logs"); + } + } + + private boolean deleteExecutionLogFile(Path path) { + try { + if (Files.exists(path)) { + Files.delete(path); + } + } catch (IOException e) { + log.warn("Error while deleting execution log at {}", path, e); + return false; + } + return true; + } + + public void streamCommandExecutionLogToEpCore(CommandRequest request, Command command, Path executionLog) { + if (commandLogStreamingProcessorOpt.isEmpty()) { + throw new UnsupportedOperationException("Streaming logs to ep is not supported for this event management agent type"); + } try { - commandLogStreamingProcessor.streamLogsToEP(request, command, executionLog); + commandLogStreamingProcessorOpt.get().streamLogsToEP(request, command, executionLog); } catch (Exception e) { - log.error("Error sending logs to ep-core for command with commandCorrelationId", + log.error("Error sending logs to ep-core for command with commandCorrelationId {}", request.getCommandCorrelationId(), e); } } diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/CommandLogStreamingProcessor.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/CommandLogStreamingProcessor.java index f08e161e6..54462dbbd 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/CommandLogStreamingProcessor.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/CommandLogStreamingProcessor.java @@ -12,10 +12,9 @@ import com.solace.maas.ep.event.management.agent.publisher.CommandLogsPublisher; import lombok.extern.slf4j.Slf4j; import org.slf4j.MDC; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; -import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.time.Instant; @@ -33,7 +32,7 @@ @Component @Slf4j -@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false") +@ConditionalOnExpression("${event-portal.gateway.messaging.standalone:false}== false && ${event-portal.managed:false} == false") public class CommandLogStreamingProcessor { public static final String ANY = "*"; @@ -50,28 +49,6 @@ public CommandLogStreamingProcessor(CommandLogsPublisher commandLogsPublisher, } - public void deleteExecutionLogFiles(List listOfExecutionLogFiles) { - boolean allFilesDeleted = listOfExecutionLogFiles - .stream() - .allMatch(this::deleteExecutionLogFile); - if (!allFilesDeleted) { - throw new IllegalArgumentException("Some of the execution log files were not deleted. Please check the logs"); - } - } - - private boolean deleteExecutionLogFile(Path path) { - try { - if (Files.exists(path)) { - Files.delete(path); - } - } catch (IOException e) { - log.warn("Error while deleting execution log at {}", path, e); - return false; - } - return true; - } - - public void streamLogsToEP(CommandRequest request, Command executedCommand, Path commandExecutionLog) { if (executedCommand.getIgnoreResult()) { diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/TestConfig.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/TestConfig.java index b248c5e24..c98216782 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/TestConfig.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/TestConfig.java @@ -1,6 +1,5 @@ package com.solace.maas.ep.event.management.agent; -import com.fasterxml.jackson.databind.ObjectMapper; import com.solace.maas.ep.event.management.agent.command.CommandManager; import com.solace.maas.ep.event.management.agent.command.mapper.CommandMapper; import com.solace.maas.ep.event.management.agent.config.SolaceConfiguration; @@ -31,13 +30,13 @@ import org.apache.camel.ProducerTemplate; import org.apache.camel.support.DefaultExchange; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Primary; import org.springframework.context.annotation.Profile; +import java.util.Optional; import java.util.Properties; import java.util.Random; import java.util.concurrent.TimeUnit; @@ -161,21 +160,6 @@ public CommandLogsPublisher getComaCommandLogsPublisher() { return mock(CommandLogsPublisher.class); } - @Bean - @Qualifier("realCommandLogStreamingProcessor") - public CommandLogStreamingProcessor realCommandLogStreamingProcessor(CommandLogsPublisher commandLogsPublisher, - EventPortalProperties eventPortalProperties, - ObjectMapper objectMapper) { - return new CommandLogStreamingProcessor(commandLogsPublisher, eventPortalProperties, objectMapper); - } - - @Bean - @Qualifier("mockedCommandLogStreamingProcessor") - @Primary - public CommandLogStreamingProcessor mockedCommandLogStreamingProcessor() { - return mock(CommandLogStreamingProcessor.class); - } - @Bean @Primary public CommandManager getCommandManager(TerraformManager terraformManager, @@ -183,7 +167,7 @@ public CommandManager getCommandManager(TerraformManager terraformManager, CommandPublisher commandPublisher, MessagingServiceDelegateService messagingServiceDelegateService, EventPortalProperties eventPortalProperties, - CommandLogStreamingProcessor commandLogStreamingProcessor) { + Optional commandLogStreamingProcessor) { return new CommandManager( terraformManager, commandMapper, diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/applicationContextTests/ManagedAgentMessageHandlerBeansTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/applicationContextTests/ManagedAgentMessageHandlerBeansTests.java index c8a06f24c..ceb3a9df6 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/applicationContextTests/ManagedAgentMessageHandlerBeansTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/applicationContextTests/ManagedAgentMessageHandlerBeansTests.java @@ -59,4 +59,15 @@ void testDirectMessageHandlerBeansAreNotLoaded() { ); } + + @Test + void testCommandLogStreamingProcessorBeanIsNotLoaded() { + String[] allBeanNames = applicationContext.getBeanDefinitionNames(); + assertThat( + Arrays.stream(allBeanNames) + .map(StringUtils::lowerCase) + .collect(Collectors.toSet())) + .doesNotContain(StringUtils.lowerCase("commandLogStreamingProcessor")); + + } } diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/applicationContextTests/SelfManagedAgentMessageHandlerBeansTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/applicationContextTests/SelfManagedAgentMessageHandlerBeansTests.java index 9b6751e36..3d11ea377 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/applicationContextTests/SelfManagedAgentMessageHandlerBeansTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/applicationContextTests/SelfManagedAgentMessageHandlerBeansTests.java @@ -55,4 +55,14 @@ void testDirectMessageHandlerBeansAreLoaded() { ); } + + @Test + void testCommandLogStreamingProcessorBeanIsLoaded() { + String[] allBeanNames = applicationContext.getBeanDefinitionNames(); + assertThat( + Arrays.stream(allBeanNames) + .map(StringUtils::lowerCase) + .collect(Collectors.toSet())).contains(StringUtils.lowerCase("commandLogStreamingProcessor")); + + } } diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CloudManagedEMACommandManagerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CloudManagedEMACommandManagerTests.java new file mode 100644 index 000000000..11af0ed7f --- /dev/null +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CloudManagedEMACommandManagerTests.java @@ -0,0 +1,119 @@ +package com.solace.maas.ep.event.management.agent.commandManager; + +import com.solace.maas.ep.common.messages.CommandMessage; +import com.solace.maas.ep.event.management.agent.command.CommandManager; +import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties; +import com.solace.maas.ep.event.management.agent.plugin.command.model.Command; +import com.solace.maas.ep.event.management.agent.plugin.command.model.JobStatus; +import com.solace.maas.ep.event.management.agent.plugin.service.MessagingServiceDelegateService; +import com.solace.maas.ep.event.management.agent.plugin.solace.processor.semp.SempClient; +import com.solace.maas.ep.event.management.agent.plugin.solace.processor.semp.SolaceHttpSemp; +import com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformManager; +import com.solace.maas.ep.event.management.agent.publisher.CommandPublisher; +import lombok.extern.slf4j.Slf4j; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.ArgumentCaptor; +import org.mockito.stubbing.Answer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ActiveProfiles; + +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.springframework.test.annotation.DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD; + +@Slf4j +@ActiveProfiles("TEST") +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = { + "eventPortal.gateway.messaging.standalone=false", + "eventPortal.managed=true", + "eventPortal.incomingRequestQueueName = ep_core_ema_requests_123456_123123", + "event-portal.gateway.messaging.rto-session=false" +}) +@DirtiesContext(classMode = BEFORE_EACH_TEST_METHOD) +//CPD-OFF +class CloudManagedEMACommandManagerTests { + + @SpyBean + private CommandManager commandManager; + + @Autowired + private TerraformManager terraformManager; + + @Autowired + private CommandPublisher commandPublisher; + + @Autowired + private MessagingServiceDelegateService messagingServiceDelegateService; + + @Autowired + private EventPortalProperties eventPortalProperties; + + private static final String MESSAGING_SERVICE_ID = "myMessagingServiceId"; + + private ArgumentCaptor> executionLogFileCaptor; + private ArgumentCaptor> topicArgCaptor; + private ArgumentCaptor> envArgCaptor; + private ArgumentCaptor responseCaptor; + + private CommandMessage message; + + @BeforeEach + void setUp() { + message = CommandManagerTestHelper.buildCommandMessageForConfigPush(eventPortalProperties.getOrganizationId(), MESSAGING_SERVICE_ID); + executionLogFileCaptor = ArgumentCaptor.forClass(List.class); + topicArgCaptor = ArgumentCaptor.forClass(Map.class); + doNothing().when(commandPublisher).sendCommandResponse(any(), any()); + when(messagingServiceDelegateService.getMessagingServiceClient(any())).thenReturn( + new SolaceHttpSemp(SempClient.builder() + .username("myUsername") + .password("myPassword") + .connectionUrl("myConnectionUrl") + .build())); + envArgCaptor = ArgumentCaptor.forClass(Map.class); + responseCaptor = ArgumentCaptor.forClass(CommandMessage.class); + } + + @Test + void noLogsStreamingToEP(@TempDir Path basePath) { + doAnswer((Answer) invocation -> { + Command command = (Command) invocation.getArgument(1); + return CommandManagerTestHelper.setCommandStatusAndReturnExecutionLog(command, JobStatus.success, true, basePath); + }).when(terraformManager).execute(any(), any(), any()); + + commandManager.execute(message); + + // Wait for the command thread to complete + await().atMost(5, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, 1)); + + verify(terraformManager, times(4)).execute(any(), any(), envArgCaptor.capture()); + verify(commandPublisher, times(1)).sendCommandResponse(responseCaptor.capture(), topicArgCaptor.capture()); + + //Logs will not be streamed as EMA is cloud managed + verify(commandManager, times(0)).streamCommandExecutionLogToEpCore(any(), any(), any()); + + //Logs will be cleaned up anyway + verify(commandManager, times(1)).deleteExecutionLogFiles(executionLogFileCaptor.capture()); + Assertions.assertThat(executionLogFileCaptor.getValue()) + .containsExactlyInAnyOrder( + basePath.resolve("apply"), + basePath.resolve("write_HCL"), + basePath.resolve("write_HCL"), + basePath.resolve("sync")); + } +} \ No newline at end of file diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTestHelper.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTestHelper.java new file mode 100644 index 000000000..a850cf578 --- /dev/null +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTestHelper.java @@ -0,0 +1,97 @@ +package com.solace.maas.ep.event.management.agent.commandManager; + +import com.solace.maas.ep.common.messages.CommandMessage; +import com.solace.maas.ep.event.management.agent.plugin.command.model.Command; +import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandBundle; +import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandResult; +import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandType; +import com.solace.maas.ep.event.management.agent.plugin.command.model.ExecutionType; +import com.solace.maas.ep.event.management.agent.plugin.command.model.JobStatus; +import com.solace.maas.ep.event.management.agent.plugin.mop.MOPSvcType; +import com.solace.maas.ep.event.management.agent.publisher.CommandPublisher; +import org.mockito.Mockito; + +import java.nio.file.Path; +import java.util.List; +import java.util.Map; + +import static com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessageType.generic; + +public final class CommandManagerTestHelper { + + private CommandManagerTestHelper() { + throw new UnsupportedOperationException(); + } + + public static Boolean verifyCommandPublisherIsInvoked(CommandPublisher commandPublisher, int numberOfExpectedInvocations) { + return Mockito.mockingDetails(commandPublisher).getInvocations().size() == numberOfExpectedInvocations; + } + + + public static Path setCommandStatusAndReturnExecutionLog(Command targetCommand, + JobStatus targetStatus, + boolean ignoreResult, + Path basePath) { + + if (targetStatus == JobStatus.success) { + targetCommand.setResult(CommandResult.builder() + .status(JobStatus.success) + .result(Map.of()).build()); + return basePath.resolve(targetCommand.getCommand()); + } else { + //simulating a failed command + targetCommand.setResult(null); + targetCommand.setIgnoreResult(ignoreResult); + return null; + } + + } + + public static CommandMessage buildCommandMessageForConfigPush(String targetOrgId, + String targetMessagingServiceId) { + + List commands = List.of( + Command.builder() + .commandType(CommandType.terraform) + .ignoreResult(false) + .body("asdfasdfadsf") + .command("write_HCL") + .build(), + Command.builder() + .commandType(CommandType.terraform) + .ignoreResult(false) + .body("asdfasdfadsf") + .command("write_HCL") + .build(), + Command.builder() + .commandType(CommandType.terraform) + .ignoreResult(true) + .body("asdfasdfadsf") + .command("sync") + .build(), + + Command.builder() + .commandType(CommandType.terraform) + .ignoreResult(false) + .body("asdfasdfadsf") + .command("apply") + .build()); + + CommandMessage message = new CommandMessage(); + message.setOrigType(MOPSvcType.maasEventMgmt); + message.withMessageType(generic); + message.setContext("abc"); + message.setServiceId(targetMessagingServiceId); + message.setActorId("myActorId"); + message.setOrgId(targetOrgId); + message.setTraceId("myTraceId"); + message.setCommandCorrelationId("myCorrelationIdabc"); + message.setCommandBundles(List.of( + CommandBundle.builder() + .executionType(ExecutionType.serial) + .exitOnFailure(true) + .commands(commands) + .build())); + return message; + } +} diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTests.java index c838f200f..8a49b027f 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTests.java @@ -26,11 +26,9 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; import org.mockito.stubbing.Answer; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.mock.mockito.SpyBean; @@ -38,6 +36,8 @@ import org.springframework.test.context.ActiveProfiles; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.LinkOption; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; @@ -46,6 +46,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.IntStream; +import java.util.stream.Stream; import static com.solace.maas.ep.event.management.agent.constants.Command.COMMAND_CORRELATION_ID; import static com.solace.maas.ep.event.management.agent.plugin.constants.RouteConstants.TRACE_ID; @@ -81,9 +82,8 @@ class CommandManagerTests { @Autowired private CommandPublisher commandPublisher; - @Autowired - @Qualifier("mockedCommandLogStreamingProcessor") - private CommandLogStreamingProcessor mockedCommandLogStreamingProcessor; + @SpyBean + private CommandLogStreamingProcessor commandLogStreamingProcessor; @Autowired private MessagingServiceDelegateService messagingServiceDelegateService; @@ -100,7 +100,7 @@ public void cleanup() { reset(commandPublisher); reset(commandManager); reset(messagingServiceDelegateService); - reset(mockedCommandLogStreamingProcessor); + reset(commandLogStreamingProcessor); } @Test @@ -139,7 +139,7 @@ void testMultiThreadedCommandManager() throws InterruptedException { CompletableFuture.runAsync(() -> commandManager.execute(messageList.get(i - 1)), testThreadPool)); // Wait for all the threads to complete (add a timeout just in case) - await().atMost(10, TimeUnit.SECONDS).until(() -> commandPublisherIsInvoked(commandThreadPoolQueueSize)); + await().atMost(10, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, 1)); // Verify terraform manager is called ArgumentCaptor> envArgCaptor = ArgumentCaptor.forClass(Map.class); @@ -175,7 +175,7 @@ void failSendingResponseBackToEp() { commandManager.execute(message); // Wait for the command thread to complete - await().atMost(10, TimeUnit.SECONDS).until(() -> commandPublisherIsInvoked(2)); + await().atMost(10, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, 1)); ArgumentCaptor messageArgCaptor = ArgumentCaptor.forClass(CommandMessage.class); verify(commandPublisher, times(2)).sendCommandResponse(messageArgCaptor.capture(), any()); @@ -201,7 +201,7 @@ void failSettingBrokerSpecificEnvironmentVariables() { .when(messagingServiceDelegateService).getMessagingServiceClient(MESSAGING_SERVICE_ID); commandManager.execute(message); - await().atMost(10, TimeUnit.SECONDS).until(() -> commandPublisherIsInvoked(1)); + await().atMost(10, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, 1)); ArgumentCaptor messageArgCaptor = ArgumentCaptor.forClass(CommandMessage.class); verify(commandPublisher, times(1)).sendCommandResponse(messageArgCaptor.capture(), any()); @@ -218,7 +218,7 @@ void failConfigPushCommand() { doThrow(new RuntimeException("Error running command.")).when(commandManager).configPush(commandMapper.map(message)); commandManager.execute(message); - await().atMost(10, TimeUnit.SECONDS).until(() -> commandPublisherIsInvoked(1)); + await().atMost(10, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, 1)); ArgumentCaptor messageArgCaptor = ArgumentCaptor.forClass(CommandMessage.class); verify(commandPublisher, times(1)).sendCommandResponse(messageArgCaptor.capture(), any()); @@ -245,7 +245,7 @@ void testCommandManager() { commandManager.execute(message); // Wait for the command thread to complete - await().atMost(10, TimeUnit.SECONDS).until(() -> commandPublisherIsInvoked(1)); + await().atMost(10, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, 1)); // Verify terraform manager is called ArgumentCaptor> envArgCaptor = ArgumentCaptor.forClass(Map.class); @@ -356,7 +356,7 @@ void verifyMDCIsSetInCommandManagerThread() { commandManager.execute(message); // Wait for the command thread to complete - await().atMost(10, TimeUnit.SECONDS).until(() -> commandPublisherIsInvoked(1)); + await().atMost(10, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, 1)); assertTrue(mdcIsSet.get()); } @@ -394,58 +394,6 @@ private static Command buildCommand(boolean ignoreResult) { .build(); } - - private CommandMessage buildCommandMessageForConfigPush() { - - List commands = List.of( - Command.builder() - .commandType(CommandType.terraform) - .ignoreResult(false) - .body("asdfasdfadsf") - .command("write_HCL") - .build(), - Command.builder() - .commandType(CommandType.terraform) - .ignoreResult(false) - .body("asdfasdfadsf") - .command("write_HCL") - .build(), - Command.builder() - .commandType(CommandType.terraform) - .ignoreResult(true) - .body("asdfasdfadsf") - .command("sync") - .build(), - - Command.builder() - .commandType(CommandType.terraform) - .ignoreResult(false) - .body("asdfasdfadsf") - .command("apply") - .build()); - - CommandMessage message = new CommandMessage(); - message.setOrigType(MOPSvcType.maasEventMgmt); - message.withMessageType(generic); - message.setContext("abc"); - message.setServiceId(MESSAGING_SERVICE_ID); - message.setActorId("myActorId"); - message.setOrgId(eventPortalProperties.getOrganizationId()); - message.setTraceId("myTraceId"); - message.setCommandCorrelationId("myCorrelationIdabc"); - message.setCommandBundles(List.of( - CommandBundle.builder() - .executionType(ExecutionType.serial) - .exitOnFailure(true) - .commands(commands) - .build())); - return message; - } - - private Boolean commandPublisherIsInvoked(int numberOfExpectedInvocations) { - return Mockito.mockingDetails(commandPublisher).getInvocations().size() == numberOfExpectedInvocations; - } - private ArgumentCaptor executeCommandAndGetResponseMessage(CommandMessage message) { ArgumentCaptor mopMessageCaptor = ArgumentCaptor.forClass(MOPMessage.class); @@ -460,7 +408,7 @@ private ArgumentCaptor executeCommandAndGetResponseMessage(CommandMe commandManager.execute(message); // Wait for the command thread to complete - await().atMost(10, TimeUnit.SECONDS).until(() -> commandPublisherIsInvoked(1)); + await().atMost(10, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, 1)); return mopMessageCaptor; } @@ -475,7 +423,7 @@ class LogStreamingToEpTest { @BeforeEach void setUp() { - message = buildCommandMessageForConfigPush(); + message = CommandManagerTestHelper.buildCommandMessageForConfigPush(eventPortalProperties.getOrganizationId(), MESSAGING_SERVICE_ID); executionLogFileCaptor = ArgumentCaptor.forClass(List.class); topicArgCaptor = ArgumentCaptor.forClass(Map.class); doNothing().when(commandPublisher).sendCommandResponse(any(), any()); @@ -503,7 +451,7 @@ void testLogStreamingToEP(@TempDir Path basePath) throws IOException { */ executeCommandsAndVerify(4, 4); - verify(mockedCommandLogStreamingProcessor, times(1)).deleteExecutionLogFiles(executionLogFileCaptor.capture()); + verify(commandManager, times(1)).deleteExecutionLogFiles(executionLogFileCaptor.capture()); Assertions.assertThat(executionLogFileCaptor.getValue()) .containsExactlyInAnyOrder( basePath.resolve("apply"), @@ -532,7 +480,7 @@ void testExecutionLogCleanupWhenOneOfTheExecutionLogPathIsNull(@TempDir Path bas however only 3 log files will be streamed + cleaned */ executeCommandsAndVerify(4, 3); - verify(mockedCommandLogStreamingProcessor, times(1)).deleteExecutionLogFiles(executionLogFileCaptor.capture()); + verify(commandManager, times(1)).deleteExecutionLogFiles(executionLogFileCaptor.capture()); Assertions.assertThat(executionLogFileCaptor.getValue()) .containsExactlyInAnyOrder( basePath.resolve("apply"), @@ -567,7 +515,7 @@ void testExecutionLogCleanupWhenExitEarlyOnFailedCommand(@TempDir Path basePath) so we expect 3 commands to be executed and 2 log files to be streamed + cleaned */ executeCommandsAndVerify(3, 2); - verify(mockedCommandLogStreamingProcessor, times(1)).deleteExecutionLogFiles(executionLogFileCaptor.capture()); + verify(commandManager, times(1)).deleteExecutionLogFiles(executionLogFileCaptor.capture()); Assertions.assertThat(executionLogFileCaptor.getValue()) .containsExactlyInAnyOrder( basePath.resolve("write_HCL"), @@ -582,16 +530,16 @@ void testExecutionLogCleanupWhenLogStreamingToEpFails(@TempDir Path basePath) { return setCommandStatusAndReturnExecutionLog(command, JobStatus.success, true, basePath); }).when(terraformManager).execute(any(), any(), any()); - doThrow(new IllegalArgumentException("fake")).when(mockedCommandLogStreamingProcessor).streamLogsToEP(any(), any(), any()); + doThrow(new IllegalArgumentException("fake")).when(commandLogStreamingProcessor).streamLogsToEP(any(), any(), any()); commandManager.execute(message); // Wait for the command thread to complete - await().atMost(5, TimeUnit.SECONDS).until(() -> commandPublisherIsInvoked(1)); + await().atMost(5, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, 1)); - verify(mockedCommandLogStreamingProcessor, times(4)).streamLogsToEP(any(), any(), any()); + verify(commandLogStreamingProcessor, times(4)).streamLogsToEP(any(), any(), any()); //we still expect cleanup to occur even though log streaming to ep fails - verify(mockedCommandLogStreamingProcessor, times(1)).deleteExecutionLogFiles(executionLogFileCaptor.capture()); + verify(commandManager, times(1)).deleteExecutionLogFiles(executionLogFileCaptor.capture()); Assertions.assertThat(executionLogFileCaptor.getValue()) .containsExactlyInAnyOrder( basePath.resolve("apply"), @@ -600,16 +548,70 @@ void testExecutionLogCleanupWhenLogStreamingToEpFails(@TempDir Path basePath) { basePath.resolve("sync")); } + @Test + void testExecutionLogDeletionSuccessFlow(@TempDir Path logPath) throws IOException { + Path commandLog1 = logPath.resolve("log1"); + Path commandLog2 = logPath.resolve("log2"); + Path commandLog3 = logPath.resolve("log3"); + Path commandLog4 = logPath.resolve("log4"); + + Files.writeString(commandLog1, "log 1"); + Files.writeString(commandLog2, "log 2"); + Files.writeString(commandLog3, "log 3"); + Files.writeString(commandLog4, "log 4"); + List allLogs = List.of(commandLog1, commandLog2, commandLog3, commandLog4); + + Assertions.assertThat( + allLogs.stream().allMatch(path -> Files.exists(path, LinkOption.NOFOLLOW_LINKS)) + ).isTrue(); + commandManager.deleteExecutionLogFiles( + List.of(commandLog1, commandLog2, commandLog3, commandLog4) + ); + + Assertions.assertThat( + allLogs.stream().noneMatch(path -> Files.exists(path, LinkOption.NOFOLLOW_LINKS)) + ).isTrue(); + } + + @Test + void testExecutionLogDeletionWhenSomeLogFilesDontExist(@TempDir Path logPath) throws IOException { + Path commandLog1 = logPath.resolve("log1"); + Path commandLog2 = logPath.resolve("log2"); + Path commandLog3 = logPath.resolve("log3"); + Path commandLog4 = logPath.resolve("log4"); + + Files.writeString(commandLog1, "log 1"); + Files.writeString(commandLog2, "log 2"); + + List allLogs = List.of(commandLog1, commandLog2, commandLog3, commandLog4); + + // Only 2 of the log files exist + Assertions.assertThat( + Stream.of(commandLog1, commandLog2).allMatch(path -> Files.exists(path, LinkOption.NOFOLLOW_LINKS)) + ).isTrue(); + /* Although only 2 out of 4 log files exist, the 2 log files will be deleted anyway + and the errors will be handled gracefully + */ + commandManager.deleteExecutionLogFiles( + List.of(commandLog1, commandLog2, commandLog3, commandLog4) + ); + + Assertions.assertThat( + allLogs.stream().noneMatch(path -> Files.exists(path, LinkOption.NOFOLLOW_LINKS)) + ).isTrue(); + + } + private void executeCommandsAndVerify(int expectedNumberOfCommandExecutions, int expectedNumberOfLogFilesStreamed) { commandManager.execute(message); // Wait for the command thread to complete - await().atMost(5, TimeUnit.SECONDS).until(() -> commandPublisherIsInvoked(1)); + await().atMost(5, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, 1)); verify(terraformManager, times(expectedNumberOfCommandExecutions)).execute(any(), any(), envArgCaptor.capture()); verify(commandPublisher, times(1)).sendCommandResponse(responseCaptor.capture(), topicArgCaptor.capture()); - verify(mockedCommandLogStreamingProcessor, times(expectedNumberOfLogFilesStreamed)).streamLogsToEP(any(), any(), any()); + verify(commandLogStreamingProcessor, times(expectedNumberOfLogFilesStreamed)).streamLogsToEP(any(), any(), any()); } private Path setCommandStatusAndReturnExecutionLog(Command targetCommand, @@ -631,4 +633,6 @@ private Path setCommandStatusAndReturnExecutionLog(Command targetCommand, } } + + } diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/processor/CommandLogStreamProcessorTest.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/processor/CommandLogStreamProcessorTest.java index 426f18602..0e5618c3b 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/processor/CommandLogStreamProcessorTest.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/processor/CommandLogStreamProcessorTest.java @@ -16,10 +16,8 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; import org.mockito.ArgumentCaptor; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; @@ -27,11 +25,8 @@ import java.io.File; import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.LinkOption; import java.nio.file.Path; import java.util.List; -import java.util.stream.Stream; import static com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessageType.generic; import static org.junit.Assert.assertThrows; @@ -52,7 +47,6 @@ public class CommandLogStreamProcessorTest { private EventPortalProperties eventPortalProperties; @Autowired - @Qualifier("realCommandLogStreamingProcessor") private CommandLogStreamingProcessor realCommandLogStreamingProcessor; @Autowired @@ -166,60 +160,6 @@ void testStreamLogsToEPErrorCase() throws IOException { } - @Test - void testExecutionLogDeletionSuccessFlow(@TempDir Path logPath) throws IOException { - Path commandLog1 = logPath.resolve("log1"); - Path commandLog2 = logPath.resolve("log2"); - Path commandLog3 = logPath.resolve("log3"); - Path commandLog4 = logPath.resolve("log4"); - - Files.writeString(commandLog1, "log 1"); - Files.writeString(commandLog2, "log 2"); - Files.writeString(commandLog3, "log 3"); - Files.writeString(commandLog4, "log 4"); - List allLogs = List.of(commandLog1, commandLog2, commandLog3, commandLog4); - - Assertions.assertThat( - allLogs.stream().allMatch(path -> Files.exists(path, LinkOption.NOFOLLOW_LINKS)) - ).isTrue(); - realCommandLogStreamingProcessor.deleteExecutionLogFiles( - List.of(commandLog1, commandLog2, commandLog3, commandLog4) - ); - - Assertions.assertThat( - allLogs.stream().noneMatch(path -> Files.exists(path, LinkOption.NOFOLLOW_LINKS)) - ).isTrue(); - } - - @Test - void testExecutionLogDeletionWhenSomeLogFilesDontExist(@TempDir Path logPath) throws IOException { - Path commandLog1 = logPath.resolve("log1"); - Path commandLog2 = logPath.resolve("log2"); - Path commandLog3 = logPath.resolve("log3"); - Path commandLog4 = logPath.resolve("log4"); - - Files.writeString(commandLog1, "log 1"); - Files.writeString(commandLog2, "log 2"); - - List allLogs = List.of(commandLog1, commandLog2, commandLog3, commandLog4); - - // Only 2 of the log files exist - Assertions.assertThat( - Stream.of(commandLog1, commandLog2).allMatch(path -> Files.exists(path, LinkOption.NOFOLLOW_LINKS)) - ).isTrue(); - /* Although only 2 out of 4 log files exist, the 2 log files will be deleted anyway - and the errors will be handled gracefully - */ - realCommandLogStreamingProcessor.deleteExecutionLogFiles( - List.of(commandLog1, commandLog2, commandLog3, commandLog4) - ); - - Assertions.assertThat( - allLogs.stream().noneMatch(path -> Files.exists(path, LinkOption.NOFOLLOW_LINKS)) - ).isTrue(); - - } - private CommandMessage buildCommandMessageForConfigPush(List commands) { CommandMessage message = new CommandMessage(); diff --git a/service/terraform-plugin/src/main/java/com/solace/maas/ep/event/management/agent/plugin/terraform/manager/TerraformManager.java b/service/terraform-plugin/src/main/java/com/solace/maas/ep/event/management/agent/plugin/terraform/manager/TerraformManager.java index 73a789225..b87d2b9ce 100644 --- a/service/terraform-plugin/src/main/java/com/solace/maas/ep/event/management/agent/plugin/terraform/manager/TerraformManager.java +++ b/service/terraform-plugin/src/main/java/com/solace/maas/ep/event/management/agent/plugin/terraform/manager/TerraformManager.java @@ -7,6 +7,7 @@ import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandResult; import com.solace.maas.ep.event.management.agent.plugin.command.model.JobStatus; import com.solace.maas.ep.event.management.agent.plugin.constants.RouteConstants; +import com.solace.maas.ep.event.management.agent.plugin.messagingService.event.EventProperty; import com.solace.maas.ep.event.management.agent.plugin.terraform.client.TerraformClient; import com.solace.maas.ep.event.management.agent.plugin.terraform.client.TerraformClientFactory; import com.solace.maas.ep.event.management.agent.plugin.terraform.configuration.TerraformProperties; @@ -37,6 +38,9 @@ public class TerraformManager { private final TerraformProperties terraformProperties; private final TerraformClientFactory terraformClientFactory; + + private boolean isManagedAgent; + public TerraformManager(TerraformLogProcessingService terraformLogProcessingService, TerraformProperties terraformProperties, TerraformClientFactory terraformClientFactory) { @@ -45,7 +49,9 @@ public TerraformManager(TerraformLogProcessingService terraformLogProcessingServ this.terraformClientFactory = terraformClientFactory; } - public Path execute(CommandRequest request, Command command, Map envVars) { + public Path execute(CommandRequest request, + Command command, + Map envVars) { MDC.put(RouteConstants.COMMAND_CORRELATION_ID, request.getCommandCorrelationId()); MDC.put(RouteConstants.MESSAGING_SERVICE_ID, request.getServiceId()); setEnvVarsFromParameters(command, envVars); From 10d1a61a382befe75144975ea195781fb3a16e4a Mon Sep 17 00:00:00 2001 From: rudraneel-chakraborty Date: Fri, 5 Jul 2024 14:44:22 -0400 Subject: [PATCH 2/7] DATAGO-79772 wip --- .../agent/processor/CommandLogStreamProcessorTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/processor/CommandLogStreamProcessorTest.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/processor/CommandLogStreamProcessorTest.java index 0e5618c3b..518568a61 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/processor/CommandLogStreamProcessorTest.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/processor/CommandLogStreamProcessorTest.java @@ -20,6 +20,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.SpyBean; import org.springframework.test.context.ActiveProfiles; import org.springframework.util.ResourceUtils; @@ -46,7 +47,7 @@ public class CommandLogStreamProcessorTest { @Autowired private EventPortalProperties eventPortalProperties; - @Autowired + @SpyBean private CommandLogStreamingProcessor realCommandLogStreamingProcessor; @Autowired From 2cc1ba52b553cc4e7ed3bc49e2833582cf326fff Mon Sep 17 00:00:00 2001 From: rudraneel-chakraborty Date: Sun, 7 Jul 2024 19:55:55 -0400 Subject: [PATCH 3/7] DATAGO-79772 wip --- .../src/main/resources/application.yml | 392 ++---------------- .../commandManager/CommandManagerTests.java | 4 +- 2 files changed, 39 insertions(+), 357 deletions(-) diff --git a/service/application/src/main/resources/application.yml b/service/application/src/main/resources/application.yml index 6eb9f0ab1..004869de7 100644 --- a/service/application/src/main/resources/application.yml +++ b/service/application/src/main/resources/application.yml @@ -4,393 +4,75 @@ springdoc: path: /docs/event-management-agent swagger-ui: path: /event-management-agent/swagger-ui.html - -logging: - log-in-json-format: false - level: - root: INFO - file: - name: /tmp/EMA.log - logback: - rolling-policy: - file-name-pattern: '${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz' - max-file-size: 10MB - max-history: 7 - total-size-cap: 1GB - -# micrometer / statsd -management: - metrics: - tags: - # key - value pairs - maas_id: my-maas-id - enable: - # enable / disable specific metrics - all: false - # sample metrics to be exposed - application: - started: - time: true - jvm: - info: true - statsd: - metrics: - export: - # enable / disable shipping metrics to StatsD endpoint - enabled: false - flavor: datadog - host: 127.0.0.1 - port: 8125 - protocol: udp - server: port: 8180 - -idGenerator: - originId: event_management_agent_${EP_EVENT_MANAGEMENT_AGENT_ID} - spring: - datasource: - url: jdbc:h2:file:./data/cache;DB_CLOSE_ON_EXIT=FALSE - username: sa - password: password - driver-class-name: org.h2.Driver - - jpa: - database-platform: org.hibernate.dialect.H2Dialect - hibernate: - ddl-auto: create-drop - defer-datasource-initialization: true h2: console: path: /h2 enabled: true settings: web-allow-others: true + datasource: + password: password + driver-class-name: org.h2.Driver + username: sa + url: jdbc:h2:file:./data/cache;DB_CLOSE_ON_EXIT=FALSE + jpa: + defer-datasource-initialization: true + hibernate: + ddl-auto: create-drop + database-platform: org.hibernate.dialect.H2Dialect servlet: multipart: max-request-size: ${MAX_REQUEST_SIZE:5MB} max-file-size: ${MAX_FILE_SIZE:5MB} main: allow-bean-definition-overriding: true - camel: springboot: use-mdc-logging: true - errorHandling: - maximumRedeliveries: 10 - maximumRedeliveryDelay: 60000 - redeliveryDelay: 1000 - kafka: client: config: - connections: - timeout: - value: 60_000 + reconnections: + max-backoff: + unit: milliseconds + value: 1000 + backoff: unit: milliseconds + value: 50 + connections: max-idle: - value: 10_000 unit: milliseconds + value: 10000 request-timeout: - value: 5_000 - unit: milliseconds - reconnections: - backoff: - value: 50 unit: milliseconds - max-backoff: - value: 1_000 + value: 5000 + timeout: unit: milliseconds - + value: 60000 eventPortal: - runtimeAgentId: ${EP_RUNTIME_AGENT_ID:defaultAgentId} - organizationId: ${EP_ORGANIZATION_ID:defaultOrgId} - topicPrefix: ${EP_TOPIC_PREFIX:sc/ep/runtime} + organizationId: ${EP_ORGANIZATION_ID:localrcorg} + runtimeAgentId: ${EP_RUNTIME_AGENT_ID:3jczd7fk0xv} + incomingRequestQueueName: "ep_core_ema_requests_fakedcid" + managed: true gateway: - id: gateway - name: evmr1 + id: f0ydyzsr1jp + name: POCCloudManagedEMAEVMR messaging: - standalone: true + standalone: false rtoSession: false - enableHeartbeats: false - testHeartbeats: false + enableHeartbeats: true + testHeartbeats: true connections: - name: eventPortalGateway - authenticationType: ${EP_GATEWAY_AUTH} - url: ${EP_GATEWAY_URL} - msgVpn: ${EP_GATEWAY_MSGVPN} - trustStoreDir: ${SSL_STORE_DIR} + authenticationType: ${EP_GATEWAY_AUTH:basicAuthentication} + msgVpn: ${EP_GATEWAY_MSGVPN:poccloudmanagedemaevmr} + url: ${EP_GATEWAY_URL:tcps://mr-connection-i41xmmlismp.messaging.solace.cloud:55443} users: - name: messaging1 - username: ${EP_GATEWAY_USERNAME} - password: ${EP_GATEWAY_PASSWORD} - clientName: runtime_agent_${EP_RUNTIME_AGENT_ID} # 'client_' + runtimeAgentId - -# Below is an example of how to set up messaging service configuration in the application.yml file. - -#plugins: -# resources: -# # Confluent Schema Registry No Auth example -# - id: someConfluentId -# name: no auth confluent example -# type: confluent_schema_registry -# relatedServices: -# - id1 -# - id2 -# - id3 -# connections: -# - name: myConfluentNoAuth -# url: ${CONFLUENT_SCHEMA_REGISTRY_URL} -# # Confluent Schema Registry Basic Auth example -# - id: someConfluentId -# name: no auth confluent example -# type: confluent_schema_registry -# relatedServices: -# - id1 -# connections: -# - name: myConfluentBasicAuth -# url: ${CONFLUENT_SCHEMA_REGISTRY_URL} -# authentication: -# - protocol: BASIC -# credentials: -# - properties: -# - name: username -# value: ${CONFLUENT_USERNAME} -# - name: password -# value: ${CONFLUENT_PASSWORD} -# source: ENVIRONMENT_VARIABLE -# operations: -# - name: ALL -# # Confluent Schema Registry Basic Auth With SSL example -# - id: someConfluentId -# name: no auth confluent example -# type: confluent_schema_registry -# relatedServices: -# - id1 -# connections: -# - name: myConfluentBasicAuthWithSSL -# url: ${CONFLUENT_SCHEMA_REGISTRY_URL} -# authentication: -# - protocol: BASIC_SSL -# credentials: -# - properties: -# - name: username -# value: ${CONFLUENT_USERNAME} -# - name: password -# value: ${CONFLUENT_PASSWORD} -# - name: ssl.truststore.location -# value: ${TRUSTSTORE_LOCATION} -# - name: ssl.truststore.password -# value: ${TRUSTSTORE_PASSWORD} -# source: ENVIRONMENT_VARIABLE -# operations: -# - name: ALL -# # Confluent Schema Registry Basic Auth With mTLS example -# - id: someConfluentId -# name: Basic Auth With mTLS example -# type: confluent_schema_registry -# relatedServices: -# - id1 -# connections: -# - name: myConfluentBasicAuthWithMTLS -# url: ${CONFLUENT_SCHEMA_REGISTRY_URL} -# authentication: -# - protocol: BASIC_MTLS -# credentials: -# - properties: -# - name: username -# value: ${CONFLUENT_USERNAME} -# - name: password -# value: ${CONFLUENT_PASSWORD} -# - name: ssl.truststore.location -# value: ${TRUSTSTORE_LOCATION} -# - name: ssl.truststore.password -# value: ${TRUSTSTORE_PASSWORD} -# - name: ssl.keystore.location -# value: ${KEYSTORE_LOCATION} -# - name: ssl.keystore.password -# value: ${KEYSTORE_PASSWORD} -# - name: ssl.key.password -# value: ${KEY_PASSWORD} -# source: ENVIRONMENT_VARIABLE -# operations: -# - name: ALL -# # Confluent Schema Registry TLS example -# - id: someConfluentId -# name: no auth confluent example -# type: confluent_schema_registry -# relatedServices: -# - id1 -# connections: -# - name: myConfluentTLS -# url: ${CONFLUENT_SCHEMA_REGISTRY_URL} -# authentication: -# - protocol: TLS -# credentials: -# - properties: -# - name: ssl.truststore.location -# value: ${TRUSTSTORE_LOCATION} -# - name: ssl.truststore.password -# value: ${TRUSTSTORE_PASSWORD} -# source: ENVIRONMENT_VARIABLE -# operations: -# - name: ALL -# # Confluent Schema Registry MTLS example -# - id: someConfluentId -# name: no auth confluent example -# type: confluent_schema_registry -# relatedServices: -# - id1 -# connections: -# - name: myConfluentMTLS -# url: ${CONFLUENT_SCHEMA_REGISTRY_URL} -# authentication: -# - protocol: MTLS -# credentials: -# - properties: -# - name: ssl.truststore.location -# value: ${TRUSTSTORE_LOCATION} -# - name: ssl.truststore.password -# value: ${TRUSTSTORE_PASSWORD} -# - name: ssl.keystore.password -# value: ${KEYSTORE_PASSWORD} -# - name: ssl.keystore.location -# value: ${KEYSTORE_LOCATION} -# - name: ssl.key.password -# value: ${KEY_PASSWORD} -# source: ENVIRONMENT_VARIABLE -# operations: -# - name: ALL -# # Solace example -# - id: solaceDefaultService -# type: SOLACE -# name: staging service -# connections: -# - name: mySolaceServiceSemp -# url: ${SOLACE_SEMP_URL} -# authentication: -# - protocol: SEMP -# credentials: -# - source: ENVIRONMENT_VARIABLE -# operations: -# - name: ALL -# properties: -# - name: username -# value: ${SOLACE_SEMP_USERNAME} -# - name: password -# value: ${SOLACE_SEMP_PASSWORD} -# properties: -# - name: msgVpn -# value: ${SOLACE_VPN_NAME} -# - name: sempPageSize -# value: 100 -# # MTLS example -# - id: kafkaMTLSServiceExample -# name: mtls kafka cluster example -# type: KAFKA -# connections: -# - name: kafkaMTLSConnection -# url: ${KAFKA_BOOTSTRAP_SERVERS:kafka1:11091,kafka2:11092} -# authentication: -# - protocol: SSL -# credentials: -# - source: ENVIRONMENT_VARIABLE -# operations: -# - name: ALL -# properties: -# - name: ssl.truststore.location -# value: ${TRUSTSTORE_LOCATION} -# - name: ssl.truststore.password -# value: ${TRUSTSTORE_PASSWORD} -# - name: ssl.keystore.password -# value: ${KEYSTORE_PASSWORD} -# - name: ssl.keystore.location -# value: ${KEYSTORE_LOCATION} -# - name: ssl.key.password -# value: ${KEY_PASSWORD} -# # SASL SCRAM -# - id: kafkaSASLSCRAMServiceExample -# name: sasl scram kafka cluster example -# type: KAFKA -# connections: -# - name: kafkaSASLSCRAMConnection -# url: ${KAFKA_BOOTSTRAP_SERVERS:kafka1:14091,kafka2:14092} -# authentication: -# - protocol: SASL_SSL -# credentials: -# - source: ENVIRONMENT_VARIABLE -# operations: -# - name: ALL -# properties: -# - name: ssl.truststore.location -# value: ${TRUSTSTORE_LOCATION} -# - name: ssl.truststore.password -# value: ${TRUSTSTORE_PASSWORD} -# - name: sasl.jaas.config -# value: org.apache.kafka.common.security.scram.ScramLoginModule required username= password=; -# - name: sasl.mechanism -# value: SCRAM-SHA-256 -# # SASL Plain example -# - id: saslplain -# name: saslplain -# type: KAFKA -# connections: -# - name: saslplainconn -# url: ${KAFKA_BOOTSTRAP_SERVERS:kafka1:9091,kafka2:9092} -# authentication: -# - protocol: SASL_PLAINTEXT -# credentials: -# - source: ENVIRONMENT_VARIABLE -# operations: -# - name: ALL -# properties: -# - name: sasl.mechanism -# value: PLAIN -# - name: sasl.jaas.config -# value: org.apache.kafka.common.security.plain.PlainLoginModule required username=">" password=""; -# # SASL Plain over SSL -# - id: saslplainssl -# name: saslplainssl -# type: KAFKA -# connections: -# - name: saslplainsslconn -# url: ${KAFKA_BOOTSTRAP_SERVERS:kafka1:13091,kafka2:13092} -# authentication: -# - protocol: SASL_SSL -# credentials: -# - source: ENVIRONMENT_VARIABLE -# operations: -# - name: ALL -# properties: -# - name: ssl.truststore.location -# value: ${TRUSTSTORE_LOCATION} -# - name: sasl.mechanism -# value: PLAIN -# - name: sasl.jaas.config -# value: org.apache.kafka.common.security.plain.PlainLoginModule required username=">" password=""; -# # AWS IAM Example. -# # NOTE: Remember to set the aws access id and password in the environment or credentials! -# - id: iam -# name: iam -# type: KAFKA -# connections: -# - name: iamConn -# url: ${KAFKA_BOOTSTRAP_SERVERS:awsservers:9098} -# authentication: -# - protocol: SASL_SSL -# credentials: -# - source: ENVIRONMENT_VARIABLE -# operations: -# - name: ALL -# properties: -# - name: ssl.truststore.location -# value: ${TRUSTSTORE_LOCATION} -# - name: security.protocol -# value: SASL_SSL -# - name: sasl.mechanism -# value: AWS_MSK_IAM -# - name: sasl.jaas.config -# value: software.amazon.msk.auth.iam.IAMLoginModule required; -# - name: sasl.client.callback.handler.class -# value: software.amazon.msk.auth.iam.IAMClientCallbackHandler + password: UiDFfo0Cbj4}5H5vlp4FEvjI + username: ${EP_GATEWAY_USERNAME:localrcorg-v6693yiarsp-3jczd7fk0xv} + clientName: client_3jczd7fk0xv + topicPrefix: ${EP_TOPIC_PREFIX:sc/ep/runtime} \ No newline at end of file diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTests.java index 8a49b027f..ac0e0de47 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTests.java @@ -139,7 +139,7 @@ void testMultiThreadedCommandManager() throws InterruptedException { CompletableFuture.runAsync(() -> commandManager.execute(messageList.get(i - 1)), testThreadPool)); // Wait for all the threads to complete (add a timeout just in case) - await().atMost(10, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, 1)); + await().atMost(10, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, commandThreadPoolQueueSize)); // Verify terraform manager is called ArgumentCaptor> envArgCaptor = ArgumentCaptor.forClass(Map.class); @@ -175,7 +175,7 @@ void failSendingResponseBackToEp() { commandManager.execute(message); // Wait for the command thread to complete - await().atMost(10, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, 1)); + await().atMost(10, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, 2)); ArgumentCaptor messageArgCaptor = ArgumentCaptor.forClass(CommandMessage.class); verify(commandPublisher, times(2)).sendCommandResponse(messageArgCaptor.capture(), any()); From f0e1733a80b44851a340d6b2bf4c0b72723ffe47 Mon Sep 17 00:00:00 2001 From: rudraneel-chakraborty Date: Sun, 7 Jul 2024 19:59:50 -0400 Subject: [PATCH 4/7] DATAGO-79772 wip --- .../management/agent/commandManager/CommandManagerTests.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTests.java index ac0e0de47..a27ec7d07 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTests.java @@ -139,7 +139,10 @@ void testMultiThreadedCommandManager() throws InterruptedException { CompletableFuture.runAsync(() -> commandManager.execute(messageList.get(i - 1)), testThreadPool)); // Wait for all the threads to complete (add a timeout just in case) - await().atMost(10, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, commandThreadPoolQueueSize)); + await().atMost(10, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked( + commandPublisher, + commandThreadPoolQueueSize + )); // Verify terraform manager is called ArgumentCaptor> envArgCaptor = ArgumentCaptor.forClass(Map.class); From 4a6d4e7807958ea74374aafad456ffd0893edeff Mon Sep 17 00:00:00 2001 From: rudraneel-chakraborty Date: Mon, 8 Jul 2024 10:59:27 -0400 Subject: [PATCH 5/7] DATAGO-79772 wip --- .../src/main/resources/application.yml | 392 ++++++++++++++++-- 1 file changed, 355 insertions(+), 37 deletions(-) diff --git a/service/application/src/main/resources/application.yml b/service/application/src/main/resources/application.yml index 004869de7..6eb9f0ab1 100644 --- a/service/application/src/main/resources/application.yml +++ b/service/application/src/main/resources/application.yml @@ -4,75 +4,393 @@ springdoc: path: /docs/event-management-agent swagger-ui: path: /event-management-agent/swagger-ui.html + +logging: + log-in-json-format: false + level: + root: INFO + file: + name: /tmp/EMA.log + logback: + rolling-policy: + file-name-pattern: '${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz' + max-file-size: 10MB + max-history: 7 + total-size-cap: 1GB + +# micrometer / statsd +management: + metrics: + tags: + # key - value pairs + maas_id: my-maas-id + enable: + # enable / disable specific metrics + all: false + # sample metrics to be exposed + application: + started: + time: true + jvm: + info: true + statsd: + metrics: + export: + # enable / disable shipping metrics to StatsD endpoint + enabled: false + flavor: datadog + host: 127.0.0.1 + port: 8125 + protocol: udp + server: port: 8180 + +idGenerator: + originId: event_management_agent_${EP_EVENT_MANAGEMENT_AGENT_ID} + spring: - h2: - console: - path: /h2 - enabled: true - settings: - web-allow-others: true datasource: + url: jdbc:h2:file:./data/cache;DB_CLOSE_ON_EXIT=FALSE + username: sa password: password driver-class-name: org.h2.Driver - username: sa - url: jdbc:h2:file:./data/cache;DB_CLOSE_ON_EXIT=FALSE + jpa: - defer-datasource-initialization: true + database-platform: org.hibernate.dialect.H2Dialect hibernate: ddl-auto: create-drop - database-platform: org.hibernate.dialect.H2Dialect + defer-datasource-initialization: true + h2: + console: + path: /h2 + enabled: true + settings: + web-allow-others: true servlet: multipart: max-request-size: ${MAX_REQUEST_SIZE:5MB} max-file-size: ${MAX_FILE_SIZE:5MB} main: allow-bean-definition-overriding: true + camel: springboot: use-mdc-logging: true + errorHandling: + maximumRedeliveries: 10 + maximumRedeliveryDelay: 60000 + redeliveryDelay: 1000 + kafka: client: config: - reconnections: - max-backoff: - unit: milliseconds - value: 1000 - backoff: - unit: milliseconds - value: 50 connections: + timeout: + value: 60_000 + unit: milliseconds max-idle: + value: 10_000 unit: milliseconds - value: 10000 request-timeout: + value: 5_000 unit: milliseconds - value: 5000 - timeout: + reconnections: + backoff: + value: 50 + unit: milliseconds + max-backoff: + value: 1_000 unit: milliseconds - value: 60000 + eventPortal: - organizationId: ${EP_ORGANIZATION_ID:localrcorg} - runtimeAgentId: ${EP_RUNTIME_AGENT_ID:3jczd7fk0xv} - incomingRequestQueueName: "ep_core_ema_requests_fakedcid" - managed: true + runtimeAgentId: ${EP_RUNTIME_AGENT_ID:defaultAgentId} + organizationId: ${EP_ORGANIZATION_ID:defaultOrgId} + topicPrefix: ${EP_TOPIC_PREFIX:sc/ep/runtime} gateway: - id: f0ydyzsr1jp - name: POCCloudManagedEMAEVMR + id: gateway + name: evmr1 messaging: - standalone: false + standalone: true rtoSession: false - enableHeartbeats: true - testHeartbeats: true + enableHeartbeats: false + testHeartbeats: false connections: - name: eventPortalGateway - authenticationType: ${EP_GATEWAY_AUTH:basicAuthentication} - msgVpn: ${EP_GATEWAY_MSGVPN:poccloudmanagedemaevmr} - url: ${EP_GATEWAY_URL:tcps://mr-connection-i41xmmlismp.messaging.solace.cloud:55443} + authenticationType: ${EP_GATEWAY_AUTH} + url: ${EP_GATEWAY_URL} + msgVpn: ${EP_GATEWAY_MSGVPN} + trustStoreDir: ${SSL_STORE_DIR} users: - name: messaging1 - password: UiDFfo0Cbj4}5H5vlp4FEvjI - username: ${EP_GATEWAY_USERNAME:localrcorg-v6693yiarsp-3jczd7fk0xv} - clientName: client_3jczd7fk0xv - topicPrefix: ${EP_TOPIC_PREFIX:sc/ep/runtime} \ No newline at end of file + username: ${EP_GATEWAY_USERNAME} + password: ${EP_GATEWAY_PASSWORD} + clientName: runtime_agent_${EP_RUNTIME_AGENT_ID} # 'client_' + runtimeAgentId + +# Below is an example of how to set up messaging service configuration in the application.yml file. + +#plugins: +# resources: +# # Confluent Schema Registry No Auth example +# - id: someConfluentId +# name: no auth confluent example +# type: confluent_schema_registry +# relatedServices: +# - id1 +# - id2 +# - id3 +# connections: +# - name: myConfluentNoAuth +# url: ${CONFLUENT_SCHEMA_REGISTRY_URL} +# # Confluent Schema Registry Basic Auth example +# - id: someConfluentId +# name: no auth confluent example +# type: confluent_schema_registry +# relatedServices: +# - id1 +# connections: +# - name: myConfluentBasicAuth +# url: ${CONFLUENT_SCHEMA_REGISTRY_URL} +# authentication: +# - protocol: BASIC +# credentials: +# - properties: +# - name: username +# value: ${CONFLUENT_USERNAME} +# - name: password +# value: ${CONFLUENT_PASSWORD} +# source: ENVIRONMENT_VARIABLE +# operations: +# - name: ALL +# # Confluent Schema Registry Basic Auth With SSL example +# - id: someConfluentId +# name: no auth confluent example +# type: confluent_schema_registry +# relatedServices: +# - id1 +# connections: +# - name: myConfluentBasicAuthWithSSL +# url: ${CONFLUENT_SCHEMA_REGISTRY_URL} +# authentication: +# - protocol: BASIC_SSL +# credentials: +# - properties: +# - name: username +# value: ${CONFLUENT_USERNAME} +# - name: password +# value: ${CONFLUENT_PASSWORD} +# - name: ssl.truststore.location +# value: ${TRUSTSTORE_LOCATION} +# - name: ssl.truststore.password +# value: ${TRUSTSTORE_PASSWORD} +# source: ENVIRONMENT_VARIABLE +# operations: +# - name: ALL +# # Confluent Schema Registry Basic Auth With mTLS example +# - id: someConfluentId +# name: Basic Auth With mTLS example +# type: confluent_schema_registry +# relatedServices: +# - id1 +# connections: +# - name: myConfluentBasicAuthWithMTLS +# url: ${CONFLUENT_SCHEMA_REGISTRY_URL} +# authentication: +# - protocol: BASIC_MTLS +# credentials: +# - properties: +# - name: username +# value: ${CONFLUENT_USERNAME} +# - name: password +# value: ${CONFLUENT_PASSWORD} +# - name: ssl.truststore.location +# value: ${TRUSTSTORE_LOCATION} +# - name: ssl.truststore.password +# value: ${TRUSTSTORE_PASSWORD} +# - name: ssl.keystore.location +# value: ${KEYSTORE_LOCATION} +# - name: ssl.keystore.password +# value: ${KEYSTORE_PASSWORD} +# - name: ssl.key.password +# value: ${KEY_PASSWORD} +# source: ENVIRONMENT_VARIABLE +# operations: +# - name: ALL +# # Confluent Schema Registry TLS example +# - id: someConfluentId +# name: no auth confluent example +# type: confluent_schema_registry +# relatedServices: +# - id1 +# connections: +# - name: myConfluentTLS +# url: ${CONFLUENT_SCHEMA_REGISTRY_URL} +# authentication: +# - protocol: TLS +# credentials: +# - properties: +# - name: ssl.truststore.location +# value: ${TRUSTSTORE_LOCATION} +# - name: ssl.truststore.password +# value: ${TRUSTSTORE_PASSWORD} +# source: ENVIRONMENT_VARIABLE +# operations: +# - name: ALL +# # Confluent Schema Registry MTLS example +# - id: someConfluentId +# name: no auth confluent example +# type: confluent_schema_registry +# relatedServices: +# - id1 +# connections: +# - name: myConfluentMTLS +# url: ${CONFLUENT_SCHEMA_REGISTRY_URL} +# authentication: +# - protocol: MTLS +# credentials: +# - properties: +# - name: ssl.truststore.location +# value: ${TRUSTSTORE_LOCATION} +# - name: ssl.truststore.password +# value: ${TRUSTSTORE_PASSWORD} +# - name: ssl.keystore.password +# value: ${KEYSTORE_PASSWORD} +# - name: ssl.keystore.location +# value: ${KEYSTORE_LOCATION} +# - name: ssl.key.password +# value: ${KEY_PASSWORD} +# source: ENVIRONMENT_VARIABLE +# operations: +# - name: ALL +# # Solace example +# - id: solaceDefaultService +# type: SOLACE +# name: staging service +# connections: +# - name: mySolaceServiceSemp +# url: ${SOLACE_SEMP_URL} +# authentication: +# - protocol: SEMP +# credentials: +# - source: ENVIRONMENT_VARIABLE +# operations: +# - name: ALL +# properties: +# - name: username +# value: ${SOLACE_SEMP_USERNAME} +# - name: password +# value: ${SOLACE_SEMP_PASSWORD} +# properties: +# - name: msgVpn +# value: ${SOLACE_VPN_NAME} +# - name: sempPageSize +# value: 100 +# # MTLS example +# - id: kafkaMTLSServiceExample +# name: mtls kafka cluster example +# type: KAFKA +# connections: +# - name: kafkaMTLSConnection +# url: ${KAFKA_BOOTSTRAP_SERVERS:kafka1:11091,kafka2:11092} +# authentication: +# - protocol: SSL +# credentials: +# - source: ENVIRONMENT_VARIABLE +# operations: +# - name: ALL +# properties: +# - name: ssl.truststore.location +# value: ${TRUSTSTORE_LOCATION} +# - name: ssl.truststore.password +# value: ${TRUSTSTORE_PASSWORD} +# - name: ssl.keystore.password +# value: ${KEYSTORE_PASSWORD} +# - name: ssl.keystore.location +# value: ${KEYSTORE_LOCATION} +# - name: ssl.key.password +# value: ${KEY_PASSWORD} +# # SASL SCRAM +# - id: kafkaSASLSCRAMServiceExample +# name: sasl scram kafka cluster example +# type: KAFKA +# connections: +# - name: kafkaSASLSCRAMConnection +# url: ${KAFKA_BOOTSTRAP_SERVERS:kafka1:14091,kafka2:14092} +# authentication: +# - protocol: SASL_SSL +# credentials: +# - source: ENVIRONMENT_VARIABLE +# operations: +# - name: ALL +# properties: +# - name: ssl.truststore.location +# value: ${TRUSTSTORE_LOCATION} +# - name: ssl.truststore.password +# value: ${TRUSTSTORE_PASSWORD} +# - name: sasl.jaas.config +# value: org.apache.kafka.common.security.scram.ScramLoginModule required username= password=; +# - name: sasl.mechanism +# value: SCRAM-SHA-256 +# # SASL Plain example +# - id: saslplain +# name: saslplain +# type: KAFKA +# connections: +# - name: saslplainconn +# url: ${KAFKA_BOOTSTRAP_SERVERS:kafka1:9091,kafka2:9092} +# authentication: +# - protocol: SASL_PLAINTEXT +# credentials: +# - source: ENVIRONMENT_VARIABLE +# operations: +# - name: ALL +# properties: +# - name: sasl.mechanism +# value: PLAIN +# - name: sasl.jaas.config +# value: org.apache.kafka.common.security.plain.PlainLoginModule required username=">" password=""; +# # SASL Plain over SSL +# - id: saslplainssl +# name: saslplainssl +# type: KAFKA +# connections: +# - name: saslplainsslconn +# url: ${KAFKA_BOOTSTRAP_SERVERS:kafka1:13091,kafka2:13092} +# authentication: +# - protocol: SASL_SSL +# credentials: +# - source: ENVIRONMENT_VARIABLE +# operations: +# - name: ALL +# properties: +# - name: ssl.truststore.location +# value: ${TRUSTSTORE_LOCATION} +# - name: sasl.mechanism +# value: PLAIN +# - name: sasl.jaas.config +# value: org.apache.kafka.common.security.plain.PlainLoginModule required username=">" password=""; +# # AWS IAM Example. +# # NOTE: Remember to set the aws access id and password in the environment or credentials! +# - id: iam +# name: iam +# type: KAFKA +# connections: +# - name: iamConn +# url: ${KAFKA_BOOTSTRAP_SERVERS:awsservers:9098} +# authentication: +# - protocol: SASL_SSL +# credentials: +# - source: ENVIRONMENT_VARIABLE +# operations: +# - name: ALL +# properties: +# - name: ssl.truststore.location +# value: ${TRUSTSTORE_LOCATION} +# - name: security.protocol +# value: SASL_SSL +# - name: sasl.mechanism +# value: AWS_MSK_IAM +# - name: sasl.jaas.config +# value: software.amazon.msk.auth.iam.IAMLoginModule required; +# - name: sasl.client.callback.handler.class +# value: software.amazon.msk.auth.iam.IAMClientCallbackHandler From 294fff0de5418eb5a2f8f44b208a7cd567967d9a Mon Sep 17 00:00:00 2001 From: rudraneel-chakraborty Date: Mon, 8 Jul 2024 16:24:19 -0400 Subject: [PATCH 6/7] DATAGO-79772 wip --- .../agent/plugin/terraform/manager/TerraformManager.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/service/terraform-plugin/src/main/java/com/solace/maas/ep/event/management/agent/plugin/terraform/manager/TerraformManager.java b/service/terraform-plugin/src/main/java/com/solace/maas/ep/event/management/agent/plugin/terraform/manager/TerraformManager.java index b87d2b9ce..23a205df7 100644 --- a/service/terraform-plugin/src/main/java/com/solace/maas/ep/event/management/agent/plugin/terraform/manager/TerraformManager.java +++ b/service/terraform-plugin/src/main/java/com/solace/maas/ep/event/management/agent/plugin/terraform/manager/TerraformManager.java @@ -39,8 +39,6 @@ public class TerraformManager { private final TerraformClientFactory terraformClientFactory; - private boolean isManagedAgent; - public TerraformManager(TerraformLogProcessingService terraformLogProcessingService, TerraformProperties terraformProperties, TerraformClientFactory terraformClientFactory) { From ad6d884d325a76343bab039035744ca9e61795eb Mon Sep 17 00:00:00 2001 From: rudraneel-chakraborty Date: Wed, 10 Jul 2024 09:08:57 -0400 Subject: [PATCH 7/7] DATAGO-79772 PR review comment --- .../agent/plugin/terraform/manager/TerraformManager.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/service/terraform-plugin/src/main/java/com/solace/maas/ep/event/management/agent/plugin/terraform/manager/TerraformManager.java b/service/terraform-plugin/src/main/java/com/solace/maas/ep/event/management/agent/plugin/terraform/manager/TerraformManager.java index 23a205df7..73a789225 100644 --- a/service/terraform-plugin/src/main/java/com/solace/maas/ep/event/management/agent/plugin/terraform/manager/TerraformManager.java +++ b/service/terraform-plugin/src/main/java/com/solace/maas/ep/event/management/agent/plugin/terraform/manager/TerraformManager.java @@ -7,7 +7,6 @@ import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandResult; import com.solace.maas.ep.event.management.agent.plugin.command.model.JobStatus; import com.solace.maas.ep.event.management.agent.plugin.constants.RouteConstants; -import com.solace.maas.ep.event.management.agent.plugin.messagingService.event.EventProperty; import com.solace.maas.ep.event.management.agent.plugin.terraform.client.TerraformClient; import com.solace.maas.ep.event.management.agent.plugin.terraform.client.TerraformClientFactory; import com.solace.maas.ep.event.management.agent.plugin.terraform.configuration.TerraformProperties; @@ -38,7 +37,6 @@ public class TerraformManager { private final TerraformProperties terraformProperties; private final TerraformClientFactory terraformClientFactory; - public TerraformManager(TerraformLogProcessingService terraformLogProcessingService, TerraformProperties terraformProperties, TerraformClientFactory terraformClientFactory) { @@ -47,9 +45,7 @@ public TerraformManager(TerraformLogProcessingService terraformLogProcessingServ this.terraformClientFactory = terraformClientFactory; } - public Path execute(CommandRequest request, - Command command, - Map envVars) { + public Path execute(CommandRequest request, Command command, Map envVars) { MDC.put(RouteConstants.COMMAND_CORRELATION_ID, request.getCommandCorrelationId()); MDC.put(RouteConstants.MESSAGING_SERVICE_ID, request.getServiceId()); setEnvVarsFromParameters(command, envVars);