From a66ef63770af8aa20d6cb72817ba8514ca9fbfba Mon Sep 17 00:00:00 2001 From: Ajinkya <109141486+ag060@users.noreply.github.com> Date: Wed, 1 Jan 2025 10:26:47 +0530 Subject: [PATCH] using proto message envelope for kafka transport in threat detection client (#1874) --- .../threat/backend/service/HealthService.java | 17 -- .../detection/kafka/KafkaProtoProducer.java | 48 ++++ .../tasks/AbstractKafkaConsumerTask.java | 26 +- .../detection/tasks/FlushSampleDataTask.java | 43 ++-- .../tasks/MaliciousTrafficDetectorTask.java | 61 +++-- .../tasks/SendMaliciousEventsToBackend.java | 226 +++++++++--------- protobuf/health/service/v1/service.proto | 24 -- .../message/malicious_event/v1/message.proto | 6 + .../message/sample_request/v1/message.proto | 6 + 9 files changed, 237 insertions(+), 220 deletions(-) delete mode 100644 apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/HealthService.java create mode 100644 apps/threat-detection/src/main/java/com/akto/threat/detection/kafka/KafkaProtoProducer.java delete mode 100644 protobuf/health/service/v1/service.proto diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/HealthService.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/HealthService.java deleted file mode 100644 index 5469ed7d16..0000000000 --- a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/HealthService.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.akto.threat.backend.service; - -import com.akto.proto.generated.health.service.v1.CheckRequest; -import com.akto.proto.generated.health.service.v1.CheckResponse; -import com.akto.proto.generated.health.service.v1.CheckResponse.ServingStatus; -import com.akto.proto.generated.health.service.v1.HealthServiceGrpc; -import io.grpc.stub.StreamObserver; - -public class HealthService extends HealthServiceGrpc.HealthServiceImplBase { - - @Override - public void check(CheckRequest request, StreamObserver responseObserver) { - responseObserver.onNext( - CheckResponse.newBuilder().setStatus(ServingStatus.SERVING_STATUS_SERVING).build()); - responseObserver.onCompleted(); - } -} diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/kafka/KafkaProtoProducer.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/kafka/KafkaProtoProducer.java new file mode 100644 index 0000000000..cfd51ace83 --- /dev/null +++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/kafka/KafkaProtoProducer.java @@ -0,0 +1,48 @@ +package com.akto.threat.detection.kafka; + +import com.akto.kafka.KafkaConfig; +import com.google.protobuf.Message; +import java.time.Duration; +import java.util.Properties; +import org.apache.kafka.clients.producer.*; +import org.apache.kafka.common.serialization.StringSerializer; + +public class KafkaProtoProducer { + private KafkaProducer producer; + public boolean producerReady; + + public KafkaProtoProducer(KafkaConfig kafkaConfig) { + this.producer = generateProducer( + kafkaConfig.getBootstrapServers(), + kafkaConfig.getProducerConfig().getLingerMs(), + kafkaConfig.getProducerConfig().getBatchSize()); + } + + public void send(String topic, Message message) { + byte[] messageBytes = message.toByteArray(); + this.producer.send(new ProducerRecord<>(topic, messageBytes)); + } + + public void close() { + this.producerReady = false; + producer.close(Duration.ofMillis(0)); // close immediately + } + + private KafkaProducer generateProducer(String brokerIP, int lingerMS, int batchSize) { + if (producer != null) + close(); // close existing producer connection + + int requestTimeoutMs = 5000; + Properties kafkaProps = new Properties(); + kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerIP); + kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.ByteArraySerializer"); + kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); + kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMS); + kafkaProps.put(ProducerConfig.RETRIES_CONFIG, 0); + kafkaProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs); + kafkaProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, lingerMS + requestTimeoutMs); + return new KafkaProducer(kafkaProps); + } +} diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/AbstractKafkaConsumerTask.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/AbstractKafkaConsumerTask.java index cfd9ccecd6..4a0a731523 100644 --- a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/AbstractKafkaConsumerTask.java +++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/AbstractKafkaConsumerTask.java @@ -10,10 +10,12 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; -public abstract class AbstractKafkaConsumerTask implements Task { +public abstract class AbstractKafkaConsumerTask implements Task { - protected Consumer kafkaConsumer; + protected Consumer kafkaConsumer; protected KafkaConfig kafkaConfig; protected String kafkaTopic; @@ -24,9 +26,16 @@ public AbstractKafkaConsumerTask(KafkaConfig kafkaConfig, String kafkaTopic) { String kafkaBrokerUrl = kafkaConfig.getBootstrapServers(); String groupId = kafkaConfig.getGroupId(); - Properties properties = - Utils.configProperties( - kafkaBrokerUrl, groupId, kafkaConfig.getConsumerConfig().getMaxPollRecords()); + Properties properties = new Properties(); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerUrl); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConfig.getConsumerConfig().getMaxPollRecords()); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId()); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + this.kafkaConsumer = new KafkaConsumer<>(properties); } @@ -40,9 +49,8 @@ public void run() { () -> { // Poll data from Kafka topic while (true) { - ConsumerRecords records = - kafkaConsumer.poll( - Duration.ofMillis(kafkaConfig.getConsumerConfig().getPollDurationMilli())); + ConsumerRecords records = kafkaConsumer.poll( + Duration.ofMillis(kafkaConfig.getConsumerConfig().getPollDurationMilli())); if (records.isEmpty()) { continue; } @@ -60,5 +68,5 @@ public void run() { }); } - abstract void processRecords(ConsumerRecords records); + abstract void processRecords(ConsumerRecords records); } diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/FlushSampleDataTask.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/FlushSampleDataTask.java index 6829d6ecad..4b857c389e 100644 --- a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/FlushSampleDataTask.java +++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/FlushSampleDataTask.java @@ -3,6 +3,7 @@ import com.akto.dto.type.URLMethods; import com.akto.kafka.KafkaConfig; import com.akto.proto.generated.threat_detection.message.sample_request.v1.SampleMaliciousRequest; +import com.akto.proto.generated.threat_detection.message.sample_request.v1.SampleRequestKafkaEnvelope; import com.akto.threat.detection.db.entity.MaliciousEventEntity; import com.akto.threat.detection.dto.MessageEnvelope; import com.google.protobuf.InvalidProtocolBufferException; @@ -17,7 +18,7 @@ /* This will read sample malicious data from kafka topic and save it to DB. */ -public class FlushSampleDataTask extends AbstractKafkaConsumerTask { +public class FlushSampleDataTask extends AbstractKafkaConsumerTask { private final SessionFactory sessionFactory; @@ -27,37 +28,29 @@ public FlushSampleDataTask( this.sessionFactory = sessionFactory; } - protected void processRecords(ConsumerRecords records) { + protected void processRecords(ConsumerRecords records) { List events = new ArrayList<>(); records.forEach( r -> { - String message = r.value(); - SampleMaliciousRequest.Builder builder = SampleMaliciousRequest.newBuilder(); - MessageEnvelope m = MessageEnvelope.unmarshal(message).orElse(null); - if (m == null) { - return; - } - + SampleRequestKafkaEnvelope envelope; try { - JsonFormat.parser().merge(m.getData(), builder); + envelope = SampleRequestKafkaEnvelope.parseFrom(r.value()); + SampleMaliciousRequest evt = envelope.getMaliciousRequest(); + + events.add( + MaliciousEventEntity.newBuilder() + .setActor(envelope.getActor()) + .setFilterId(evt.getFilterId()) + .setUrl(evt.getUrl()) + .setMethod(URLMethods.Method.fromString(evt.getMethod())) + .setTimestamp(evt.getTimestamp()) + .setOrig(evt.getPayload()) + .setApiCollectionId(evt.getApiCollectionId()) + .setIp(evt.getIp()) + .build()); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); - return; } - - SampleMaliciousRequest evt = builder.build(); - - events.add( - MaliciousEventEntity.newBuilder() - .setActor(m.getActor()) - .setFilterId(evt.getFilterId()) - .setUrl(evt.getUrl()) - .setMethod(URLMethods.Method.fromString(evt.getMethod())) - .setTimestamp(evt.getTimestamp()) - .setOrig(evt.getPayload()) - .setApiCollectionId(evt.getApiCollectionId()) - .setIp(evt.getIp()) - .build()); }); Session session = this.sessionFactory.openSession(); diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java index d92a40ba02..090630014d 100644 --- a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java +++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java @@ -14,11 +14,12 @@ import com.akto.dto.test_editor.YamlTemplate; import com.akto.dto.type.URLMethods; import com.akto.hybrid_parsers.HttpCallParser; -import com.akto.kafka.Kafka; import com.akto.kafka.KafkaConfig; import com.akto.proto.generated.threat_detection.message.malicious_event.event_type.v1.EventType; +import com.akto.proto.generated.threat_detection.message.malicious_event.v1.MaliciousEventKafkaEnvelope; import com.akto.proto.generated.threat_detection.message.malicious_event.v1.MaliciousEventMessage; import com.akto.proto.generated.threat_detection.message.sample_request.v1.SampleMaliciousRequest; +import com.akto.proto.generated.threat_detection.message.sample_request.v1.SampleRequestKafkaEnvelope; import com.akto.rules.TestPlugin; import com.akto.runtime.utils.Utils; import com.akto.test_editor.execution.VariableResolver; @@ -27,6 +28,7 @@ import com.akto.threat.detection.cache.RedisBackedCounterCache; import com.akto.threat.detection.constants.KafkaTopic; import com.akto.threat.detection.dto.MessageEnvelope; +import com.akto.threat.detection.kafka.KafkaProtoProducer; import com.akto.threat.detection.smart_event_detector.window_based.WindowBasedThresholdNotifier; import com.google.protobuf.InvalidProtocolBufferException; import io.lettuce.core.RedisClient; @@ -54,7 +56,7 @@ public class MaliciousTrafficDetectorTask implements Task { private int filterLastUpdatedAt = 0; private int filterUpdateIntervalSec = 300; - private final Kafka internalKafka; + private final KafkaProtoProducer internalKafka; private static final DataActor dataActor = DataActorFactory.fetchInstance(); @@ -77,11 +79,7 @@ public MaliciousTrafficDetectorTask( new RedisBackedCounterCache(redisClient, "wbt"), new WindowBasedThresholdNotifier.Config(100, 10 * 60)); - this.internalKafka = - new Kafka( - internalConfig.getBootstrapServers(), - internalConfig.getProducerConfig().getLingerMs(), - internalConfig.getProducerConfig().getBatchSize()); + this.internalKafka = new KafkaProtoProducer(internalConfig); } public void run() { @@ -123,19 +121,20 @@ private Map getFilters() { return apiFilters; } - private boolean validateFilterForRequest(FilterConfig apiFilter, RawApi rawApi, ApiInfo.ApiInfoKey apiInfoKey, String message) { + private boolean validateFilterForRequest( + FilterConfig apiFilter, RawApi rawApi, ApiInfo.ApiInfoKey apiInfoKey, String message) { try { System.out.println("using buildFromMessageNew func"); Map varMap = apiFilter.resolveVarMap(); VariableResolver.resolveWordList( - varMap, - new HashMap>() { - { - put(apiInfoKey, Collections.singletonList(message)); - } - }, - apiInfoKey); + varMap, + new HashMap>() { + { + put(apiInfoKey, Collections.singletonList(message)); + } + }, + apiInfoKey); String filterExecutionLogId = UUID.randomUUID().toString(); ValidationResult res = @@ -160,7 +159,7 @@ private void processRecord(ConsumerRecord record) { return; } - List maliciousMessages = new ArrayList<>(); + List maliciousMessages = new ArrayList<>(); System.out.println("Total number of filters: " + filters.size()); @@ -212,13 +211,12 @@ private void processRecord(ConsumerRecord record) { .setFilterId(apiFilter.getId()) .build(); - try { - maliciousMessages.add( - MessageEnvelope.generateEnvelope( - responseParam.getAccountId(), actor, maliciousReq)); - } catch (InvalidProtocolBufferException e) { - return; - } + maliciousMessages.add( + SampleRequestKafkaEnvelope.newBuilder() + .setActor(actor) + .setAccountId(responseParam.getAccountId()) + .setMaliciousRequest(maliciousReq) + .build()); if (!isAggFilter) { generateAndPushMaliciousEventRequest( @@ -250,12 +248,7 @@ private void processRecord(ConsumerRecord record) { try { maliciousMessages.forEach( sample -> { - sample - .marshal() - .ifPresent( - data -> { - internalKafka.send(data, KafkaTopic.ThreatDetection.MALICIOUS_EVENTS); - }); + internalKafka.send(KafkaTopic.ThreatDetection.MALICIOUS_EVENTS, sample); }); } catch (Exception e) { e.printStackTrace(); @@ -281,12 +274,18 @@ private void generateAndPushMaliciousEventRequest( .setDetectedAt(responseParam.getTime()) .build(); try { - System.out.println("Pushing malicious event to kafka: "); + System.out.println("Pushing malicious event to kafka: " + maliciousEvent); + MaliciousEventKafkaEnvelope envelope = + MaliciousEventKafkaEnvelope.newBuilder() + .setActor(actor) + .setAccountId(responseParam.getAccountId()) + .setMaliciousEvent(maliciousEvent) + .build(); MessageEnvelope.generateEnvelope(responseParam.getAccountId(), actor, maliciousEvent) .marshal() .ifPresent( data -> { - internalKafka.send(data, KafkaTopic.ThreatDetection.ALERTS); + internalKafka.send(KafkaTopic.ThreatDetection.ALERTS, envelope); }); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/SendMaliciousEventsToBackend.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/SendMaliciousEventsToBackend.java index c2a9e7cf54..2ee173f1a0 100644 --- a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/SendMaliciousEventsToBackend.java +++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/SendMaliciousEventsToBackend.java @@ -2,6 +2,7 @@ import com.akto.kafka.KafkaConfig; import com.akto.proto.generated.threat_detection.message.malicious_event.event_type.v1.EventType; +import com.akto.proto.generated.threat_detection.message.malicious_event.v1.MaliciousEventKafkaEnvelope; import com.akto.proto.generated.threat_detection.message.malicious_event.v1.MaliciousEventMessage; import com.akto.proto.generated.threat_detection.message.sample_request.v1.SampleMaliciousRequest; import com.akto.proto.generated.threat_detection.service.malicious_alert_service.v1.RecordMaliciousEventRequest; @@ -28,133 +29,130 @@ /* This will send alerts to threat detection backend */ -public class SendMaliciousEventsToBackend extends AbstractKafkaConsumerTask { +public class SendMaliciousEventsToBackend extends AbstractKafkaConsumerTask { - private final SessionFactory sessionFactory; - private final CloseableHttpClient httpClient; + private final SessionFactory sessionFactory; + private final CloseableHttpClient httpClient; - public SendMaliciousEventsToBackend( - SessionFactory sessionFactory, KafkaConfig trafficConfig, String topic) { - super(trafficConfig, topic); - this.sessionFactory = sessionFactory; - this.httpClient = HttpClients.createDefault(); - } + public SendMaliciousEventsToBackend( + SessionFactory sessionFactory, KafkaConfig trafficConfig, String topic) { + super(trafficConfig, topic); + this.sessionFactory = sessionFactory; + this.httpClient = HttpClients.createDefault(); + } - private void markSampleDataAsSent(List ids) { - Session session = this.sessionFactory.openSession(); - Transaction txn = session.beginTransaction(); - try { - session - .createQuery( - "update MaliciousEventEntity m set m.alertedToBackend = true where m.id in :ids") - .setParameterList("ids", ids) - .executeUpdate(); - } catch (Exception ex) { - ex.printStackTrace(); - txn.rollback(); - } finally { - txn.commit(); - session.close(); + private void markSampleDataAsSent(List ids) { + Session session = this.sessionFactory.openSession(); + Transaction txn = session.beginTransaction(); + try { + session + .createQuery( + "update MaliciousEventEntity m set m.alertedToBackend = true where m.id in :ids") + .setParameterList("ids", ids) + .executeUpdate(); + } catch (Exception ex) { + ex.printStackTrace(); + txn.rollback(); + } finally { + txn.commit(); + session.close(); + } } - } - private List getSampleMaliciousRequests(String actor, String filterId) { - Session session = this.sessionFactory.openSession(); - Transaction txn = session.beginTransaction(); - try { - return session - .createQuery( - "from MaliciousEventEntity m where m.actor = :actor and m.filterId = :filterId and" - + " m.alertedToBackend = false order by m.createdAt desc", - MaliciousEventEntity.class) - .setParameter("actor", actor) - .setParameter("filterId", filterId) - .setMaxResults(50) - .getResultList(); - } catch (Exception ex) { - ex.printStackTrace(); - txn.rollback(); - } finally { - txn.commit(); - session.close(); + private List getSampleMaliciousRequests(String actor, String filterId) { + Session session = this.sessionFactory.openSession(); + Transaction txn = session.beginTransaction(); + try { + return session + .createQuery( + "from MaliciousEventEntity m where m.actor = :actor and m.filterId = :filterId and" + + " m.alertedToBackend = false order by m.createdAt desc", + MaliciousEventEntity.class) + .setParameter("actor", actor) + .setParameter("filterId", filterId) + .setMaxResults(50) + .getResultList(); + } catch (Exception ex) { + ex.printStackTrace(); + txn.rollback(); + } finally { + txn.commit(); + session.close(); + } + + return Collections.emptyList(); } - return Collections.emptyList(); - } + protected void processRecords(ConsumerRecords records) { + records.forEach( + r -> { + MaliciousEventKafkaEnvelope envelope; + try { + envelope = MaliciousEventKafkaEnvelope.parseFrom(r.value()); + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + return; + } - protected void processRecords(ConsumerRecords records) { - records.forEach( - r -> { - String message = r.value(); - MaliciousEventMessage.Builder builder = MaliciousEventMessage.newBuilder(); - MessageEnvelope m = MessageEnvelope.unmarshal(message).orElse(null); - if (m == null) { - return; - } + if (envelope == null) { + return; + } - try { - JsonFormat.parser().merge(m.getData(), builder); - } catch (InvalidProtocolBufferException e) { - e.printStackTrace(); - return; - } + try { + MaliciousEventMessage evt = envelope.getMaliciousEvent(); - MaliciousEventMessage evt = builder.build(); + // Get sample data from postgres for this alert + List sampleData = this.getSampleMaliciousRequests(evt.getActor(), + evt.getFilterId()); + RecordMaliciousEventRequest.Builder reqBuilder = RecordMaliciousEventRequest.newBuilder() + .setMaliciousEvent(evt); + if (EventType.EVENT_TYPE_AGGREGATED.equals(evt.getEventType())) { + sampleData = this.getSampleMaliciousRequests(evt.getActor(), evt.getFilterId()); - // Get sample data from postgres for this alert - List sampleData = - this.getSampleMaliciousRequests(evt.getActor(), evt.getFilterId()); - try { - RecordMaliciousEventRequest.Builder reqBuilder = - RecordMaliciousEventRequest.newBuilder().setMaliciousEvent(evt); - if (EventType.EVENT_TYPE_AGGREGATED.equals(evt.getEventType())) { - sampleData = this.getSampleMaliciousRequests(evt.getActor(), evt.getFilterId()); + reqBuilder.addAllSampleRequests( + sampleData.stream() + .map( + d -> SampleMaliciousRequest.newBuilder() + .setUrl(d.getUrl()) + .setMethod(d.getMethod().name()) + .setTimestamp(d.getTimestamp()) + .setPayload(d.getOrig()) + .setIp(d.getIp()) + .setApiCollectionId(d.getApiCollectionId()) + .build()) + .collect(Collectors.toList())); + } - reqBuilder.addAllSampleRequests( - sampleData.stream() - .map( - d -> - SampleMaliciousRequest.newBuilder() - .setUrl(d.getUrl()) - .setMethod(d.getMethod().name()) - .setTimestamp(d.getTimestamp()) - .setPayload(d.getOrig()) - .setIp(d.getIp()) - .setApiCollectionId(d.getApiCollectionId()) - .build()) - .collect(Collectors.toList())); - } + List sampleIds = sampleData.stream().map(MaliciousEventEntity::getId) + .collect(Collectors.toList()); - List sampleIds = - sampleData.stream().map(MaliciousEventEntity::getId).collect(Collectors.toList()); + RecordMaliciousEventRequest maliciousEventRequest = reqBuilder.build(); + String url = System.getenv("AKTO_THREAT_PROTECTION_BACKEND_URL"); + String token = System.getenv("AKTO_THREAT_PROTECTION_BACKEND_TOKEN"); + ProtoMessageUtils.toString(maliciousEventRequest) + .ifPresent( + msg -> { + StringEntity requestEntity = new StringEntity(msg, + ContentType.APPLICATION_JSON); + HttpPost req = new HttpPost( + String.format("%s/api/threat_detection/record_malicious_event", + url)); + req.addHeader("Authorization", "Bearer " + token); + req.setEntity(requestEntity); + try { + System.out.println("Sending request to backend: " + msg); + this.httpClient.execute(req); + } catch (IOException e) { + e.printStackTrace(); + } - RecordMaliciousEventRequest maliciousEventRequest = reqBuilder.build(); - String url = System.getenv("AKTO_THREAT_PROTECTION_BACKEND_URL"); - String token = System.getenv("AKTO_THREAT_PROTECTION_BACKEND_TOKEN"); - ProtoMessageUtils.toString(maliciousEventRequest) - .ifPresent( - msg -> { - StringEntity requestEntity = - new StringEntity(msg, ContentType.APPLICATION_JSON); - HttpPost req = - new HttpPost( - String.format("%s/api/threat_detection/record_malicious_event", url)); - req.addHeader("Authorization", "Bearer " + token); - req.setEntity(requestEntity); - try { - System.out.println("Sending request to backend: " + msg); - this.httpClient.execute(req); - } catch (IOException e) { + if (!sampleIds.isEmpty()) { + markSampleDataAsSent(sampleIds); + } + }); + } catch (Exception e) { e.printStackTrace(); - } - - if (!sampleIds.isEmpty()) { - markSampleDataAsSent(sampleIds); - } - }); - } catch (Exception e) { - e.printStackTrace(); - } - }); - } + } + }); + } } diff --git a/protobuf/health/service/v1/service.proto b/protobuf/health/service/v1/service.proto deleted file mode 100644 index 755ed7d4c1..0000000000 --- a/protobuf/health/service/v1/service.proto +++ /dev/null @@ -1,24 +0,0 @@ -syntax = "proto3"; - -package health.service.v1; - -option java_outer_classname = "HealthServiceProto"; -option java_package = "health.service.v1"; - -message CheckRequest { - string service = 1; -} - -message CheckResponse { - enum ServingStatus { - SERVING_STATUS_UNSPECIFIED = 0; - SERVING_STATUS_SERVING = 1; - SERVING_STATUS_NOT_SERVING = 2; - SERVING_STATUS_SERVICE_UNKNOWN = 3; // Used only by the Watch method. - } - ServingStatus status = 1; -} - -service HealthService { - rpc Check(CheckRequest) returns (CheckResponse); -} \ No newline at end of file diff --git a/protobuf/threat_detection/message/malicious_event/v1/message.proto b/protobuf/threat_detection/message/malicious_event/v1/message.proto index 8c9fabff98..3850fc90ad 100644 --- a/protobuf/threat_detection/message/malicious_event/v1/message.proto +++ b/protobuf/threat_detection/message/malicious_event/v1/message.proto @@ -18,3 +18,9 @@ message MaliciousEventMessage { string latest_api_payload = 8; threat_detection.message.malicious_event.event_type.v1.EventType event_type = 9; } + +message MaliciousEventKafkaEnvelope { + string account_id = 1; + string actor = 2; + MaliciousEventMessage malicious_event = 3; +} diff --git a/protobuf/threat_detection/message/sample_request/v1/message.proto b/protobuf/threat_detection/message/sample_request/v1/message.proto index 1020e56636..4192572509 100644 --- a/protobuf/threat_detection/message/sample_request/v1/message.proto +++ b/protobuf/threat_detection/message/sample_request/v1/message.proto @@ -14,3 +14,9 @@ message SampleMaliciousRequest { string payload = 6; string filter_id = 7; } + +message SampleRequestKafkaEnvelope { + string account_id = 1; + string actor = 2; + SampleMaliciousRequest malicious_request = 3; +}