diff --git a/.gitignore b/.gitignore index 685da2accc..d90e415a62 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,4 @@ https: **/data-zoo-data **/data-zoo-logs **/bin +.factorypath diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/HealthService.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/HealthService.java deleted file mode 100644 index 5469ed7d16..0000000000 --- a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/HealthService.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.akto.threat.backend.service; - -import com.akto.proto.generated.health.service.v1.CheckRequest; -import com.akto.proto.generated.health.service.v1.CheckResponse; -import com.akto.proto.generated.health.service.v1.CheckResponse.ServingStatus; -import com.akto.proto.generated.health.service.v1.HealthServiceGrpc; -import io.grpc.stub.StreamObserver; - -public class HealthService extends HealthServiceGrpc.HealthServiceImplBase { - - @Override - public void check(CheckRequest request, StreamObserver responseObserver) { - responseObserver.onNext( - CheckResponse.newBuilder().setStatus(ServingStatus.SERVING_STATUS_SERVING).build()); - responseObserver.onCompleted(); - } -} diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/kafka/KafkaProtoProducer.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/kafka/KafkaProtoProducer.java new file mode 100644 index 0000000000..cfd51ace83 --- /dev/null +++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/kafka/KafkaProtoProducer.java @@ -0,0 +1,48 @@ +package com.akto.threat.detection.kafka; + +import com.akto.kafka.KafkaConfig; +import com.google.protobuf.Message; +import java.time.Duration; +import java.util.Properties; +import org.apache.kafka.clients.producer.*; +import org.apache.kafka.common.serialization.StringSerializer; + +public class KafkaProtoProducer { + private KafkaProducer producer; + public boolean producerReady; + + public KafkaProtoProducer(KafkaConfig kafkaConfig) { + this.producer = generateProducer( + kafkaConfig.getBootstrapServers(), + kafkaConfig.getProducerConfig().getLingerMs(), + kafkaConfig.getProducerConfig().getBatchSize()); + } + + public void send(String topic, Message message) { + byte[] messageBytes = message.toByteArray(); + this.producer.send(new ProducerRecord<>(topic, messageBytes)); + } + + public void close() { + this.producerReady = false; + producer.close(Duration.ofMillis(0)); // close immediately + } + + private KafkaProducer generateProducer(String brokerIP, int lingerMS, int batchSize) { + if (producer != null) + close(); // close existing producer connection + + int requestTimeoutMs = 5000; + Properties kafkaProps = new Properties(); + kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerIP); + kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.ByteArraySerializer"); + kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); + kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMS); + kafkaProps.put(ProducerConfig.RETRIES_CONFIG, 0); + kafkaProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs); + kafkaProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, lingerMS + requestTimeoutMs); + return new KafkaProducer(kafkaProps); + } +} diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/AbstractKafkaConsumerTask.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/AbstractKafkaConsumerTask.java index cfd9ccecd6..4a0a731523 100644 --- a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/AbstractKafkaConsumerTask.java +++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/AbstractKafkaConsumerTask.java @@ -10,10 +10,12 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; -public abstract class AbstractKafkaConsumerTask implements Task { +public abstract class AbstractKafkaConsumerTask implements Task { - protected Consumer kafkaConsumer; + protected Consumer kafkaConsumer; protected KafkaConfig kafkaConfig; protected String kafkaTopic; @@ -24,9 +26,16 @@ public AbstractKafkaConsumerTask(KafkaConfig kafkaConfig, String kafkaTopic) { String kafkaBrokerUrl = kafkaConfig.getBootstrapServers(); String groupId = kafkaConfig.getGroupId(); - Properties properties = - Utils.configProperties( - kafkaBrokerUrl, groupId, kafkaConfig.getConsumerConfig().getMaxPollRecords()); + Properties properties = new Properties(); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerUrl); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConfig.getConsumerConfig().getMaxPollRecords()); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId()); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + this.kafkaConsumer = new KafkaConsumer<>(properties); } @@ -40,9 +49,8 @@ public void run() { () -> { // Poll data from Kafka topic while (true) { - ConsumerRecords records = - kafkaConsumer.poll( - Duration.ofMillis(kafkaConfig.getConsumerConfig().getPollDurationMilli())); + ConsumerRecords records = kafkaConsumer.poll( + Duration.ofMillis(kafkaConfig.getConsumerConfig().getPollDurationMilli())); if (records.isEmpty()) { continue; } @@ -60,5 +68,5 @@ public void run() { }); } - abstract void processRecords(ConsumerRecords records); + abstract void processRecords(ConsumerRecords records); } diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/FlushSampleDataTask.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/FlushSampleDataTask.java index 6829d6ecad..4b857c389e 100644 --- a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/FlushSampleDataTask.java +++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/FlushSampleDataTask.java @@ -3,6 +3,7 @@ import com.akto.dto.type.URLMethods; import com.akto.kafka.KafkaConfig; import com.akto.proto.generated.threat_detection.message.sample_request.v1.SampleMaliciousRequest; +import com.akto.proto.generated.threat_detection.message.sample_request.v1.SampleRequestKafkaEnvelope; import com.akto.threat.detection.db.entity.MaliciousEventEntity; import com.akto.threat.detection.dto.MessageEnvelope; import com.google.protobuf.InvalidProtocolBufferException; @@ -17,7 +18,7 @@ /* This will read sample malicious data from kafka topic and save it to DB. */ -public class FlushSampleDataTask extends AbstractKafkaConsumerTask { +public class FlushSampleDataTask extends AbstractKafkaConsumerTask { private final SessionFactory sessionFactory; @@ -27,37 +28,29 @@ public FlushSampleDataTask( this.sessionFactory = sessionFactory; } - protected void processRecords(ConsumerRecords records) { + protected void processRecords(ConsumerRecords records) { List events = new ArrayList<>(); records.forEach( r -> { - String message = r.value(); - SampleMaliciousRequest.Builder builder = SampleMaliciousRequest.newBuilder(); - MessageEnvelope m = MessageEnvelope.unmarshal(message).orElse(null); - if (m == null) { - return; - } - + SampleRequestKafkaEnvelope envelope; try { - JsonFormat.parser().merge(m.getData(), builder); + envelope = SampleRequestKafkaEnvelope.parseFrom(r.value()); + SampleMaliciousRequest evt = envelope.getMaliciousRequest(); + + events.add( + MaliciousEventEntity.newBuilder() + .setActor(envelope.getActor()) + .setFilterId(evt.getFilterId()) + .setUrl(evt.getUrl()) + .setMethod(URLMethods.Method.fromString(evt.getMethod())) + .setTimestamp(evt.getTimestamp()) + .setOrig(evt.getPayload()) + .setApiCollectionId(evt.getApiCollectionId()) + .setIp(evt.getIp()) + .build()); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); - return; } - - SampleMaliciousRequest evt = builder.build(); - - events.add( - MaliciousEventEntity.newBuilder() - .setActor(m.getActor()) - .setFilterId(evt.getFilterId()) - .setUrl(evt.getUrl()) - .setMethod(URLMethods.Method.fromString(evt.getMethod())) - .setTimestamp(evt.getTimestamp()) - .setOrig(evt.getPayload()) - .setApiCollectionId(evt.getApiCollectionId()) - .setIp(evt.getIp()) - .build()); }); Session session = this.sessionFactory.openSession(); diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java index bc652e6714..bac78f4231 100644 --- a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java +++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java @@ -17,8 +17,10 @@ import com.akto.kafka.Kafka; import com.akto.kafka.KafkaConfig; import com.akto.proto.generated.threat_detection.message.malicious_event.event_type.v1.EventType; +import com.akto.proto.generated.threat_detection.message.malicious_event.v1.MaliciousEventKafkaEnvelope; import com.akto.proto.generated.threat_detection.message.malicious_event.v1.MaliciousEventMessage; import com.akto.proto.generated.threat_detection.message.sample_request.v1.SampleMaliciousRequest; +import com.akto.proto.generated.threat_detection.message.sample_request.v1.SampleRequestKafkaEnvelope; import com.akto.rules.TestPlugin; import com.akto.runtime.utils.Utils; import com.akto.test_editor.execution.VariableResolver; @@ -27,6 +29,7 @@ import com.akto.threat.detection.cache.RedisBackedCounterCache; import com.akto.threat.detection.constants.KafkaTopic; import com.akto.threat.detection.dto.MessageEnvelope; +import com.akto.threat.detection.kafka.KafkaProtoProducer; import com.akto.threat.detection.smart_event_detector.window_based.WindowBasedThresholdNotifier; import com.google.protobuf.InvalidProtocolBufferException; import io.lettuce.core.RedisClient; @@ -54,7 +57,7 @@ public class MaliciousTrafficDetectorTask implements Task { private int filterLastUpdatedAt = 0; private int filterUpdateIntervalSec = 300; - private final Kafka internalKafka; + private final KafkaProtoProducer internalKafka; private static final DataActor dataActor = DataActorFactory.fetchInstance(); @@ -65,23 +68,17 @@ public MaliciousTrafficDetectorTask( String kafkaBrokerUrl = trafficConfig.getBootstrapServers(); String groupId = trafficConfig.getGroupId(); - this.kafkaConsumer = - new KafkaConsumer<>( - Utils.configProperties( - kafkaBrokerUrl, groupId, trafficConfig.getConsumerConfig().getMaxPollRecords())); + this.kafkaConsumer = new KafkaConsumer<>( + Utils.configProperties( + kafkaBrokerUrl, groupId, trafficConfig.getConsumerConfig().getMaxPollRecords())); this.httpCallParser = new HttpCallParser(120, 1000); - this.windowBasedThresholdNotifier = - new WindowBasedThresholdNotifier( - new RedisBackedCounterCache(redisClient, "wbt"), - new WindowBasedThresholdNotifier.Config(100, 10 * 60)); + this.windowBasedThresholdNotifier = new WindowBasedThresholdNotifier( + new RedisBackedCounterCache(redisClient, "wbt"), + new WindowBasedThresholdNotifier.Config(100, 10 * 60)); - this.internalKafka = - new Kafka( - internalConfig.getBootstrapServers(), - internalConfig.getProducerConfig().getLingerMs(), - internalConfig.getProducerConfig().getBatchSize()); + this.internalKafka = new KafkaProtoProducer(internalConfig); } public void run() { @@ -91,9 +88,8 @@ public void run() { () -> { // Poll data from Kafka topic while (true) { - ConsumerRecords records = - kafkaConsumer.poll( - Duration.ofMillis(kafkaConfig.getConsumerConfig().getPollDurationMilli())); + ConsumerRecords records = kafkaConsumer.poll( + Duration.ofMillis(kafkaConfig.getConsumerConfig().getPollDurationMilli())); try { for (ConsumerRecord record : records) { @@ -133,8 +129,7 @@ private boolean validateFilterForRequest( int apiCollectionId = httpCallParser.createApiCollectionId(responseParam); responseParam.requestParams.setApiCollectionId(apiCollectionId); String url = responseParam.getRequestParams().getURL(); - URLMethods.Method method = - URLMethods.Method.fromString(responseParam.getRequestParams().getMethod()); + URLMethods.Method method = URLMethods.Method.fromString(responseParam.getRequestParams().getMethod()); ApiInfo.ApiInfoKey apiInfoKey = new ApiInfo.ApiInfoKey(apiCollectionId, url, method); Map varMap = apiFilter.resolveVarMap(); VariableResolver.resolveWordList( @@ -146,9 +141,8 @@ private boolean validateFilterForRequest( }, apiInfoKey); String filterExecutionLogId = UUID.randomUUID().toString(); - ValidationResult res = - TestPlugin.validateFilter( - apiFilter.getFilter().getNode(), rawApi, apiInfoKey, varMap, filterExecutionLogId); + ValidationResult res = TestPlugin.validateFilter( + apiFilter.getFilter().getNode(), rawApi, apiInfoKey, varMap, filterExecutionLogId); return res.getIsValid(); } catch (Exception e) { @@ -168,7 +162,7 @@ private void processRecord(ConsumerRecord record) { return; } - List maliciousMessages = new ArrayList<>(); + List maliciousMessages = new ArrayList<>(); System.out.println("Total number of filters: " + filters.size()); @@ -200,24 +194,22 @@ private void processRecord(ConsumerRecord record) { String groupKey = apiFilter.getId(); String aggKey = actor + "|" + groupKey; - SampleMaliciousRequest maliciousReq = - SampleMaliciousRequest.newBuilder() - .setUrl(responseParam.getRequestParams().getURL()) - .setMethod(responseParam.getRequestParams().getMethod()) - .setPayload(responseParam.getOrig()) - .setIp(actor) // For now using actor as IP - .setApiCollectionId(responseParam.getRequestParams().getApiCollectionId()) - .setTimestamp(responseParam.getTime()) - .setFilterId(apiFilter.getId()) - .build(); - - try { - maliciousMessages.add( - MessageEnvelope.generateEnvelope( - responseParam.getAccountId(), actor, maliciousReq)); - } catch (InvalidProtocolBufferException e) { - return; - } + SampleMaliciousRequest maliciousReq = SampleMaliciousRequest.newBuilder() + .setUrl(responseParam.getRequestParams().getURL()) + .setMethod(responseParam.getRequestParams().getMethod()) + .setPayload(responseParam.getOrig()) + .setIp(actor) // For now using actor as IP + .setApiCollectionId(responseParam.getRequestParams().getApiCollectionId()) + .setTimestamp(responseParam.getTime()) + .setFilterId(apiFilter.getId()) + .build(); + + maliciousMessages.add( + SampleRequestKafkaEnvelope.newBuilder() + .setActor(actor) + .setAccountId(responseParam.getAccountId()) + .setMaliciousRequest(maliciousReq) + .build()); if (!isAggFilter) { generateAndPushMaliciousEventRequest( @@ -227,8 +219,8 @@ private void processRecord(ConsumerRecord record) { // Aggregation rules for (Rule rule : aggRules.getRule()) { - WindowBasedThresholdNotifier.Result result = - this.windowBasedThresholdNotifier.shouldNotify(aggKey, maliciousReq, rule); + WindowBasedThresholdNotifier.Result result = this.windowBasedThresholdNotifier.shouldNotify(aggKey, + maliciousReq, rule); if (result.shouldNotify()) { System.out.print("Notifying for aggregation rule: " + rule); @@ -249,12 +241,7 @@ private void processRecord(ConsumerRecord record) { try { maliciousMessages.forEach( sample -> { - sample - .marshal() - .ifPresent( - data -> { - internalKafka.send(data, KafkaTopic.ThreatDetection.MALICIOUS_EVENTS); - }); + internalKafka.send(KafkaTopic.ThreatDetection.MALICIOUS_EVENTS, sample); }); } catch (Exception e) { e.printStackTrace(); @@ -267,25 +254,27 @@ private void generateAndPushMaliciousEventRequest( HttpResponseParams responseParam, SampleMaliciousRequest maliciousReq, EventType eventType) { - MaliciousEventMessage maliciousEvent = - MaliciousEventMessage.newBuilder() - .setFilterId(apiFilter.getId()) - .setActor(actor) - .setDetectedAt(responseParam.getTime()) - .setEventType(eventType) - .setLatestApiCollectionId(maliciousReq.getApiCollectionId()) - .setLatestApiIp(maliciousReq.getIp()) - .setLatestApiPayload(maliciousReq.getPayload()) - .setLatestApiMethod(maliciousReq.getMethod()) - .setDetectedAt(responseParam.getTime()) - .build(); + MaliciousEventMessage maliciousEvent = MaliciousEventMessage.newBuilder() + .setFilterId(apiFilter.getId()) + .setActor(actor) + .setDetectedAt(responseParam.getTime()) + .setEventType(eventType) + .setLatestApiCollectionId(maliciousReq.getApiCollectionId()) + .setLatestApiIp(maliciousReq.getIp()) + .setLatestApiPayload(maliciousReq.getPayload()) + .setLatestApiMethod(maliciousReq.getMethod()) + .setDetectedAt(responseParam.getTime()) + .build(); try { System.out.println("Pushing malicious event to kafka: " + maliciousEvent); + MaliciousEventKafkaEnvelope envelope = MaliciousEventKafkaEnvelope.newBuilder().setActor(actor) + .setAccountId(responseParam.getAccountId()) + .setMaliciousEvent(maliciousEvent).build(); MessageEnvelope.generateEnvelope(responseParam.getAccountId(), actor, maliciousEvent) .marshal() .ifPresent( data -> { - internalKafka.send(data, KafkaTopic.ThreatDetection.ALERTS); + internalKafka.send(KafkaTopic.ThreatDetection.ALERTS, envelope); }); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/SendMaliciousEventsToBackend.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/SendMaliciousEventsToBackend.java index c2a9e7cf54..2ee173f1a0 100644 --- a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/SendMaliciousEventsToBackend.java +++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/SendMaliciousEventsToBackend.java @@ -2,6 +2,7 @@ import com.akto.kafka.KafkaConfig; import com.akto.proto.generated.threat_detection.message.malicious_event.event_type.v1.EventType; +import com.akto.proto.generated.threat_detection.message.malicious_event.v1.MaliciousEventKafkaEnvelope; import com.akto.proto.generated.threat_detection.message.malicious_event.v1.MaliciousEventMessage; import com.akto.proto.generated.threat_detection.message.sample_request.v1.SampleMaliciousRequest; import com.akto.proto.generated.threat_detection.service.malicious_alert_service.v1.RecordMaliciousEventRequest; @@ -28,133 +29,130 @@ /* This will send alerts to threat detection backend */ -public class SendMaliciousEventsToBackend extends AbstractKafkaConsumerTask { +public class SendMaliciousEventsToBackend extends AbstractKafkaConsumerTask { - private final SessionFactory sessionFactory; - private final CloseableHttpClient httpClient; + private final SessionFactory sessionFactory; + private final CloseableHttpClient httpClient; - public SendMaliciousEventsToBackend( - SessionFactory sessionFactory, KafkaConfig trafficConfig, String topic) { - super(trafficConfig, topic); - this.sessionFactory = sessionFactory; - this.httpClient = HttpClients.createDefault(); - } + public SendMaliciousEventsToBackend( + SessionFactory sessionFactory, KafkaConfig trafficConfig, String topic) { + super(trafficConfig, topic); + this.sessionFactory = sessionFactory; + this.httpClient = HttpClients.createDefault(); + } - private void markSampleDataAsSent(List ids) { - Session session = this.sessionFactory.openSession(); - Transaction txn = session.beginTransaction(); - try { - session - .createQuery( - "update MaliciousEventEntity m set m.alertedToBackend = true where m.id in :ids") - .setParameterList("ids", ids) - .executeUpdate(); - } catch (Exception ex) { - ex.printStackTrace(); - txn.rollback(); - } finally { - txn.commit(); - session.close(); + private void markSampleDataAsSent(List ids) { + Session session = this.sessionFactory.openSession(); + Transaction txn = session.beginTransaction(); + try { + session + .createQuery( + "update MaliciousEventEntity m set m.alertedToBackend = true where m.id in :ids") + .setParameterList("ids", ids) + .executeUpdate(); + } catch (Exception ex) { + ex.printStackTrace(); + txn.rollback(); + } finally { + txn.commit(); + session.close(); + } } - } - private List getSampleMaliciousRequests(String actor, String filterId) { - Session session = this.sessionFactory.openSession(); - Transaction txn = session.beginTransaction(); - try { - return session - .createQuery( - "from MaliciousEventEntity m where m.actor = :actor and m.filterId = :filterId and" - + " m.alertedToBackend = false order by m.createdAt desc", - MaliciousEventEntity.class) - .setParameter("actor", actor) - .setParameter("filterId", filterId) - .setMaxResults(50) - .getResultList(); - } catch (Exception ex) { - ex.printStackTrace(); - txn.rollback(); - } finally { - txn.commit(); - session.close(); + private List getSampleMaliciousRequests(String actor, String filterId) { + Session session = this.sessionFactory.openSession(); + Transaction txn = session.beginTransaction(); + try { + return session + .createQuery( + "from MaliciousEventEntity m where m.actor = :actor and m.filterId = :filterId and" + + " m.alertedToBackend = false order by m.createdAt desc", + MaliciousEventEntity.class) + .setParameter("actor", actor) + .setParameter("filterId", filterId) + .setMaxResults(50) + .getResultList(); + } catch (Exception ex) { + ex.printStackTrace(); + txn.rollback(); + } finally { + txn.commit(); + session.close(); + } + + return Collections.emptyList(); } - return Collections.emptyList(); - } + protected void processRecords(ConsumerRecords records) { + records.forEach( + r -> { + MaliciousEventKafkaEnvelope envelope; + try { + envelope = MaliciousEventKafkaEnvelope.parseFrom(r.value()); + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + return; + } - protected void processRecords(ConsumerRecords records) { - records.forEach( - r -> { - String message = r.value(); - MaliciousEventMessage.Builder builder = MaliciousEventMessage.newBuilder(); - MessageEnvelope m = MessageEnvelope.unmarshal(message).orElse(null); - if (m == null) { - return; - } + if (envelope == null) { + return; + } - try { - JsonFormat.parser().merge(m.getData(), builder); - } catch (InvalidProtocolBufferException e) { - e.printStackTrace(); - return; - } + try { + MaliciousEventMessage evt = envelope.getMaliciousEvent(); - MaliciousEventMessage evt = builder.build(); + // Get sample data from postgres for this alert + List sampleData = this.getSampleMaliciousRequests(evt.getActor(), + evt.getFilterId()); + RecordMaliciousEventRequest.Builder reqBuilder = RecordMaliciousEventRequest.newBuilder() + .setMaliciousEvent(evt); + if (EventType.EVENT_TYPE_AGGREGATED.equals(evt.getEventType())) { + sampleData = this.getSampleMaliciousRequests(evt.getActor(), evt.getFilterId()); - // Get sample data from postgres for this alert - List sampleData = - this.getSampleMaliciousRequests(evt.getActor(), evt.getFilterId()); - try { - RecordMaliciousEventRequest.Builder reqBuilder = - RecordMaliciousEventRequest.newBuilder().setMaliciousEvent(evt); - if (EventType.EVENT_TYPE_AGGREGATED.equals(evt.getEventType())) { - sampleData = this.getSampleMaliciousRequests(evt.getActor(), evt.getFilterId()); + reqBuilder.addAllSampleRequests( + sampleData.stream() + .map( + d -> SampleMaliciousRequest.newBuilder() + .setUrl(d.getUrl()) + .setMethod(d.getMethod().name()) + .setTimestamp(d.getTimestamp()) + .setPayload(d.getOrig()) + .setIp(d.getIp()) + .setApiCollectionId(d.getApiCollectionId()) + .build()) + .collect(Collectors.toList())); + } - reqBuilder.addAllSampleRequests( - sampleData.stream() - .map( - d -> - SampleMaliciousRequest.newBuilder() - .setUrl(d.getUrl()) - .setMethod(d.getMethod().name()) - .setTimestamp(d.getTimestamp()) - .setPayload(d.getOrig()) - .setIp(d.getIp()) - .setApiCollectionId(d.getApiCollectionId()) - .build()) - .collect(Collectors.toList())); - } + List sampleIds = sampleData.stream().map(MaliciousEventEntity::getId) + .collect(Collectors.toList()); - List sampleIds = - sampleData.stream().map(MaliciousEventEntity::getId).collect(Collectors.toList()); + RecordMaliciousEventRequest maliciousEventRequest = reqBuilder.build(); + String url = System.getenv("AKTO_THREAT_PROTECTION_BACKEND_URL"); + String token = System.getenv("AKTO_THREAT_PROTECTION_BACKEND_TOKEN"); + ProtoMessageUtils.toString(maliciousEventRequest) + .ifPresent( + msg -> { + StringEntity requestEntity = new StringEntity(msg, + ContentType.APPLICATION_JSON); + HttpPost req = new HttpPost( + String.format("%s/api/threat_detection/record_malicious_event", + url)); + req.addHeader("Authorization", "Bearer " + token); + req.setEntity(requestEntity); + try { + System.out.println("Sending request to backend: " + msg); + this.httpClient.execute(req); + } catch (IOException e) { + e.printStackTrace(); + } - RecordMaliciousEventRequest maliciousEventRequest = reqBuilder.build(); - String url = System.getenv("AKTO_THREAT_PROTECTION_BACKEND_URL"); - String token = System.getenv("AKTO_THREAT_PROTECTION_BACKEND_TOKEN"); - ProtoMessageUtils.toString(maliciousEventRequest) - .ifPresent( - msg -> { - StringEntity requestEntity = - new StringEntity(msg, ContentType.APPLICATION_JSON); - HttpPost req = - new HttpPost( - String.format("%s/api/threat_detection/record_malicious_event", url)); - req.addHeader("Authorization", "Bearer " + token); - req.setEntity(requestEntity); - try { - System.out.println("Sending request to backend: " + msg); - this.httpClient.execute(req); - } catch (IOException e) { + if (!sampleIds.isEmpty()) { + markSampleDataAsSent(sampleIds); + } + }); + } catch (Exception e) { e.printStackTrace(); - } - - if (!sampleIds.isEmpty()) { - markSampleDataAsSent(sampleIds); - } - }); - } catch (Exception e) { - e.printStackTrace(); - } - }); - } + } + }); + } } diff --git a/protobuf/health/service/v1/service.proto b/protobuf/health/service/v1/service.proto deleted file mode 100644 index 755ed7d4c1..0000000000 --- a/protobuf/health/service/v1/service.proto +++ /dev/null @@ -1,24 +0,0 @@ -syntax = "proto3"; - -package health.service.v1; - -option java_outer_classname = "HealthServiceProto"; -option java_package = "health.service.v1"; - -message CheckRequest { - string service = 1; -} - -message CheckResponse { - enum ServingStatus { - SERVING_STATUS_UNSPECIFIED = 0; - SERVING_STATUS_SERVING = 1; - SERVING_STATUS_NOT_SERVING = 2; - SERVING_STATUS_SERVICE_UNKNOWN = 3; // Used only by the Watch method. - } - ServingStatus status = 1; -} - -service HealthService { - rpc Check(CheckRequest) returns (CheckResponse); -} \ No newline at end of file diff --git a/protobuf/threat_detection/message/malicious_event/v1/message.proto b/protobuf/threat_detection/message/malicious_event/v1/message.proto index 8c9fabff98..3850fc90ad 100644 --- a/protobuf/threat_detection/message/malicious_event/v1/message.proto +++ b/protobuf/threat_detection/message/malicious_event/v1/message.proto @@ -18,3 +18,9 @@ message MaliciousEventMessage { string latest_api_payload = 8; threat_detection.message.malicious_event.event_type.v1.EventType event_type = 9; } + +message MaliciousEventKafkaEnvelope { + string account_id = 1; + string actor = 2; + MaliciousEventMessage malicious_event = 3; +} diff --git a/protobuf/threat_detection/message/sample_request/v1/message.proto b/protobuf/threat_detection/message/sample_request/v1/message.proto index 1020e56636..4192572509 100644 --- a/protobuf/threat_detection/message/sample_request/v1/message.proto +++ b/protobuf/threat_detection/message/sample_request/v1/message.proto @@ -14,3 +14,9 @@ message SampleMaliciousRequest { string payload = 6; string filter_id = 7; } + +message SampleRequestKafkaEnvelope { + string account_id = 1; + string actor = 2; + SampleMaliciousRequest malicious_request = 3; +}