Skip to content

Commit

Permalink
Merge pull request #191 from SolaceProducts/DATAGO-79772-no-log-strea…
Browse files Browse the repository at this point in the history
…ming-to-ep-for-cema

DATAGO-79772: Cloud managed EMA must not stream configPush logs to EP Core
  • Loading branch information
rudraneel-chakraborty authored Jul 10, 2024
2 parents cf1a513 + ad6d884 commit 9573772
Show file tree
Hide file tree
Showing 9 changed files with 363 additions and 187 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static com.solace.maas.ep.event.management.agent.constants.Command.COMMAND_CORRELATION_ID;
Expand All @@ -45,14 +48,14 @@ public class CommandManager {
private final MessagingServiceDelegateService messagingServiceDelegateService;
private final EventPortalProperties eventPortalProperties;
private final ThreadPoolTaskExecutor configPushPool;
private final CommandLogStreamingProcessor commandLogStreamingProcessor;
private final Optional<CommandLogStreamingProcessor> commandLogStreamingProcessorOpt;

public CommandManager(TerraformManager terraformManager,
CommandMapper commandMapper,
CommandPublisher commandPublisher,
MessagingServiceDelegateService messagingServiceDelegateService,
EventPortalProperties eventPortalProperties,
CommandLogStreamingProcessor commandLogStreamingProcessor) {
Optional<CommandLogStreamingProcessor> commandLogStreamingProcessorOpt) {
this.terraformManager = terraformManager;
this.commandMapper = commandMapper;
this.commandPublisher = commandPublisher;
Expand All @@ -65,7 +68,7 @@ public CommandManager(TerraformManager terraformManager,
configPushPool.setThreadNamePrefix("config-push-pool-");
configPushPool.setTaskDecorator(new MdcTaskDecorator());
configPushPool.initialize();
this.commandLogStreamingProcessor = commandLogStreamingProcessor;
this.commandLogStreamingProcessorOpt = commandLogStreamingProcessorOpt;
}

public void execute(CommandMessage request) {
Expand Down Expand Up @@ -106,7 +109,10 @@ public void configPush(CommandRequest request) {
for (Command command : bundle.getCommands()) {
Path executionLog = executeCommand(request, command, envVars);
if (executionLog != null) {
streamCommandExecutionLogToEpCore(request, command, executionLog);
if (commandLogStreamingProcessorOpt.isPresent()) {
streamCommandExecutionLogToEpCore(request, command, executionLog);
}

executionLogFilesToClean.add(executionLog);
}
if (exitEarlyOnFailedCommand(bundle, command)) {
Expand Down Expand Up @@ -153,17 +159,41 @@ private Path executeCommand(CommandRequest request,

private void cleanup(List<Path> listOfExecutionLogFiles) {
try {
commandLogStreamingProcessor.deleteExecutionLogFiles(listOfExecutionLogFiles);
deleteExecutionLogFiles(listOfExecutionLogFiles);
} catch (Exception e) {
log.error("Error while deleting execution log.", e);
}
}

private void streamCommandExecutionLogToEpCore(CommandRequest request, Command command, Path executionLog) {
public void deleteExecutionLogFiles(List<Path> listOfExecutionLogFiles) {
boolean allFilesDeleted = listOfExecutionLogFiles
.stream()
.allMatch(this::deleteExecutionLogFile);
if (!allFilesDeleted) {
throw new IllegalArgumentException("Some of the execution log files were not deleted. Please check the logs");
}
}

private boolean deleteExecutionLogFile(Path path) {
try {
if (Files.exists(path)) {
Files.delete(path);
}
} catch (IOException e) {
log.warn("Error while deleting execution log at {}", path, e);
return false;
}
return true;
}

public void streamCommandExecutionLogToEpCore(CommandRequest request, Command command, Path executionLog) {
if (commandLogStreamingProcessorOpt.isEmpty()) {
throw new UnsupportedOperationException("Streaming logs to ep is not supported for this event management agent type");
}
try {
commandLogStreamingProcessor.streamLogsToEP(request, command, executionLog);
commandLogStreamingProcessorOpt.get().streamLogsToEP(request, command, executionLog);
} catch (Exception e) {
log.error("Error sending logs to ep-core for command with commandCorrelationId",
log.error("Error sending logs to ep-core for command with commandCorrelationId {}",
request.getCommandCorrelationId(), e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
import com.solace.maas.ep.event.management.agent.publisher.CommandLogsPublisher;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
Expand All @@ -33,7 +32,7 @@

@Component
@Slf4j
@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false")
@ConditionalOnExpression("${event-portal.gateway.messaging.standalone:false}== false && ${event-portal.managed:false} == false")
public class CommandLogStreamingProcessor {

public static final String ANY = "*";
Expand All @@ -50,28 +49,6 @@ public CommandLogStreamingProcessor(CommandLogsPublisher commandLogsPublisher,
}


public void deleteExecutionLogFiles(List<Path> listOfExecutionLogFiles) {
boolean allFilesDeleted = listOfExecutionLogFiles
.stream()
.allMatch(this::deleteExecutionLogFile);
if (!allFilesDeleted) {
throw new IllegalArgumentException("Some of the execution log files were not deleted. Please check the logs");
}
}

private boolean deleteExecutionLogFile(Path path) {
try {
if (Files.exists(path)) {
Files.delete(path);
}
} catch (IOException e) {
log.warn("Error while deleting execution log at {}", path, e);
return false;
}
return true;
}


public void streamLogsToEP(CommandRequest request, Command executedCommand, Path commandExecutionLog) {

if (executedCommand.getIgnoreResult()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.solace.maas.ep.event.management.agent;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.solace.maas.ep.event.management.agent.command.CommandManager;
import com.solace.maas.ep.event.management.agent.command.mapper.CommandMapper;
import com.solace.maas.ep.event.management.agent.config.SolaceConfiguration;
Expand Down Expand Up @@ -31,13 +30,13 @@
import org.apache.camel.ProducerTemplate;
import org.apache.camel.support.DefaultExchange;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Profile;

import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -161,29 +160,14 @@ public CommandLogsPublisher getComaCommandLogsPublisher() {
return mock(CommandLogsPublisher.class);
}

@Bean
@Qualifier("realCommandLogStreamingProcessor")
public CommandLogStreamingProcessor realCommandLogStreamingProcessor(CommandLogsPublisher commandLogsPublisher,
EventPortalProperties eventPortalProperties,
ObjectMapper objectMapper) {
return new CommandLogStreamingProcessor(commandLogsPublisher, eventPortalProperties, objectMapper);
}

@Bean
@Qualifier("mockedCommandLogStreamingProcessor")
@Primary
public CommandLogStreamingProcessor mockedCommandLogStreamingProcessor() {
return mock(CommandLogStreamingProcessor.class);
}

@Bean
@Primary
public CommandManager getCommandManager(TerraformManager terraformManager,
CommandMapper commandMapper,
CommandPublisher commandPublisher,
MessagingServiceDelegateService messagingServiceDelegateService,
EventPortalProperties eventPortalProperties,
CommandLogStreamingProcessor commandLogStreamingProcessor) {
Optional<CommandLogStreamingProcessor> commandLogStreamingProcessor) {
return new CommandManager(
terraformManager,
commandMapper,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,15 @@ void testDirectMessageHandlerBeansAreNotLoaded() {
);

}

@Test
void testCommandLogStreamingProcessorBeanIsNotLoaded() {
String[] allBeanNames = applicationContext.getBeanDefinitionNames();
assertThat(
Arrays.stream(allBeanNames)
.map(StringUtils::lowerCase)
.collect(Collectors.toSet()))
.doesNotContain(StringUtils.lowerCase("commandLogStreamingProcessor"));

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,14 @@ void testDirectMessageHandlerBeansAreLoaded() {
);

}

@Test
void testCommandLogStreamingProcessorBeanIsLoaded() {
String[] allBeanNames = applicationContext.getBeanDefinitionNames();
assertThat(
Arrays.stream(allBeanNames)
.map(StringUtils::lowerCase)
.collect(Collectors.toSet())).contains(StringUtils.lowerCase("commandLogStreamingProcessor"));

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package com.solace.maas.ep.event.management.agent.commandManager;

import com.solace.maas.ep.common.messages.CommandMessage;
import com.solace.maas.ep.event.management.agent.command.CommandManager;
import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties;
import com.solace.maas.ep.event.management.agent.plugin.command.model.Command;
import com.solace.maas.ep.event.management.agent.plugin.command.model.JobStatus;
import com.solace.maas.ep.event.management.agent.plugin.service.MessagingServiceDelegateService;
import com.solace.maas.ep.event.management.agent.plugin.solace.processor.semp.SempClient;
import com.solace.maas.ep.event.management.agent.plugin.solace.processor.semp.SolaceHttpSemp;
import com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformManager;
import com.solace.maas.ep.event.management.agent.publisher.CommandPublisher;
import lombok.extern.slf4j.Slf4j;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.ArgumentCaptor;
import org.mockito.stubbing.Answer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ActiveProfiles;

import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.springframework.test.annotation.DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD;

@Slf4j
@ActiveProfiles("TEST")
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = {
"eventPortal.gateway.messaging.standalone=false",
"eventPortal.managed=true",
"eventPortal.incomingRequestQueueName = ep_core_ema_requests_123456_123123",
"event-portal.gateway.messaging.rto-session=false"
})
@DirtiesContext(classMode = BEFORE_EACH_TEST_METHOD)
//CPD-OFF
class CloudManagedEMACommandManagerTests {

@SpyBean
private CommandManager commandManager;

@Autowired
private TerraformManager terraformManager;

@Autowired
private CommandPublisher commandPublisher;

@Autowired
private MessagingServiceDelegateService messagingServiceDelegateService;

@Autowired
private EventPortalProperties eventPortalProperties;

private static final String MESSAGING_SERVICE_ID = "myMessagingServiceId";

private ArgumentCaptor<List<Path>> executionLogFileCaptor;
private ArgumentCaptor<Map<String, String>> topicArgCaptor;
private ArgumentCaptor<Map<String, String>> envArgCaptor;
private ArgumentCaptor<CommandMessage> responseCaptor;

private CommandMessage message;

@BeforeEach
void setUp() {
message = CommandManagerTestHelper.buildCommandMessageForConfigPush(eventPortalProperties.getOrganizationId(), MESSAGING_SERVICE_ID);
executionLogFileCaptor = ArgumentCaptor.forClass(List.class);
topicArgCaptor = ArgumentCaptor.forClass(Map.class);
doNothing().when(commandPublisher).sendCommandResponse(any(), any());
when(messagingServiceDelegateService.getMessagingServiceClient(any())).thenReturn(
new SolaceHttpSemp(SempClient.builder()
.username("myUsername")
.password("myPassword")
.connectionUrl("myConnectionUrl")
.build()));
envArgCaptor = ArgumentCaptor.forClass(Map.class);
responseCaptor = ArgumentCaptor.forClass(CommandMessage.class);
}

@Test
void noLogsStreamingToEP(@TempDir Path basePath) {
doAnswer((Answer<Path>) invocation -> {
Command command = (Command) invocation.getArgument(1);
return CommandManagerTestHelper.setCommandStatusAndReturnExecutionLog(command, JobStatus.success, true, basePath);
}).when(terraformManager).execute(any(), any(), any());

commandManager.execute(message);

// Wait for the command thread to complete
await().atMost(5, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, 1));

verify(terraformManager, times(4)).execute(any(), any(), envArgCaptor.capture());
verify(commandPublisher, times(1)).sendCommandResponse(responseCaptor.capture(), topicArgCaptor.capture());

//Logs will not be streamed as EMA is cloud managed
verify(commandManager, times(0)).streamCommandExecutionLogToEpCore(any(), any(), any());

//Logs will be cleaned up anyway
verify(commandManager, times(1)).deleteExecutionLogFiles(executionLogFileCaptor.capture());
Assertions.assertThat(executionLogFileCaptor.getValue())
.containsExactlyInAnyOrder(
basePath.resolve("apply"),
basePath.resolve("write_HCL"),
basePath.resolve("write_HCL"),
basePath.resolve("sync"));
}
}
Loading

0 comments on commit 9573772

Please sign in to comment.