From 0b24e0ca421f17c5ebc0188f19b8ec5dee7351b3 Mon Sep 17 00:00:00 2001 From: Ajinkya Date: Wed, 8 Jan 2025 16:28:26 +0530 Subject: [PATCH] reading kafka key and value serializer from config --- .../java/com/akto/threat/backend/Main.java | 3 ++ .../service/MaliciousEventService.java | 6 ++-- .../backend/tasks/FlushMessagesToDB.java | 22 +++++++++++--- .../java/com/akto/threat/detection/Main.java | 7 ++++- .../detection/kafka/KafkaProtoProducer.java | 27 ++++++++--------- .../tasks/AbstractKafkaConsumerTask.java | 26 +++++++++-------- .../tasks/MaliciousTrafficDetectorTask.java | 28 ++++++++++-------- libs/protobuf/pom.xml | 2 +- .../src/main/java/com/akto/kafka/Kafka.java | 29 ++++++++++++++----- .../main/java/com/akto/kafka/KafkaConfig.java | 26 +++++++++++++++++ .../main/java/com/akto/kafka/Serializer.java | 26 +++++++++++++++++ 11 files changed, 148 insertions(+), 54 deletions(-) create mode 100644 libs/utils/src/main/java/com/akto/kafka/Serializer.java diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/Main.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/Main.java index d9fd627e8b..44d546378a 100644 --- a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/Main.java +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/Main.java @@ -7,6 +7,7 @@ import com.akto.kafka.KafkaConfig; import com.akto.kafka.KafkaConsumerConfig; import com.akto.kafka.KafkaProducerConfig; +import com.akto.kafka.Serializer; import com.akto.threat.backend.client.IPLookupClient; import com.akto.threat.backend.service.MaliciousEventService; import com.akto.threat.backend.service.ThreatActorService; @@ -56,6 +57,8 @@ public static void main(String[] args) throws Exception { .build()) .setProducerConfig( KafkaProducerConfig.newBuilder().setBatchSize(100).setLingerMs(1000).build()) + .setKeySerializer(Serializer.STRING) + .setValueSerializer(Serializer.STRING) .build(); IPLookupClient ipLookupClient = 
new IPLookupClient(getMaxmindFile()); diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/MaliciousEventService.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/MaliciousEventService.java index f69640a7fa..1e40106b0c 100644 --- a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/MaliciousEventService.java +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/MaliciousEventService.java @@ -33,8 +33,8 @@ public class MaliciousEventService { private final Kafka kafka; - private MongoClient mongoClient; - private IPLookupClient ipLookupClient; + private final MongoClient mongoClient; + private final IPLookupClient ipLookupClient; public MaliciousEventService( KafkaConfig kafkaConfig, MongoClient mongoClient, IPLookupClient ipLookupClient) { @@ -44,8 +44,6 @@ public MaliciousEventService( } public void recordMaliciousEvent(String accountId, RecordMaliciousEventRequest request) { - System.out.println("Received malicious event: " + request); - MaliciousEventMessage evt = request.getMaliciousEvent(); String actor = evt.getActor(); String filterId = evt.getFilterId(); diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/tasks/FlushMessagesToDB.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/tasks/FlushMessagesToDB.java index 2b2d99ca44..1faf433e77 100644 --- a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/tasks/FlushMessagesToDB.java +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/tasks/FlushMessagesToDB.java @@ -18,8 +18,11 @@ import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; + +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import 
org.apache.kafka.common.serialization.StringDeserializer; public class FlushMessagesToDB { @@ -32,11 +35,22 @@ public class FlushMessagesToDB { public FlushMessagesToDB(KafkaConfig kafkaConfig, MongoClient mongoClient) { String kafkaBrokerUrl = kafkaConfig.getBootstrapServers(); - String groupId = kafkaConfig.getGroupId(); - Properties properties = - Utils.configProperties( - kafkaBrokerUrl, groupId, kafkaConfig.getConsumerConfig().getMaxPollRecords()); + Properties properties = new Properties(); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerUrl); + properties.put( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + kafkaConfig.getKeySerializer().getDeserializer()); + properties.put( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + kafkaConfig.getValueSerializer().getDeserializer()); + properties.put( + ConsumerConfig.MAX_POLL_RECORDS_CONFIG, + kafkaConfig.getConsumerConfig().getMaxPollRecords()); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId()); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + this.kafkaConsumer = new KafkaConsumer<>(properties); this.kafkaConfig = kafkaConfig; diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/Main.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/Main.java index 1d1b061d83..0ea77e9470 100644 --- a/apps/threat-detection/src/main/java/com/akto/threat/detection/Main.java +++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/Main.java @@ -4,6 +4,7 @@ import com.akto.kafka.KafkaConfig; import com.akto.kafka.KafkaConsumerConfig; import com.akto.kafka.KafkaProducerConfig; +import com.akto.kafka.Serializer; import com.akto.threat.detection.constants.KafkaTopic; import com.akto.threat.detection.session_factory.SessionFactoryUtils; import com.akto.threat.detection.tasks.CleanupTask; @@ -19,7 +20,7 @@ public class Main { private static final 
String CONSUMER_GROUP_ID = "akto.threat_detection"; - public static void main(String[] args) throws Exception { + public static void main(String[] args) { runMigrations(); SessionFactory sessionFactory = SessionFactoryUtils.createFactory(); @@ -37,6 +38,8 @@ public static void main(String[] args) throws Exception { .build()) .setProducerConfig( KafkaProducerConfig.newBuilder().setBatchSize(100).setLingerMs(100).build()) + .setKeySerializer(Serializer.STRING) + .setValueSerializer(Serializer.BYTE_ARRAY) .build(); KafkaConfig internalKafka = @@ -50,6 +53,8 @@ public static void main(String[] args) throws Exception { .build()) .setProducerConfig( KafkaProducerConfig.newBuilder().setBatchSize(100).setLingerMs(100).build()) + .setKeySerializer(Serializer.STRING) + .setValueSerializer(Serializer.BYTE_ARRAY) .build(); new MaliciousTrafficDetectorTask(trafficKafka, internalKafka, createRedisClient()).run(); diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/kafka/KafkaProtoProducer.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/kafka/KafkaProtoProducer.java index cfd51ace83..913eec9a78 100644 --- a/apps/threat-detection/src/main/java/com/akto/threat/detection/kafka/KafkaProtoProducer.java +++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/kafka/KafkaProtoProducer.java @@ -1,21 +1,22 @@ package com.akto.threat.detection.kafka; import com.akto.kafka.KafkaConfig; +import com.akto.kafka.Serializer; import com.google.protobuf.Message; import java.time.Duration; import java.util.Properties; import org.apache.kafka.clients.producer.*; -import org.apache.kafka.common.serialization.StringSerializer; public class KafkaProtoProducer { - private KafkaProducer producer; + private final KafkaProducer producer; public boolean producerReady; public KafkaProtoProducer(KafkaConfig kafkaConfig) { - this.producer = generateProducer( - kafkaConfig.getBootstrapServers(), - kafkaConfig.getProducerConfig().getLingerMs(), - 
kafkaConfig.getProducerConfig().getBatchSize()); + this.producer = + generateProducer( + kafkaConfig.getBootstrapServers(), + kafkaConfig.getProducerConfig().getLingerMs(), + kafkaConfig.getProducerConfig().getBatchSize()); } public void send(String topic, Message message) { @@ -28,21 +29,21 @@ public void close() { producer.close(Duration.ofMillis(0)); // close immediately } - private KafkaProducer generateProducer(String brokerIP, int lingerMS, int batchSize) { - if (producer != null) - close(); // close existing producer connection + private KafkaProducer generateProducer( + String brokerIP, int lingerMS, int batchSize) { + if (producer != null) close(); // close existing producer connection int requestTimeoutMs = 5000; Properties kafkaProps = new Properties(); kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerIP); - kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.ByteArraySerializer"); - kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.STRING.getSerializer()); + kafkaProps.put( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.BYTE_ARRAY.getSerializer()); kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMS); kafkaProps.put(ProducerConfig.RETRIES_CONFIG, 0); kafkaProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs); kafkaProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, lingerMS + requestTimeoutMs); - return new KafkaProducer(kafkaProps); + return new KafkaProducer<>(kafkaProps); } } diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/AbstractKafkaConsumerTask.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/AbstractKafkaConsumerTask.java index 4a0a731523..dd344ba281 100644 --- 
a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/AbstractKafkaConsumerTask.java +++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/AbstractKafkaConsumerTask.java @@ -1,7 +1,7 @@ package com.akto.threat.detection.tasks; import com.akto.kafka.KafkaConfig; -import com.akto.runtime.utils.Utils; + import java.time.Duration; import java.util.Collections; import java.util.Properties; @@ -11,7 +11,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; public abstract class AbstractKafkaConsumerTask implements Task { @@ -23,15 +22,17 @@ public AbstractKafkaConsumerTask(KafkaConfig kafkaConfig, String kafkaTopic) { this.kafkaTopic = kafkaTopic; this.kafkaConfig = kafkaConfig; - String kafkaBrokerUrl = kafkaConfig.getBootstrapServers(); - String groupId = kafkaConfig.getGroupId(); - Properties properties = new Properties(); - properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerUrl); - properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConfig.getConsumerConfig().getMaxPollRecords()); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers()); + properties.put( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + kafkaConfig.getKeySerializer().getDeserializer()); + properties.put( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + kafkaConfig.getValueSerializer().getDeserializer()); + properties.put( + ConsumerConfig.MAX_POLL_RECORDS_CONFIG, + kafkaConfig.getConsumerConfig().getMaxPollRecords()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); @@ -49,8 +50,9 @@ public void run() { () -> { // Poll data from Kafka topic while (true) { - ConsumerRecords records = kafkaConsumer.poll( - Duration.ofMillis(kafkaConfig.getConsumerConfig().getPollDurationMilli())); + ConsumerRecords records = + kafkaConsumer.poll( + Duration.ofMillis(kafkaConfig.getConsumerConfig().getPollDurationMilli())); if (records.isEmpty()) { continue; } diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java index ae9e2ec013..3ba3603d50 100644 --- a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java +++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java @@ -23,7 +23,6 @@ import com.akto.proto.generated.threat_detection.message.sample_request.v1.SampleRequestKafkaEnvelope; import com.akto.proto.http_response_param.v1.HttpResponseParam; import com.akto.rules.TestPlugin; -import com.akto.runtime.utils.Utils; import com.akto.test_editor.execution.VariableResolver; import com.akto.test_editor.filter.data_operands_impl.ValidationResult; import com.akto.threat.detection.actor.SourceIPActorGenerator; @@ -38,10 +37,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.commons.lang3.math.NumberUtils; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.*; /* Class is responsible for consuming traffic data from the Kafka topic. 
@@ -66,13 +62,21 @@ public MaliciousTrafficDetectorTask( KafkaConfig trafficConfig, KafkaConfig internalConfig, RedisClient redisClient) { this.kafkaConfig = trafficConfig; - String kafkaBrokerUrl = trafficConfig.getBootstrapServers(); - String groupId = trafficConfig.getGroupId(); - - this.kafkaConsumer = - new KafkaConsumer<>( - Utils.configProperties( - kafkaBrokerUrl, groupId, trafficConfig.getConsumerConfig().getMaxPollRecords())); + Properties properties = new Properties(); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, trafficConfig.getBootstrapServers()); + properties.put( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + trafficConfig.getKeySerializer().getDeserializer()); + properties.put( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + trafficConfig.getValueSerializer().getDeserializer()); + properties.put( + ConsumerConfig.MAX_POLL_RECORDS_CONFIG, + trafficConfig.getConsumerConfig().getMaxPollRecords()); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, trafficConfig.getGroupId()); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + this.kafkaConsumer = new KafkaConsumer<>(properties); this.httpCallParser = new HttpCallParser(120, 1000); diff --git a/libs/protobuf/pom.xml b/libs/protobuf/pom.xml index 6943beca2a..9b0d640176 100644 --- a/libs/protobuf/pom.xml +++ b/libs/protobuf/pom.xml @@ -48,7 +48,7 @@ com.google.protobuf protobuf-java - 4.28.3 + 4.29.2 com.google.protobuf diff --git a/libs/utils/src/main/java/com/akto/kafka/Kafka.java b/libs/utils/src/main/java/com/akto/kafka/Kafka.java index 834782c547..87e178434e 100644 --- a/libs/utils/src/main/java/com/akto/kafka/Kafka.java +++ b/libs/utils/src/main/java/com/akto/kafka/Kafka.java @@ -1,7 +1,6 @@ package com.akto.kafka; import org.apache.kafka.clients.producer.*; -import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,18 +16,29 
@@ public Kafka(KafkaConfig kafkaConfig) { this( kafkaConfig.getBootstrapServers(), kafkaConfig.getProducerConfig().getLingerMs(), - kafkaConfig.getProducerConfig().getBatchSize()); + kafkaConfig.getProducerConfig().getBatchSize(), + kafkaConfig.getKeySerializer(), + kafkaConfig.getValueSerializer()); } - public Kafka(String brokerIP, int lingerMS, int batchSize) { + public Kafka( + String brokerIP, + int lingerMS, + int batchSize, + Serializer keySerializer, + Serializer valueSerializer) { producerReady = false; try { - setProducer(brokerIP, lingerMS, batchSize, keySerializer, valueSerializer); + setProducer(brokerIP, lingerMS, batchSize, keySerializer, valueSerializer); } catch (Exception e) { e.printStackTrace(); } } + public Kafka(String brokerIP, int lingerMS, int batchSize) { + this(brokerIP, lingerMS, batchSize, Serializer.STRING, Serializer.STRING); + } + public void send(String message, String topic) { if (!this.producerReady) return; @@ -41,14 +51,19 @@ public void close() { producer.close(Duration.ofMillis(0)); // close immediately } - private void setProducer(String brokerIP, int lingerMS, int batchSize) { + private void setProducer( + String brokerIP, + int lingerMS, + int batchSize, + Serializer keySerializer, + Serializer valueSerializer) { if (producer != null) close(); // close existing producer connection int requestTimeoutMs = 5000; Properties kafkaProps = new Properties(); kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerIP); - kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getSerializer()); + kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getSerializer()); kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMS); kafkaProps.put(ProducerConfig.RETRIES_CONFIG, 0); diff --git
a/libs/utils/src/main/java/com/akto/kafka/KafkaConfig.java b/libs/utils/src/main/java/com/akto/kafka/KafkaConfig.java index c7209871f3..9e486518d1 100644 --- a/libs/utils/src/main/java/com/akto/kafka/KafkaConfig.java +++ b/libs/utils/src/main/java/com/akto/kafka/KafkaConfig.java @@ -5,12 +5,16 @@ public class KafkaConfig { private final String groupId; private final KafkaConsumerConfig consumerConfig; private final KafkaProducerConfig producerConfig; + private final Serializer keySerializer; + private final Serializer valueSerializer; public static class Builder { private String bootstrapServers; private String groupId; private KafkaConsumerConfig consumerConfig; private KafkaProducerConfig producerConfig; + private Serializer keySerializer; + private Serializer valueSerializer; private Builder() {} @@ -34,6 +38,16 @@ public Builder setProducerConfig(KafkaProducerConfig producerConfig) { return this; } + public Builder setKeySerializer(Serializer keySerializer) { + this.keySerializer = keySerializer; + return this; + } + + public Builder setValueSerializer(Serializer valueSerializer) { + this.valueSerializer = valueSerializer; + return this; + } + public KafkaConfig build() { return new KafkaConfig(this); } @@ -44,6 +58,10 @@ private KafkaConfig(Builder builder) { this.groupId = builder.groupId; this.consumerConfig = builder.consumerConfig; this.producerConfig = builder.producerConfig; + + this.keySerializer = builder.keySerializer == null ? Serializer.STRING : builder.keySerializer; + this.valueSerializer = + builder.valueSerializer == null ? 
Serializer.STRING : builder.valueSerializer; } public String getBootstrapServers() { @@ -62,6 +80,14 @@ public KafkaProducerConfig getProducerConfig() { return producerConfig; } + public Serializer getKeySerializer() { + return keySerializer; + } + + public Serializer getValueSerializer() { + return valueSerializer; + } + public static Builder newBuilder() { return new Builder(); } diff --git a/libs/utils/src/main/java/com/akto/kafka/Serializer.java b/libs/utils/src/main/java/com/akto/kafka/Serializer.java new file mode 100644 index 0000000000..0e917dd645 --- /dev/null +++ b/libs/utils/src/main/java/com/akto/kafka/Serializer.java @@ -0,0 +1,26 @@ +package com.akto.kafka; + +public enum Serializer { + STRING( + "org.apache.kafka.common.serialization.StringSerializer", + "org.apache.kafka.common.serialization.StringDeserializer"), + BYTE_ARRAY( + "org.apache.kafka.common.serialization.ByteArraySerializer", + "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + + private final String serializer; + private final String deserializer; + + Serializer(String serializer, String deSerializer) { + this.serializer = serializer; + this.deserializer = deSerializer; + } + + public String getDeserializer() { + return deserializer; + } + + public String getSerializer() { + return serializer; + } +}