diff --git a/minion-gateway/main/src/main/java/org/opennms/miniongateway/grpc/server/flows/FlowKafkaForwarder.java b/minion-gateway/main/src/main/java/org/opennms/miniongateway/grpc/server/flows/FlowKafkaForwarder.java index 78a6433831..a3c54464f8 100644 --- a/minion-gateway/main/src/main/java/org/opennms/miniongateway/grpc/server/flows/FlowKafkaForwarder.java +++ b/minion-gateway/main/src/main/java/org/opennms/miniongateway/grpc/server/flows/FlowKafkaForwarder.java @@ -28,20 +28,15 @@ package org.opennms.miniongateway.grpc.server.flows; -import lombok.RequiredArgsConstructor; -import org.apache.kafka.clients.producer.ProducerRecord; import org.opennms.horizon.flows.document.FlowDocumentLog; import org.opennms.horizon.flows.document.TenantLocationSpecificFlowDocumentLog; import org.opennms.horizon.shared.flows.mapper.TenantLocationSpecificFlowDocumentLogMapper; -import org.opennms.horizon.shared.grpc.common.LocationServerInterceptor; -import org.opennms.horizon.shared.grpc.common.TenantIDGrpcServerInterceptor; import org.opennms.horizon.shared.ipc.sink.api.MessageConsumer; import org.opennms.horizon.shared.ipc.sink.api.SinkModule; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.opennms.miniongateway.grpc.server.kafka.SinkMessageKafkaPublisher; +import org.opennms.miniongateway.grpc.server.kafka.SinkMessageKafkaPublisherFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; /** @@ -49,29 +44,16 @@ */ @Component public class FlowKafkaForwarder implements MessageConsumer { - public static final String DEFAULT_TASK_RESULTS_TOPIC = "flows"; - - private final Logger logger = LoggerFactory.getLogger(FlowKafkaForwarder.class); - - private final TenantIDGrpcServerInterceptor tenantIDGrpcInterceptor; - - private final LocationServerInterceptor locationServerInterceptor; - - private final KafkaTemplate kafkaTemplate; - - private final TenantLocationSpecificFlowDocumentLogMapper tenantLocationSpecificFlowDocumentLogMapper; - - @Value("${task.results.kafka-topic:" + DEFAULT_TASK_RESULTS_TOPIC + "}") - private String kafkaTopic; - - public FlowKafkaForwarder(@Autowired TenantIDGrpcServerInterceptor tenantIDGrpcInterceptor, - @Autowired LocationServerInterceptor locationServerInterceptor, - @Autowired KafkaTemplate kafkaTemplate, - @Autowired TenantLocationSpecificFlowDocumentLogMapper tenantLocationSpecificFlowDocumentLogMapper) { - this.tenantIDGrpcInterceptor = tenantIDGrpcInterceptor; - this.locationServerInterceptor = locationServerInterceptor; - this.kafkaTemplate = kafkaTemplate; - this.tenantLocationSpecificFlowDocumentLogMapper = tenantLocationSpecificFlowDocumentLogMapper; + public static final String DEFAULT_FLOW_RESULTS_TOPIC = "flows"; + private final SinkMessageKafkaPublisher kafkaPublisher; + + @Autowired + public FlowKafkaForwarder(SinkMessageKafkaPublisherFactory messagePublisherFactory, TenantLocationSpecificFlowDocumentLogMapper mapper, + @Value("${flow.results.kafka-topic:" + DEFAULT_FLOW_RESULTS_TOPIC + "}") String kafkaTopic) { + this.kafkaPublisher = messagePublisherFactory.create( + mapper::mapBareToTenanted, + kafkaTopic + ); } @Override @@ -81,27 +63,7 @@ public SinkModule getModule() { @Override public void handleMessage(FlowDocumentLog messageLog) { - // Retrieve the Tenant ID from the TenantID GRPC Interceptor - String tenantId = tenantIDGrpcInterceptor.readCurrentContextTenantId(); - // Ditto for location - String locationId = locationServerInterceptor.readCurrentContextLocationId(); - logger.trace("Received flow; sending to Kafka: tenantId: {}; locationId={}; kafka-topic={}; message={}", tenantId, locationId, kafkaTopic, messageLog); - - - var tenantLocationSpecificFlowDocumentLog = - tenantLocationSpecificFlowDocumentLogMapper.mapBareToTenanted(tenantId, locationId, messageLog); - - sendToKafka(tenantLocationSpecificFlowDocumentLog); + this.kafkaPublisher.send(messageLog); } -//======================================== -// INTERNALS -//---------------------------------------- - - private void sendToKafka(TenantLocationSpecificFlowDocumentLog tenantLocationSpecificFlowDocumentLog) { - byte[] rawContent = tenantLocationSpecificFlowDocumentLog.toByteArray(); - var producerRecord = new ProducerRecord(kafkaTopic, rawContent); - - this.kafkaTemplate.send(producerRecord); - } } diff --git a/minion-gateway/main/src/main/java/org/opennms/miniongateway/grpc/server/heartbeat/HeartbeatKafkaForwarder.java b/minion-gateway/main/src/main/java/org/opennms/miniongateway/grpc/server/heartbeat/HeartbeatKafkaForwarder.java index 3fdc147f0a..20aaa5073b 100644 --- a/minion-gateway/main/src/main/java/org/opennms/miniongateway/grpc/server/heartbeat/HeartbeatKafkaForwarder.java +++ b/minion-gateway/main/src/main/java/org/opennms/miniongateway/grpc/server/heartbeat/HeartbeatKafkaForwarder.java @@ -28,46 +28,33 @@ package org.opennms.miniongateway.grpc.server.heartbeat; -import org.apache.kafka.clients.producer.ProducerRecord; import org.opennms.horizon.grpc.heartbeat.contract.HeartbeatMessage; import org.opennms.horizon.grpc.heartbeat.contract.TenantLocationSpecificHeartbeatMessage; import org.opennms.horizon.grpc.heartbeat.contract.mapper.TenantLocationSpecificHeartbeatMessageMapper; -import org.opennms.horizon.shared.grpc.common.LocationServerInterceptor; -import org.opennms.horizon.shared.grpc.common.TenantIDGrpcServerInterceptor; import org.opennms.horizon.shared.ipc.sink.api.MessageConsumer; import org.opennms.horizon.shared.ipc.sink.api.SinkModule; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.opennms.miniongateway.grpc.server.kafka.SinkMessageKafkaPublisher; +import org.opennms.miniongateway.grpc.server.kafka.SinkMessageKafkaPublisherFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; -import io.opentelemetry.api.trace.Span; - /** * Forwarder of Heartbeat messages - received via GRPC and forwarded to Kafka. */ @Component public class HeartbeatKafkaForwarder implements MessageConsumer { - public static final String DEFAULT_TASK_RESULTS_TOPIC = "heartbeat"; - - private final Logger logger = LoggerFactory.getLogger(HeartbeatKafkaForwarder.class); - - @Autowired - private KafkaTemplate kafkaTemplate; - - @Value("${task.results.kafka-topic:" + DEFAULT_TASK_RESULTS_TOPIC + "}") - private String kafkaTopic; - - @Autowired - private TenantIDGrpcServerInterceptor tenantIDGrpcInterceptor; + public static final String DEFAULT_HEARTBEAT_RESULTS_TOPIC = "heartbeat"; + private final SinkMessageKafkaPublisher kafkaPublisher; @Autowired - private LocationServerInterceptor locationServerInterceptor; - - @Autowired - private TenantLocationSpecificHeartbeatMessageMapper tenantLocationSpecificHeartbeatMessageMapper; + public HeartbeatKafkaForwarder(SinkMessageKafkaPublisherFactory messagePublisherFactory, TenantLocationSpecificHeartbeatMessageMapper mapper, + @Value("${heartbeat.results.kafka-topic:" + DEFAULT_HEARTBEAT_RESULTS_TOPIC + "}") String kafkaTopic) { + this.kafkaPublisher = messagePublisherFactory.create( + mapper::mapBareToTenanted, + kafkaTopic + ); + } @Override public SinkModule getModule() { @@ -75,21 +62,7 @@ public SinkModule getModule() { } @Override - public void handleMessage(HeartbeatMessage heartbeatMessage) { - // Retrieve the Tenant ID from the TenantID GRPC Interceptor - String tenantId = tenantIDGrpcInterceptor.readCurrentContextTenantId(); - // And the locationId from its Interceptor - String locationId = locationServerInterceptor.readCurrentContextLocationId(); - - logger.info("Received heartbeat; sending to Kafka: tenantId={}; locationId={}; kafka-topic={}; message={}", tenantId, locationId, kafkaTopic, heartbeatMessage); - Span.current().setAttribute("message", heartbeatMessage.toString()); - - TenantLocationSpecificHeartbeatMessage mapped = tenantLocationSpecificHeartbeatMessageMapper.mapBareToTenanted(tenantId, locationId, heartbeatMessage); - - byte[] rawContent = mapped.toByteArray(); - var producerRecord = new ProducerRecord(kafkaTopic, rawContent); - - this.kafkaTemplate.send(producerRecord); + public void handleMessage(HeartbeatMessage message) { + this.kafkaPublisher.send(message); } - } diff --git a/minion-gateway/main/src/main/java/org/opennms/miniongateway/grpc/server/kafka/SinkMessageKafkaPublisher.java b/minion-gateway/main/src/main/java/org/opennms/miniongateway/grpc/server/kafka/SinkMessageKafkaPublisher.java new file mode 100644 index 0000000000..3711ce365d --- /dev/null +++ b/minion-gateway/main/src/main/java/org/opennms/miniongateway/grpc/server/kafka/SinkMessageKafkaPublisher.java @@ -0,0 +1,80 @@ +/******************************************************************************* + * This file is part of OpenNMS(R). + * + * Copyright (C) 2022 The OpenNMS Group, Inc. + * OpenNMS(R) is Copyright (C) 1999-2022 The OpenNMS Group, Inc. + * + * OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc. + * + * OpenNMS(R) is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, + * or (at your option) any later version. + * + * OpenNMS(R) is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with OpenNMS(R). If not, see: + * http://www.gnu.org/licenses/ + * + * For more information contact: + * OpenNMS(R) Licensing + * http://www.opennms.org/ + * http://www.opennms.com/ + *******************************************************************************/ + +package org.opennms.miniongateway.grpc.server.kafka; + +import com.google.protobuf.Message; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.opennms.horizon.shared.grpc.common.LocationServerInterceptor; +import org.opennms.horizon.shared.grpc.common.TenantIDGrpcServerInterceptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.kafka.core.KafkaTemplate; + +/** + * A helper class which produces kafka messages. + * + * It additionally retrieves tenant information from present context. + * @param Input message (grpc side) kind + * @param Output message (kafka side) type + */ +public class SinkMessageKafkaPublisher { + + private final Logger logger = LoggerFactory.getLogger(SinkMessageKafkaPublisher.class); + private final KafkaTemplate kafkaTemplate; + private final TenantIDGrpcServerInterceptor tenantInterceptor; + private final LocationServerInterceptor locationInterceptor; + private final SinkMessageMapper mapper; + private final String topic; + + public SinkMessageKafkaPublisher(KafkaTemplate kafkaTemplate, + TenantIDGrpcServerInterceptor tenantInterceptor, LocationServerInterceptor locationInterceptor, + SinkMessageMapper mapper, String topic) { + this.kafkaTemplate = kafkaTemplate; + this.tenantInterceptor = tenantInterceptor; + this.locationInterceptor = locationInterceptor; + this.mapper = mapper; + this.topic = topic; + } + + /** + * Map passed In message to a backend message which is then used as a payload for record sent to Kafka. + * + * @param message content to include as the message payload. + */ + public void send(I message) { + String tenantId = tenantInterceptor.readCurrentContextTenantId(); + String locationId = locationInterceptor.readCurrentContextLocationId(); + + O mapped = mapper.map(tenantId, locationId, message); + logger.trace("Received {}; sending a {} to kafka topic {}; tenantId: {}; locationId={}; message={}", + message.getDescriptorForType().getName(), mapped.getDescriptorForType().getName(), topic, tenantId, locationId, mapped); + + kafkaTemplate.send(new ProducerRecord<>(topic, mapped.toByteArray())); + } +} diff --git a/minion-gateway/main/src/main/java/org/opennms/miniongateway/grpc/server/kafka/SinkMessageKafkaPublisherFactory.java b/minion-gateway/main/src/main/java/org/opennms/miniongateway/grpc/server/kafka/SinkMessageKafkaPublisherFactory.java new file mode 100644 index 0000000000..10c481b6e9 --- /dev/null +++ b/minion-gateway/main/src/main/java/org/opennms/miniongateway/grpc/server/kafka/SinkMessageKafkaPublisherFactory.java @@ -0,0 +1,50 @@ +/******************************************************************************* + * This file is part of OpenNMS(R). + * + * Copyright (C) 2023 The OpenNMS Group, Inc. + * OpenNMS(R) is Copyright (C) 1999-2023 The OpenNMS Group, Inc. + * + * OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc. + * + * OpenNMS(R) is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, + * or (at your option) any later version. + * + * OpenNMS(R) is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with OpenNMS(R). If not, see: + * http://www.gnu.org/licenses/ + * + * For more information contact: + * OpenNMS(R) Licensing + * http://www.opennms.org/ + * http://www.opennms.com/ + *******************************************************************************/ + +package org.opennms.miniongateway.grpc.server.kafka; + +import com.google.protobuf.Message; +import lombok.RequiredArgsConstructor; +import org.opennms.horizon.shared.grpc.common.LocationServerInterceptor; +import org.opennms.horizon.shared.grpc.common.TenantIDGrpcServerInterceptor; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +public class SinkMessageKafkaPublisherFactory { + + private final TenantIDGrpcServerInterceptor tenantInterceptor; + private final LocationServerInterceptor locationInterceptor; + private final KafkaTemplate kafkaTemplate; + + public SinkMessageKafkaPublisher create(SinkMessageMapper mapper, String topic) { + return new SinkMessageKafkaPublisher<>(kafkaTemplate, tenantInterceptor, locationInterceptor, mapper, topic); + } + +} diff --git a/minion-gateway/main/src/main/java/org/opennms/miniongateway/grpc/server/kafka/SinkMessageMapper.java b/minion-gateway/main/src/main/java/org/opennms/miniongateway/grpc/server/kafka/SinkMessageMapper.java new file mode 100644 index 0000000000..e40d670e84 --- /dev/null +++ b/minion-gateway/main/src/main/java/org/opennms/miniongateway/grpc/server/kafka/SinkMessageMapper.java @@ -0,0 +1,51 @@ +/******************************************************************************* + * This file is part of OpenNMS(R). + * + * Copyright (C) 2023 The OpenNMS Group, Inc. + * OpenNMS(R) is Copyright (C) 1999-2023 The OpenNMS Group, Inc. + * + * OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc. + * + * OpenNMS(R) is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, + * or (at your option) any later version. + * + * OpenNMS(R) is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with OpenNMS(R). If not, see: + * http://www.gnu.org/licenses/ + * + * For more information contact: + * OpenNMS(R) Licensing + * http://www.opennms.org/ + * http://www.opennms.com/ + *******************************************************************************/ + +package org.opennms.miniongateway.grpc.server.kafka; + +import com.google.protobuf.Message; + +/** + * Mapper for messages sent by minion via sink channels. + * + * @param Minion message. + * @param Backend message. + */ +public interface SinkMessageMapper { + + /** + * Maps given message into backend message. + * + * @param tenantId Tenant for which message was generated. + * @param locationId Location for which message was generated. + * @param message Message sent by minion. + * @return Mapped message which should embed both tenantId and locationId. + */ + O map(String tenantId, String locationId, I message); + +} diff --git a/minion-gateway/main/src/main/java/org/opennms/miniongateway/grpc/server/tasktresults/TaskResultsKafkaForwarder.java b/minion-gateway/main/src/main/java/org/opennms/miniongateway/grpc/server/tasktresults/TaskResultsKafkaForwarder.java index 4b42bb728d..a2352d1136 100644 --- a/minion-gateway/main/src/main/java/org/opennms/miniongateway/grpc/server/tasktresults/TaskResultsKafkaForwarder.java +++ b/minion-gateway/main/src/main/java/org/opennms/miniongateway/grpc/server/tasktresults/TaskResultsKafkaForwarder.java @@ -28,18 +28,15 @@ package org.opennms.miniongateway.grpc.server.tasktresults; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.opennms.horizon.shared.grpc.common.LocationServerInterceptor; -import org.opennms.horizon.shared.grpc.common.TenantIDGrpcServerInterceptor; import org.opennms.horizon.shared.ipc.sink.api.MessageConsumer; import org.opennms.horizon.shared.ipc.sink.api.SinkModule; import org.opennms.horizon.shared.protobuf.mapper.TenantLocationSpecificTaskSetResultsMapper; +import org.opennms.miniongateway.grpc.server.kafka.SinkMessageKafkaPublisher; +import org.opennms.miniongateway.grpc.server.kafka.SinkMessageKafkaPublisherFactory; import org.opennms.taskset.contract.TaskSetResults; import org.opennms.taskset.contract.TenantLocationSpecificTaskSetResults; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; /** @@ -49,29 +46,15 @@ public class TaskResultsKafkaForwarder implements MessageConsumer { public static final String DEFAULT_TASK_RESULTS_TOPIC = "task-set.results"; + private final SinkMessageKafkaPublisher kafkaPublisher; - private final Logger logger = LoggerFactory.getLogger(TaskResultsKafkaForwarder.class); - - private final KafkaTemplate kafkaTemplate; - private final TenantIDGrpcServerInterceptor tenantIDGrpcInterceptor; - private final TenantLocationSpecificTaskSetResultsMapper tenantLocationSpecificTaskSetResultsMapper; - private final LocationServerInterceptor locationServerInterceptor; - - private final String kafkaTopic; - - public TaskResultsKafkaForwarder( - KafkaTemplate kafkaTemplate, - TenantIDGrpcServerInterceptor tenantIDGrpcInterceptor, - LocationServerInterceptor locationServerInterceptor, - TenantLocationSpecificTaskSetResultsMapper tenantLocationSpecificTaskSetResultsMapper, - @Value("${task.results.kafka-topic:" + DEFAULT_TASK_RESULTS_TOPIC + "}") - String kafkaTopic) { - - this.kafkaTemplate = kafkaTemplate; - this.tenantIDGrpcInterceptor = tenantIDGrpcInterceptor; - this.locationServerInterceptor = locationServerInterceptor; - this.tenantLocationSpecificTaskSetResultsMapper = tenantLocationSpecificTaskSetResultsMapper; - this.kafkaTopic = kafkaTopic; + @Autowired + public TaskResultsKafkaForwarder(SinkMessageKafkaPublisherFactory messagePublisherFactory, TenantLocationSpecificTaskSetResultsMapper mapper, + @Value("${task.results.kafka-topic:" + DEFAULT_TASK_RESULTS_TOPIC + "}") String kafkaTopic) { + this.kafkaPublisher = messagePublisherFactory.create( + mapper::mapBareToTenanted, + kafkaTopic + ); } @Override @@ -80,21 +63,7 @@ public SinkModule getModule() { } @Override - public void handleMessage(TaskSetResults messageLog) { - // Retrieve the Tenant ID from the TenantID GRPC Interceptor - String tenantId = tenantIDGrpcInterceptor.readCurrentContextTenantId(); - String locationId = locationServerInterceptor.readCurrentContextLocationId(); - - logger.debug("Received results; sending to Kafka: tenant-id={}; location-id={}; kafka-topic={}; message={}", tenantId, locationId, kafkaTopic, messageLog); - - // Map to tenanted - TenantLocationSpecificTaskSetResults tenantLocationSpecificTaskSetResults = tenantLocationSpecificTaskSetResultsMapper.mapBareToTenanted(tenantId, locationId, messageLog); - - // Convert to bytes - byte[] rawContent = tenantLocationSpecificTaskSetResults.toByteArray(); - var producerRecord = new ProducerRecord(kafkaTopic, rawContent); - - // Send to Kafka - this.kafkaTemplate.send(producerRecord); + public void handleMessage(TaskSetResults message) { + this.kafkaPublisher.send(message); } } diff --git a/minion-gateway/main/src/main/java/org/opennms/miniongateway/grpc/server/traps/TrapsKafkaForwarder.java b/minion-gateway/main/src/main/java/org/opennms/miniongateway/grpc/server/traps/TrapsKafkaForwarder.java index db210e3cce..62bc5385b8 100644 --- a/minion-gateway/main/src/main/java/org/opennms/miniongateway/grpc/server/traps/TrapsKafkaForwarder.java +++ b/minion-gateway/main/src/main/java/org/opennms/miniongateway/grpc/server/traps/TrapsKafkaForwarder.java @@ -28,19 +28,15 @@ package org.opennms.miniongateway.grpc.server.traps; -import org.apache.kafka.clients.producer.ProducerRecord; import org.opennms.horizon.grpc.traps.contract.TenantLocationSpecificTrapLogDTO; import org.opennms.horizon.grpc.traps.contract.TrapLogDTO; -import org.opennms.horizon.shared.grpc.common.LocationServerInterceptor; -import org.opennms.horizon.shared.grpc.common.TenantIDGrpcServerInterceptor; import org.opennms.horizon.shared.grpc.traps.contract.mapper.TenantLocationSpecificTrapLogDTOMapper; import org.opennms.horizon.shared.ipc.sink.api.MessageConsumer; import org.opennms.horizon.shared.ipc.sink.api.SinkModule; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.opennms.miniongateway.grpc.server.kafka.SinkMessageKafkaPublisher; +import org.opennms.miniongateway.grpc.server.kafka.SinkMessageKafkaPublisherFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; /** @@ -49,24 +45,18 @@ @Component public class TrapsKafkaForwarder implements MessageConsumer { - public static final String DEFAULT_TASK_RESULTS_TOPIC = "traps"; + public static final String DEFAULT_TRAP_RESULTS_TOPIC = "traps"; - private final Logger logger = LoggerFactory.getLogger(TrapsKafkaForwarder.class); + private final SinkMessageKafkaPublisher kafkaPublisher; @Autowired - private KafkaTemplate kafkaTemplate; - - @Autowired - private TenantIDGrpcServerInterceptor tenantIDGrpcInterceptor; - - @Autowired - private LocationServerInterceptor locationServerInterceptor; - - @Autowired - private TenantLocationSpecificTrapLogDTOMapper tenantLocationSpecificTrapLogDTOMapper; - - @Value("${task.results.kafka-topic:" + DEFAULT_TASK_RESULTS_TOPIC + "}") - private String kafkaTopic; + public TrapsKafkaForwarder(SinkMessageKafkaPublisherFactory messagePublisherFactory, TenantLocationSpecificTrapLogDTOMapper mapper, + @Value("${traps.results.kafka-topic:" + DEFAULT_TRAP_RESULTS_TOPIC + "}") String kafkaTopic) { + this.kafkaPublisher = messagePublisherFactory.create( + mapper::mapBareToTenanted, + kafkaTopic + ); + } @Override public SinkModule getModule() { @@ -74,29 +64,7 @@ public SinkModule getModule() { } @Override - public void handleMessage(TrapLogDTO messageLog) { - // Retrieve the Tenant ID from the TenantID GRPC Interceptor - String tenantId = tenantIDGrpcInterceptor.readCurrentContextTenantId(); - // Ditto for locationId - String locationId = locationServerInterceptor.readCurrentContextLocationId(); - - // String locationId - logger.info("Received traps; sending to Kafka: tenant-id={}; location-id={}, kafka-topic={}; message={}", tenantId, locationId, kafkaTopic, messageLog); - - TenantLocationSpecificTrapLogDTO tenantLocationSpecificTrapLogDTO = - tenantLocationSpecificTrapLogDTOMapper.mapBareToTenanted(tenantId, locationId, messageLog); - - sendToKafka(tenantLocationSpecificTrapLogDTO); - } - -//======================================== -// Internals -//---------------------------------------- - - private void sendToKafka(TenantLocationSpecificTrapLogDTO tenantLocationSpecificTrapLogDTO) { - byte[] rawContent = tenantLocationSpecificTrapLogDTO.toByteArray(); - var producerRecord = new ProducerRecord(kafkaTopic, rawContent); - - this.kafkaTemplate.send(producerRecord); + public void handleMessage(TrapLogDTO message) { + this.kafkaPublisher.send(message); } } diff --git a/minion-gateway/main/src/test/java/org/opennms/miniongateway/grpc/server/FlowKafkaForwarderTest.java b/minion-gateway/main/src/test/java/org/opennms/miniongateway/grpc/server/FlowKafkaForwarderTest.java deleted file mode 100644 index 31e070a9d4..0000000000 --- a/minion-gateway/main/src/test/java/org/opennms/miniongateway/grpc/server/FlowKafkaForwarderTest.java +++ /dev/null @@ -1,115 +0,0 @@ -/******************************************************************************* - * This file is part of OpenNMS(R). - * - * Copyright (C) 2023 The OpenNMS Group, Inc. - * OpenNMS(R) is Copyright (C) 1999-2023 The OpenNMS Group, Inc. - * - * OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc. - * - * OpenNMS(R) is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published - * by the Free Software Foundation, either version 3 of the License, - * or (at your option) any later version. - * - * OpenNMS(R) is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with OpenNMS(R). If not, see: - * http://www.gnu.org/licenses/ - * - * For more information contact: - * OpenNMS(R) Licensing - * http://www.opennms.org/ - * http://www.opennms.com/ - *******************************************************************************/ - -package org.opennms.miniongateway.grpc.server; - -import org.apache.kafka.clients.producer.ProducerRecord; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentMatcher; -import org.mockito.InjectMocks; -import org.mockito.Mockito; -import org.mockito.junit.MockitoJUnitRunner; -import org.opennms.horizon.flows.document.FlowDocument; -import org.opennms.horizon.flows.document.FlowDocumentLog; -import org.opennms.horizon.flows.document.TenantLocationSpecificFlowDocumentLog; -import org.opennms.horizon.shared.flows.mapper.TenantLocationSpecificFlowDocumentLogMapper; -import org.opennms.horizon.shared.grpc.common.LocationServerInterceptor; -import org.opennms.horizon.shared.grpc.common.TenantIDGrpcServerInterceptor; -import org.opennms.miniongateway.grpc.server.flows.FlowApplicationConfig; -import org.opennms.miniongateway.grpc.server.flows.FlowKafkaForwarder; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.test.util.ReflectionTestUtils; - -import java.util.Arrays; - -import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.Mockito.mock; - -@RunWith(MockitoJUnitRunner.class) -public class FlowKafkaForwarderTest { - private final TenantIDGrpcServerInterceptor tenantIDGrpcInterceptor = mock(TenantIDGrpcServerInterceptor.class); - - private final LocationServerInterceptor locationServerInterceptor = mock(LocationServerInterceptor.class); - - private final KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class); - - private final FlowApplicationConfig flowApplicationConfig = new FlowApplicationConfig(); - private final TenantLocationSpecificFlowDocumentLogMapper tenantLocationSpecificFlowDocumentLogMapper = - flowApplicationConfig.tenantLocationSpecificFlowDocumentLogMapper(); - - private final String kafkaTopic = "kafkaTopic"; - - @InjectMocks - private FlowKafkaForwarder flowKafkaForwarder = new FlowKafkaForwarder(tenantIDGrpcInterceptor, - locationServerInterceptor, kafkaTemplate, tenantLocationSpecificFlowDocumentLogMapper); - - @Before - public void setUp() { - ReflectionTestUtils.setField(flowKafkaForwarder, "kafkaTopic", kafkaTopic); - ReflectionTestUtils.setField(flowKafkaForwarder, "tenantLocationSpecificFlowDocumentLogMapper", tenantLocationSpecificFlowDocumentLogMapper); - } - - @Test - public void testForward() { - Mockito.when(tenantIDGrpcInterceptor.readCurrentContextTenantId()).thenReturn("tenantId"); - Mockito.when(locationServerInterceptor.readCurrentContextLocationId()).thenReturn("location"); - var flowsLog = FlowDocumentLog.newBuilder() - .setSystemId("systemId") - .addMessage(FlowDocument.newBuilder() - .setSrcAddress("127.0.0.1")) - .build(); - - var expectedFlowDocumentLog = TenantLocationSpecificFlowDocumentLog.newBuilder() - .setSystemId("systemId") - .setLocationId("location") - .setTenantId("tenantId") - .addMessage(FlowDocument.newBuilder() - .setSrcAddress("127.0.0.1")) - .build(); - var expectedProducerRecord = new ProducerRecord(kafkaTopic, expectedFlowDocumentLog.toByteArray()); - - flowKafkaForwarder.handleMessage(flowsLog); - - class ProducerRecordMatcher implements ArgumentMatcher> { - private final ProducerRecord left; - - public ProducerRecordMatcher(ProducerRecord left) { - this.left = left; - } - - @Override - public boolean matches(ProducerRecord right) { - return left.topic().equals(right.topic()) && Arrays.equals(left.value(), right.value()); - } - } - - Mockito.verify(kafkaTemplate, Mockito.times(1)).send(argThat(new ProducerRecordMatcher(expectedProducerRecord))); - } -} diff --git a/minion-gateway/main/src/test/java/org/opennms/miniongateway/grpc/server/flows/FlowKafkaForwarderTest.java b/minion-gateway/main/src/test/java/org/opennms/miniongateway/grpc/server/flows/FlowKafkaForwarderTest.java new file mode 100644 index 0000000000..c41a6c9a2d --- /dev/null +++ b/minion-gateway/main/src/test/java/org/opennms/miniongateway/grpc/server/flows/FlowKafkaForwarderTest.java @@ -0,0 +1,90 @@ +/******************************************************************************* + * This file is part of OpenNMS(R). + * + * Copyright (C) 2023 The OpenNMS Group, Inc. + * OpenNMS(R) is Copyright (C) 1999-2023 The OpenNMS Group, Inc. + * + * OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc. + * + * OpenNMS(R) is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, + * or (at your option) any later version. + * + * OpenNMS(R) is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with OpenNMS(R). If not, see: + * http://www.gnu.org/licenses/ + * + * For more information contact: + * OpenNMS(R) Licensing + * http://www.opennms.org/ + * http://www.opennms.com/ + *******************************************************************************/ + +package org.opennms.miniongateway.grpc.server.flows; + +import com.google.protobuf.Message; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.opennms.horizon.flows.document.FlowDocument; +import org.opennms.horizon.flows.document.FlowDocumentLog; +import org.opennms.horizon.flows.document.TenantLocationSpecificFlowDocumentLog; +import org.opennms.horizon.shared.flows.mapper.TenantLocationSpecificFlowDocumentLogMapper; +import org.opennms.horizon.shared.grpc.common.LocationServerInterceptor; +import org.opennms.horizon.shared.grpc.common.TenantIDGrpcServerInterceptor; +import org.opennms.miniongateway.grpc.server.flows.FlowApplicationConfig; +import org.opennms.miniongateway.grpc.server.flows.FlowKafkaForwarder; +import org.opennms.miniongateway.grpc.server.kafka.SinkMessageKafkaPublisher; +import org.opennms.miniongateway.grpc.server.kafka.SinkMessageKafkaPublisherFactory; +import org.opennms.miniongateway.grpc.server.kafka.SinkMessageMapper; +import org.springframework.kafka.core.KafkaTemplate; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class FlowKafkaForwarderTest { + + + private final String kafkaTopic = "kafkaTopic"; + + @Mock + private SinkMessageKafkaPublisherFactory publisherFactory; + @Mock + private TenantLocationSpecificFlowDocumentLogMapper mapper; + @Mock + private SinkMessageKafkaPublisher publisher; + + private FlowKafkaForwarder flowKafkaForwarder; + + @Before + public void setUp() { + when(publisherFactory.create(any(SinkMessageMapper.class), eq(kafkaTopic))).thenReturn(publisher); + flowKafkaForwarder = new FlowKafkaForwarder(publisherFactory, mapper, kafkaTopic); + } + + @Test + public void testForward() { + var message = FlowDocumentLog.newBuilder() + .setSystemId("systemId") + .addMessage(FlowDocument.newBuilder() + .setSrcAddress("127.0.0.1")) + .build(); + + + flowKafkaForwarder.handleMessage(message); + Mockito.verify(publisher).send(message); + } +} diff --git a/minion-gateway/main/src/test/java/org/opennms/miniongateway/grpc/server/heartbeat/HeartbeatKafkaForwarderTest.java b/minion-gateway/main/src/test/java/org/opennms/miniongateway/grpc/server/heartbeat/HeartbeatKafkaForwarderTest.java new file mode 100644 index 0000000000..a0ce8a3024 --- /dev/null +++ b/minion-gateway/main/src/test/java/org/opennms/miniongateway/grpc/server/heartbeat/HeartbeatKafkaForwarderTest.java @@ -0,0 +1,79 @@ +/******************************************************************************* + * This file is part of OpenNMS(R). + * + * Copyright (C) 2023 The OpenNMS Group, Inc. + * OpenNMS(R) is Copyright (C) 1999-2023 The OpenNMS Group, Inc. + * + * OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc. + * + * OpenNMS(R) is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, + * or (at your option) any later version. + * + * OpenNMS(R) is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with OpenNMS(R). If not, see: + * http://www.gnu.org/licenses/ + * + * For more information contact: + * OpenNMS(R) Licensing + * http://www.opennms.org/ + * http://www.opennms.com/ + *******************************************************************************/ + +package org.opennms.miniongateway.grpc.server.heartbeat; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; + +import com.google.protobuf.Message; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.opennms.cloud.grpc.minion.Identity; +import org.opennms.horizon.grpc.heartbeat.contract.HeartbeatMessage; +import org.opennms.horizon.grpc.heartbeat.contract.mapper.TenantLocationSpecificHeartbeatMessageMapper; +import org.opennms.miniongateway.grpc.server.kafka.SinkMessageKafkaPublisher; +import org.opennms.miniongateway.grpc.server.kafka.SinkMessageKafkaPublisherFactory; +import org.opennms.miniongateway.grpc.server.kafka.SinkMessageMapper; + +@RunWith(MockitoJUnitRunner.class) +public class HeartbeatKafkaForwarderTest { + + private final String kafkaTopic = "kafkaTopic"; + + @Mock + private SinkMessageKafkaPublisherFactory publisherFactory; + @Mock + private TenantLocationSpecificHeartbeatMessageMapper mapper; + @Mock + private SinkMessageKafkaPublisher publisher; + + private HeartbeatKafkaForwarder heartbeatForwarder; + + @Before + public void setUp() { + when(publisherFactory.create(any(SinkMessageMapper.class), eq(kafkaTopic))).thenReturn(publisher); + heartbeatForwarder = new HeartbeatKafkaForwarder(publisherFactory, mapper, kafkaTopic); + } + + @Test + public void testForward() { + var message = HeartbeatMessage.newBuilder() + .setIdentity(Identity.newBuilder().setSystemId("foo").build()) + .build(); + + + heartbeatForwarder.handleMessage(message); + Mockito.verify(publisher).send(message); + } +} diff --git a/minion-gateway/main/src/test/java/org/opennms/miniongateway/grpc/server/kafka/SinkMessageKafkaPublisherTest.java b/minion-gateway/main/src/test/java/org/opennms/miniongateway/grpc/server/kafka/SinkMessageKafkaPublisherTest.java new file mode 100644 index 0000000000..2d478b3480 --- /dev/null +++ b/minion-gateway/main/src/test/java/org/opennms/miniongateway/grpc/server/kafka/SinkMessageKafkaPublisherTest.java @@ -0,0 +1,106 @@ +/******************************************************************************* + * This file is part of OpenNMS(R). + * + * Copyright (C) 2023 The OpenNMS Group, Inc. + * OpenNMS(R) is Copyright (C) 1999-2023 The OpenNMS Group, Inc. + * + * OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc. + * + * OpenNMS(R) is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, + * or (at your option) any later version. + * + * OpenNMS(R) is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with OpenNMS(R). If not, see: + * http://www.gnu.org/licenses/ + * + * For more information contact: + * OpenNMS(R) Licensing + * http://www.opennms.org/ + * http://www.opennms.com/ + *******************************************************************************/ + +package org.opennms.miniongateway.grpc.server.kafka; + +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.protobuf.Message; +import java.util.Arrays; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatcher; +import org.mockito.Mockito; +import org.opennms.horizon.flows.document.FlowDocumentLog; +import org.opennms.horizon.flows.document.TenantLocationSpecificFlowDocumentLog; +import org.opennms.horizon.shared.grpc.common.LocationServerInterceptor; +import org.opennms.horizon.shared.grpc.common.TenantIDGrpcServerInterceptor; +import org.springframework.kafka.core.KafkaTemplate; + +public class SinkMessageKafkaPublisherTest { + + public static final String TEST_TENANT_ID = "opennms-opti-prime"; + public static final String TEST_LOCATION_ID = "location-uuid-0x01"; + public static final String TEST_TOPIC_NAME = "flowable"; + private final TenantIDGrpcServerInterceptor tenantIDGrpcInterceptor = mock(TenantIDGrpcServerInterceptor.class); + + private final LocationServerInterceptor locationServerInterceptor = mock(LocationServerInterceptor.class); + private final KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class); + private final SinkMessageMapper mapper = mock(SinkMessageMapper.class); + private SinkMessageKafkaPublisher messagePublisher; + + @Before + public void setUp() { + messagePublisher = new SinkMessageKafkaPublisher<>( + kafkaTemplate, tenantIDGrpcInterceptor, locationServerInterceptor, mapper, TEST_TOPIC_NAME + ); + } + + @Test + public void testContextLookup() { + Mockito.when(tenantIDGrpcInterceptor.readCurrentContextTenantId()).thenReturn(TEST_TENANT_ID); + Mockito.when(locationServerInterceptor.readCurrentContextLocationId()).thenReturn(TEST_LOCATION_ID); + + var flowsLog = FlowDocumentLog.newBuilder() + .build(); + + // simulate enrichment of payload + var expectedFlowDocumentLog = TenantLocationSpecificFlowDocumentLog.newBuilder() + .setLocationId(TEST_LOCATION_ID) + .setTenantId(TEST_TENANT_ID) + .build(); + + when(mapper.map(TEST_TENANT_ID, TEST_LOCATION_ID, flowsLog)).thenReturn(expectedFlowDocumentLog); + + messagePublisher.send(flowsLog); + verify(mapper).map(TEST_TENANT_ID, TEST_LOCATION_ID, flowsLog); + verify(kafkaTemplate).send(argThat(new ProducerRecordMatcher(TEST_TOPIC_NAME, expectedFlowDocumentLog))); + verify(tenantIDGrpcInterceptor).readCurrentContextTenantId(); + verify(locationServerInterceptor).readCurrentContextLocationId(); + } + + static class ProducerRecordMatcher implements ArgumentMatcher> { + + private final String topic; + private final Message payload; + + public ProducerRecordMatcher(String topic, Message payload) { + this.topic = topic; + this.payload = payload; + } + + @Override + public boolean matches(ProducerRecord record) { + return topic.equals(record.topic()) && Arrays.equals(payload.toByteArray(), record.value()); + } + } +} diff --git a/minion-gateway/main/src/test/java/org/opennms/miniongateway/grpc/server/tasktresults/TaskResultsKafkaForwarderTest.java b/minion-gateway/main/src/test/java/org/opennms/miniongateway/grpc/server/tasktresults/TaskResultsKafkaForwarderTest.java index d966e811e8..b9002362a7 100644 --- a/minion-gateway/main/src/test/java/org/opennms/miniongateway/grpc/server/tasktresults/TaskResultsKafkaForwarderTest.java +++ b/minion-gateway/main/src/test/java/org/opennms/miniongateway/grpc/server/tasktresults/TaskResultsKafkaForwarderTest.java @@ -30,70 +30,52 @@ package org.opennms.miniongateway.grpc.server.tasktresults; import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; +import org.mockito.Mock; import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.opennms.horizon.shared.flows.mapper.TenantLocationSpecificFlowDocumentLogMapper; import org.opennms.horizon.shared.grpc.common.LocationServerInterceptor; import org.opennms.horizon.shared.grpc.common.TenantIDGrpcServerInterceptor; import org.opennms.horizon.shared.protobuf.mapper.TenantLocationSpecificTaskSetResultsMapper; +import org.opennms.miniongateway.grpc.server.kafka.SinkMessageKafkaPublisher; +import org.opennms.miniongateway.grpc.server.kafka.SinkMessageKafkaPublisherFactory; +import org.opennms.miniongateway.grpc.server.kafka.SinkMessageMapper; import org.opennms.taskset.contract.TaskResult; import org.opennms.taskset.contract.TaskSetResults; import org.opennms.taskset.contract.TenantLocationSpecificTaskSetResults; import org.springframework.kafka.core.KafkaTemplate; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; +@RunWith(MockitoJUnitRunner.class) public class TaskResultsKafkaForwarderTest { - public static final String TEST_AFTER_COPY_TENANT_ID = "x-tenant-after-copy-x"; - public static final String TEST_AFTER_COPY_LOCATION = "x-location-after-copy-x"; + private final String kafkaTopic = "kafkaTopic"; - private TaskResultsKafkaForwarder taskResultsKafkaForwarder; + @Mock + private SinkMessageKafkaPublisherFactory publisherFactory; + @Mock + private TenantLocationSpecificTaskSetResultsMapper mapper; + @Mock + private SinkMessageKafkaPublisher publisher; - private KafkaTemplate mockKafkaTemplate; - private TenantIDGrpcServerInterceptor mockTenantIDGrpcInterceptor; - private LocationServerInterceptor mockLocationServerInterceptor; - private TenantLocationSpecificTaskSetResultsMapper mockTenantLocationSpecificTaskSetResultsMapper; + private TaskResultsKafkaForwarder taskResultsKafkaForwarder; - private TaskSetResults testTaskSetResults; - private TaskResult testTaskResult; private TenantLocationSpecificTaskSetResults testTenantLocationSpecificTaskSetResults; @Before public void setUp() throws Exception { - mockKafkaTemplate = Mockito.mock(KafkaTemplate.class); - mockTenantIDGrpcInterceptor = Mockito.mock(TenantIDGrpcServerInterceptor.class); - mockLocationServerInterceptor = Mockito.mock(LocationServerInterceptor.class); - mockTenantLocationSpecificTaskSetResultsMapper = Mockito.mock(TenantLocationSpecificTaskSetResultsMapper.class); - - testTaskResult = - TaskResult.newBuilder() - .build(); - - testTaskSetResults = - TaskSetResults.newBuilder() - .addResults(testTaskResult) - .build(); - - testTenantLocationSpecificTaskSetResults = - TenantLocationSpecificTaskSetResults.newBuilder() - .setTenantId(TEST_AFTER_COPY_TENANT_ID) // Use a distinct tenant id, even though it is unrealistic, for test verification purposes - .setLocationId(TEST_AFTER_COPY_LOCATION) // Ditto for location - .build(); - - taskResultsKafkaForwarder = - new TaskResultsKafkaForwarder( - mockKafkaTemplate, - mockTenantIDGrpcInterceptor, - mockLocationServerInterceptor, - mockTenantLocationSpecificTaskSetResultsMapper, - "x-kafka-topic-x"); - - Mockito.when(mockTenantIDGrpcInterceptor.readCurrentContextTenantId()).thenReturn("x-tenant-x"); - Mockito.when(mockLocationServerInterceptor.readCurrentContextLocationId()).thenReturn("x-location-x"); - Mockito.when(mockTenantLocationSpecificTaskSetResultsMapper.mapBareToTenanted("x-tenant-x", "x-location-x", testTaskSetResults)).thenReturn(testTenantLocationSpecificTaskSetResults); + when(publisherFactory.create(any(SinkMessageMapper.class), eq(kafkaTopic))).thenReturn(publisher); + taskResultsKafkaForwarder = new TaskResultsKafkaForwarder(publisherFactory, mapper, kafkaTopic); } @Test @@ -102,27 +84,15 @@ public void testHandleMessage() throws InvalidProtocolBufferException { // Setup Test Data and Interactions // + TaskResult testTaskResult = TaskResult.newBuilder().build(); + + TaskSetResults testTaskSetResults = TaskSetResults.newBuilder() + .addResults(testTaskResult) + .build(); // // Execute // taskResultsKafkaForwarder.handleMessage(testTaskSetResults); - - // - // Verify the Results - // - @SuppressWarnings("unchecked") - ArgumentCaptor> producerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class); - - Mockito.verify(mockKafkaTemplate).send(producerRecordCaptor.capture()); - ProducerRecord producerRecord = producerRecordCaptor.getValue(); - - assertNotNull(producerRecord); - assertEquals("x-kafka-topic-x", producerRecord.topic()); - - byte[] raw = producerRecord.value(); - var tenantLocationSpecificTaskSetResults = TenantLocationSpecificTaskSetResults.parseFrom(raw); - - assertEquals(TEST_AFTER_COPY_TENANT_ID, tenantLocationSpecificTaskSetResults.getTenantId()); - assertEquals(TEST_AFTER_COPY_LOCATION, tenantLocationSpecificTaskSetResults.getLocationId()); + Mockito.verify(publisher).send(testTaskSetResults); } } diff --git a/minion-gateway/main/src/test/java/org/opennms/miniongateway/grpc/server/traps/TrapsKafkaForwarderTest.java b/minion-gateway/main/src/test/java/org/opennms/miniongateway/grpc/server/traps/TrapsKafkaForwarderTest.java new file mode 100644 index 0000000000..e38ac214bf --- /dev/null +++ b/minion-gateway/main/src/test/java/org/opennms/miniongateway/grpc/server/traps/TrapsKafkaForwarderTest.java @@ -0,0 +1,52 @@ +package org.opennms.miniongateway.grpc.server.traps; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; + +import com.google.protobuf.Message; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.opennms.cloud.grpc.minion.Identity; +import org.opennms.horizon.grpc.traps.contract.TrapDTO; +import org.opennms.horizon.grpc.traps.contract.TrapLogDTO; +import org.opennms.horizon.shared.grpc.traps.contract.mapper.TenantLocationSpecificTrapLogDTOMapper; +import org.opennms.miniongateway.grpc.server.kafka.SinkMessageKafkaPublisher; +import org.opennms.miniongateway.grpc.server.kafka.SinkMessageKafkaPublisherFactory; +import org.opennms.miniongateway.grpc.server.kafka.SinkMessageMapper; + +@RunWith(MockitoJUnitRunner.class) +public class TrapsKafkaForwarderTest { + + private final String kafkaTopic = "kafkaTopic"; + + @Mock + private SinkMessageKafkaPublisherFactory publisherFactory; + @Mock + private TenantLocationSpecificTrapLogDTOMapper mapper; + @Mock + private SinkMessageKafkaPublisher publisher; + + private TrapsKafkaForwarder trapsKafkaForwarder; + + @Before + public void setUp() { + when(publisherFactory.create(any(SinkMessageMapper.class), eq(kafkaTopic))).thenReturn(publisher); + trapsKafkaForwarder = new TrapsKafkaForwarder(publisherFactory, mapper, kafkaTopic); + } + + @Test + public void testForward() { + var message = TrapLogDTO.newBuilder() + .setIdentity(Identity.newBuilder().setSystemId("asdf").build()) + .addTrapDTO(TrapDTO.newBuilder().setCommunity("public").build()) + .build(); + + trapsKafkaForwarder.handleMessage(message); + Mockito.verify(publisher).send(message); + } +}