Skip to content

Commit

Permalink
DATAGO-64297: Added message subscriber/publisher (#134)
Browse files Browse the repository at this point in the history
  • Loading branch information
gregmeldrum authored Dec 1, 2023
1 parent a96d82d commit 9f100aa
Show file tree
Hide file tree
Showing 16 changed files with 330 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public CommandMessage(String serviceId,
List<CommandBundle> commandBundles) {
super();
withMessageType(MOPMessageType.generic)
.withProtocol(MOPProtocol.commandProtocol)
.withProtocol(MOPProtocol.epConfigPush)
.withVersion("1")
.withUhFlag(MOPUHFlag.ignore);
this.serviceId = serviceId;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package com.solace.maas.ep.event.management.agent.command;

import com.solace.maas.ep.common.messages.CommandMessage;
import com.solace.maas.ep.event.management.agent.command.mapper.CommandMapper;
import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties;
import com.solace.maas.ep.event.management.agent.plugin.command.model.Command;
import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandBundle;
import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandRequest;
import com.solace.maas.ep.event.management.agent.plugin.service.MessagingServiceDelegateService;
import com.solace.maas.ep.event.management.agent.plugin.solace.processor.semp.SempClient;
import com.solace.maas.ep.event.management.agent.plugin.solace.processor.semp.SolaceHttpSemp;
import com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformManager;
import com.solace.maas.ep.event.management.agent.publisher.CommandPublisher;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

Expand All @@ -17,28 +20,42 @@
@Service
public class CommandManager {
private final TerraformManager terraformManager;

private final CommandMapper commandMapper;
private final CommandPublisher commandPublisher;
private final MessagingServiceDelegateService messagingServiceDelegateService;
private final EventPortalProperties eventPortalProperties;

public CommandManager(TerraformManager terraformManager, MessagingServiceDelegateService messagingServiceDelegateService) {
public CommandManager(TerraformManager terraformManager, CommandMapper commandMapper,
CommandPublisher commandPublisher, MessagingServiceDelegateService messagingServiceDelegateService,
EventPortalProperties eventPortalProperties) {
this.terraformManager = terraformManager;
this.commandMapper = commandMapper;
this.commandPublisher = commandPublisher;
this.messagingServiceDelegateService = messagingServiceDelegateService;
this.eventPortalProperties = eventPortalProperties;
}

public void execute(CommandRequest request) {
public void execute(CommandMessage request) {
Map<String, String> envVars = setBrokerSpecificEnvVars(request.getServiceId());
for (CommandBundle bundle : request.getCommandBundles()) {
// For now everything is run serially
for (Command command : bundle.getCommands()) {
switch (command.getCommandType()) {
case terraform:
terraformManager.execute(request, command, envVars);
terraformManager.execute(commandMapper.map(request), command, envVars);
break;
default:
throw new IllegalStateException("Unexpected value: " + command.getCommandType());
}
}
}

Map<String, String> topicVars = Map.of(
"orgId", request.getOrgId(),
"runtimeAgentId", eventPortalProperties.getRuntimeAgentId(),
"correlationId", request.getCorrelationId()
);
commandPublisher.sendCommandResponse(request, topicVars);
}

private Map<String, String> setBrokerSpecificEnvVars(String messagingServiceId) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.solace.maas.ep.event.management.agent.command.mapper;

import com.solace.maas.ep.common.messages.CommandMessage;
import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandRequest;
import org.mapstruct.Mapper;

@Mapper(componentModel = "spring")
public interface CommandMapper {

CommandRequest map(CommandMessage input);

CommandMessage map(CommandRequest input);
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.solace.maas.ep.event.management.agent.publisher;

import com.solace.maas.ep.event.management.agent.config.SolaceConfiguration;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessage;
import com.solace.maas.ep.event.management.agent.plugin.publisher.SolacePublisher;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false")
public class CommandPublisher {

private final SolacePublisher solacePublisher;
private final SolaceConfiguration solaceConfiguration;

public CommandPublisher(SolacePublisher solacePublisher, SolaceConfiguration solaceConfiguration) {
this.solacePublisher = solacePublisher;
this.solaceConfiguration = solaceConfiguration;
}

/**
* Sends the command response to EP.
* <p>
* The topic for command response:
* sc/ep/runtime/{orgId}/{runtimeAgentId}/commandResponse/v1/{correlationId}
*/

public void sendCommandResponse(MOPMessage message, Map<String, String> topicDetails) {

String topicString =
String.format("%scommandResponse/v1/%s",
solaceConfiguration.getTopicPrefix(),
topicDetails.get("correlationId"));

solacePublisher.publish(message, topicString);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.solace.maas.ep.event.management.agent.subscriber;

import com.solace.maas.ep.common.messages.CommandMessage;
import com.solace.maas.ep.event.management.agent.command.CommandManager;
import com.solace.maas.ep.event.management.agent.config.SolaceConfiguration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false")
public class CommandMessageHandler extends SolaceMessageHandler<CommandMessage> {

private final CommandManager commandManager;

public CommandMessageHandler(
SolaceConfiguration solaceConfiguration,
SolaceSubscriber solaceSubscriber, CommandManager commandManager) {
super(solaceConfiguration.getTopicPrefix() + "command/v1/>", solaceSubscriber);
this.commandManager = commandManager;
}

@Override
public void receiveMessage(String destinationName, CommandMessage message) {
log.debug("receiveMessage {}\n{}", destinationName, message);
commandManager.execute(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.jboss.logging.MDC;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
Expand Down Expand Up @@ -69,19 +70,7 @@ public void onMessage(InboundMessage inboundMessage) {

String receivedClassName = messageClass.getSimpleName();

if ("ScanCommandMessage".equals(receivedClassName) || "ScanDataImportMessage".equals(receivedClassName)) {
Map<String, Object> map = objectMapper.readValue(messageAsString, Map.class);
String scanId = (String) map.get("scanId");
String traceId = (String) map.get("traceId");
String actorId = (String) map.get("actorId");
String messagingServiceId = (String) map.get("messagingServiceId");

MDC.clear();
MDC.put(RouteConstants.SCAN_ID, scanId);
MDC.put(RouteConstants.TRACE_ID, traceId);
MDC.put(RouteConstants.ACTOR_ID, actorId);
MDC.put(RouteConstants.MESSAGING_SERVICE_ID, messagingServiceId);
}
setupMDC(messageAsString, receivedClassName);

message = (T) objectMapper.readValue(messageAsString, messageClass);
log.trace("onMessage: {}\n{}", messageClass, messageAsString);
Expand All @@ -92,5 +81,26 @@ public void onMessage(InboundMessage inboundMessage) {
}
}

private static void setupMDC(String messageAsString, String receivedClassName) throws JsonProcessingException {
List<String> scanClassNames = List.of("ScanCommandMessage", "ScanDataImportMessage");
List<String> commandClassNames = List.of("CommandMessage");

Map<String, Object> map = objectMapper.readValue(messageAsString, Map.class);

MDC.clear();
MDC.put(RouteConstants.TRACE_ID, map.get("traceId"));
MDC.put(RouteConstants.ACTOR_ID, map.get("actorId"));

if (scanClassNames.contains(receivedClassName)) {
MDC.put(RouteConstants.SCAN_ID, map.get("scanId"));
MDC.put(RouteConstants.MESSAGING_SERVICE_ID, map.get("messagingServiceId"));
}

if (commandClassNames.contains(receivedClassName)) {
MDC.put(RouteConstants.COMMAND_CORRELATION_ID, map.get("correlationId"));
MDC.put(RouteConstants.MESSAGING_SERVICE_ID, map.get("serviceId"));
}
}

public abstract void receiveMessage(String destinationName, T message);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
import com.solace.maas.ep.event.management.agent.plugin.manager.client.kafkaClient.KafkaClientReconnection;
import com.solace.maas.ep.event.management.agent.plugin.manager.client.kafkaClient.KafkaClientReconnectionConfig;
import com.solace.maas.ep.event.management.agent.plugin.messagingService.RtoMessageBuilder;
import com.solace.maas.ep.event.management.agent.plugin.service.MessagingServiceDelegateService;
import com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformManager;
import com.solace.maas.ep.event.management.agent.plugin.vmr.VmrProcessor;
import com.solace.maas.ep.event.management.agent.publisher.CommandPublisher;
import com.solace.maas.ep.event.management.agent.testConfigs.MessagingServiceTestConfig;
import com.solace.maas.ep.event.management.agent.testConfigs.PublisherTestConfig;
import com.solace.maas.ep.event.management.agent.util.IDGenerator;
Expand Down Expand Up @@ -79,6 +82,18 @@ public VmrProcessor getVmrProcessor() {
return processor;
}

@Bean
@Primary
public MessagingServiceDelegateService getMessagingServiceDelegateService() {
return mock(MessagingServiceDelegateService.class);
}

@Bean
@Primary
public TerraformManager getTerraformManager() {
return mock(TerraformManager.class);
}

@Bean
@Primary
public OutboundMessageBuilder outboundMessageBuilder() {
Expand Down Expand Up @@ -119,6 +134,12 @@ public IDGenerator idGenerator() {
return idGenerator;
}

@Bean
@Primary
public CommandPublisher getCommandPublisher() {
return mock(CommandPublisher.class);
}

@Bean
@Primary
KafkaClientConnection kafkaClientConnection() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package com.solace.maas.ep.event.management.agent.commandManager;

import com.solace.maas.ep.common.messages.CommandMessage;
import com.solace.maas.ep.event.management.agent.TestConfig;
import com.solace.maas.ep.event.management.agent.command.CommandManager;
import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties;
import com.solace.maas.ep.event.management.agent.plugin.command.model.Command;
import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandBundle;
import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandType;
import com.solace.maas.ep.event.management.agent.plugin.command.model.ExecutionType;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPSvcType;
import com.solace.maas.ep.event.management.agent.plugin.service.MessagingServiceDelegateService;
import com.solace.maas.ep.event.management.agent.plugin.solace.processor.semp.SempClient;
import com.solace.maas.ep.event.management.agent.plugin.solace.processor.semp.SolaceHttpSemp;
import com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformManager;
import com.solace.maas.ep.event.management.agent.publisher.CommandPublisher;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;

import java.util.List;
import java.util.Map;

import static com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessageType.generic;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ActiveProfiles("TEST")
@EnableAutoConfiguration
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = TestConfig.class)
public class CommandManagerTests {

@Autowired
CommandManager commandManager;

@Autowired
TerraformManager terraformManager;

@Autowired
CommandPublisher commandPublisher;

@Autowired
MessagingServiceDelegateService messagingServiceDelegateService;

@Autowired
EventPortalProperties eventPortalProperties;

@Test
public void testCommandManager() {
// Create a command request message
CommandMessage message = new CommandMessage();
message.setOrigType(MOPSvcType.maasEventMgmt);
message.withMessageType(generic);
message.setContext("abc");
message.setActorId("myActorId");
message.setOrgId(eventPortalProperties.getOrganizationId());
message.setTraceId("myTraceId");
message.setCorrelationId("myCorrelationId");
message.setCommandBundles(List.of(
CommandBundle.builder()
.executionType(ExecutionType.serial)
.exitOnFailure(false)
.commands(List.of(
Command.builder()
.commandType(CommandType.terraform)
.body("asdfasdfadsf")
.command("apply")
.build()))
.build()));

doNothing().when(terraformManager).execute(any(), any(), any());

ArgumentCaptor<Map<String, String>> topicArgCaptor = ArgumentCaptor.forClass(Map.class);
doNothing().when(commandPublisher).sendCommandResponse(any(), any());
when(messagingServiceDelegateService.getMessagingServiceClient(any())).thenReturn(
new SolaceHttpSemp(SempClient.builder()
.username("myUsername")
.password("myPassword")
.connectionUrl("myConnectionUrl")
.build()));
commandManager.execute(message);

// Verify terraform manager is called
ArgumentCaptor<Map<String, String>> envArgCaptor = ArgumentCaptor.forClass(Map.class);
verify(terraformManager, times(1)).execute(any(), any(), envArgCaptor.capture());

// Verify the env vars are set with the terraform manager is called
Map<String, String> envVars = envArgCaptor.getValue();
assert envVars.get("TF_VAR_password").equals("myPassword");
assert envVars.get("TF_VAR_username").equals("myUsername");
assert envVars.get("TF_VAR_url").equals("myConnectionUrl");

verify(commandPublisher, times(1)).sendCommandResponse(any(), topicArgCaptor.capture());

Map<String, String> topicVars = topicArgCaptor.getValue();
assert topicVars.get("orgId").equals(eventPortalProperties.getOrganizationId());
assert topicVars.get("runtimeAgentId").equals(eventPortalProperties.getRuntimeAgentId());
assert topicVars.get("correlationId").equals(message.getCorrelationId());
}
}
Loading

0 comments on commit 9f100aa

Please sign in to comment.