Skip to content

Commit

Permalink
proto changes
Browse files Browse the repository at this point in the history
  • Loading branch information
ayushaga14 committed Jan 1, 2025
1 parent c90ab69 commit 0d2bb58
Show file tree
Hide file tree
Showing 34 changed files with 10,028 additions and 25 deletions.
12 changes: 12 additions & 0 deletions apps/threat-detection/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@
<artifactId>threat-detection</artifactId>
<packaging>jar</packaging>

<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
Expand Down Expand Up @@ -69,6 +76,11 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-serializer</artifactId>
<version>7.5.0</version> <!-- Use the version matching your Confluent platform -->
</dependency>

<dependency>
<groupId>com.akto.libs.protobuf</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import com.akto.data_actor.DataActor;
import com.akto.data_actor.DataActorFactory;
import com.akto.dto.ApiInfo;
import com.akto.dto.HttpRequestParams;
import com.akto.dto.HttpResponseParams;
import com.akto.dto.OriginalHttpRequest;
import com.akto.dto.RawApi;
import com.akto.dto.api_protection_parse_layer.AggregationRules;
import com.akto.dto.api_protection_parse_layer.Condition;
Expand All @@ -15,6 +17,9 @@
import com.akto.dto.type.URLMethods;
import com.akto.hybrid_parsers.HttpCallParser;
import com.akto.kafka.KafkaConfig;
import com.akto.proto.generated.threat_detection.message.http_response_param.v1.HttpResponseParam;
import com.akto.proto.generated.threat_detection.message.http_response_param.v1.HttpResponseParamProto;
import com.akto.proto.generated.threat_detection.message.http_response_param.v1.StringList;
import com.akto.proto.generated.threat_detection.message.malicious_event.event_type.v1.EventType;
import com.akto.proto.generated.threat_detection.message.malicious_event.v1.MaliciousEventKafkaEnvelope;
import com.akto.proto.generated.threat_detection.message.malicious_event.v1.MaliciousEventMessage;
Expand All @@ -30,12 +35,20 @@
import com.akto.threat.detection.dto.MessageEnvelope;
import com.akto.threat.detection.kafka.KafkaProtoProducer;
import com.akto.threat.detection.smart_event_detector.window_based.WindowBasedThresholdNotifier;
import com.akto.util.HttpRequestResponseUtils;
import com.akto.util.JSONUtils;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;

import io.lettuce.core.RedisClient;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.lang3.math.NumberUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
Expand All @@ -47,14 +60,14 @@
*/
public class MaliciousTrafficDetectorTask implements Task {

private final Consumer<String, String> kafkaConsumer;
private final Consumer<String, byte[]> kafkaConsumer;
private final KafkaConfig kafkaConfig;
private final HttpCallParser httpCallParser;
private final WindowBasedThresholdNotifier windowBasedThresholdNotifier;

private Map<String, FilterConfig> apiFilters;
private int filterLastUpdatedAt = 0;
private int filterUpdateIntervalSec = 300;
private int filterUpdateIntervalSec = 900;

private final KafkaProtoProducer internalKafka;

Expand Down Expand Up @@ -89,13 +102,15 @@ public void run() {
() -> {
// Poll data from Kafka topic
while (true) {
ConsumerRecords<String, String> records =
ConsumerRecords<String, byte[]> records =
kafkaConsumer.poll(
Duration.ofMillis(kafkaConfig.getConsumerConfig().getPollDurationMilli()));

try {
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
for (ConsumerRecord<String, byte[]> record : records) {
System.out.println(record.value());
HttpResponseParam httpResponseParam = HttpResponseParam.parseFrom(record.value());
processRecord(httpResponseParam);
}

if (!records.isEmpty()) {
Expand Down Expand Up @@ -149,9 +164,27 @@ private boolean validateFilterForRequest(
return false;
}

private void processRecord(ConsumerRecord<String, String> record) {
private void processRecord(HttpResponseParam record) throws Exception {
System.out.println("Kafka record: found - ");
HttpResponseParams responseParam = HttpCallParser.parseKafkaMessage(record.value());
HttpResponseParam.Builder builder = HttpResponseParam.newBuilder();

HttpResponseParams responseParam = buildHttpResponseParam(record);

// HttpResponseParams responseParam;
// try {
// // Parse JSON into protobuf message
// JsonFormat.parser().merge(record.value(), builder);

// HttpResponseParam httpResponseParamProto = builder.build();
// responseParam = buildHttpResponseParam(httpResponseParamProto, record.value());
// System.out.println("Parsed Protobuf Message: ");

// } catch (Exception e) {
// e.printStackTrace();
// return;
// }

//HttpResponseParams responseParam = HttpCallParser.parseKafkaMessage(record);
Context.accountId.set(Integer.parseInt(responseParam.getAccountId()));
Map<String, FilterConfig> filters = this.getFilters();
if (filters.isEmpty()) {
Expand All @@ -164,7 +197,7 @@ private void processRecord(ConsumerRecord<String, String> record) {
System.out.println("Total number of filters: " + filters.size());

String message = responseParam.getOrig();
RawApi rawApi = RawApi.buildFromMessageNew(message);
RawApi rawApi = RawApi.buildFromMessageNew(responseParam);
int apiCollectionId = httpCallParser.createApiCollectionId(responseParam);
responseParam.requestParams.setApiCollectionId(apiCollectionId);
String url = responseParam.getRequestParams().getURL();
Expand Down Expand Up @@ -291,4 +324,57 @@ private void generateAndPushMaliciousEventRequest(
e.printStackTrace();
}
}

public static HttpResponseParams buildHttpResponseParam(HttpResponseParam httpResponseParamProto) {

String apiCollectionIdStr = httpResponseParamProto.getAktoVxlanId();
int apiCollectionId = 0;
if (NumberUtils.isDigits(apiCollectionIdStr)) {
apiCollectionId = NumberUtils.toInt(apiCollectionIdStr, 0);
}

String requestPayload = HttpRequestResponseUtils.rawToJsonString(httpResponseParamProto.getRequestPayload(), null);

Map<String, List<String>> reqHeaders = new HashMap<>();

for (Map.Entry<String, StringList> entry : httpResponseParamProto.getRequestHeadersMap().entrySet()) {
ArrayList<String> list = new ArrayList<>(entry.getValue().getValuesList());
reqHeaders.put(entry.getKey(), list);
}

HttpRequestParams requestParams =
new HttpRequestParams(httpResponseParamProto.getMethod(), httpResponseParamProto.getPath(), httpResponseParamProto.getType(),
reqHeaders, requestPayload, apiCollectionId);

String responsePayload = HttpRequestResponseUtils.rawToJsonString(httpResponseParamProto.getResponsePayload(), null);

String sourceStr = httpResponseParamProto.getSource();
if (sourceStr == null || sourceStr == "") {
sourceStr = HttpResponseParams.Source.OTHER.name();
}

HttpResponseParams.Source source = HttpResponseParams.Source.valueOf(sourceStr);
Map<String, List<String>> respHeaders = new HashMap<>();

for (Map.Entry<String, StringList> entry : httpResponseParamProto.getResponseHeadersMap().entrySet()) {
ArrayList<String> list = new ArrayList<>(entry.getValue().getValuesList());
respHeaders.put(entry.getKey(), list);
}

return new HttpResponseParams(
httpResponseParamProto.getType(),
httpResponseParamProto.getStatusCode(),
httpResponseParamProto.getStatus(),
respHeaders,
responsePayload,
requestParams,
httpResponseParamProto.getTime(),
httpResponseParamProto.getAktoAccountId(),
httpResponseParamProto.getIsPending(),
source,
"",
httpResponseParamProto.getIp(),
httpResponseParamProto.getDestIp(),
httpResponseParamProto.getDirection());
}
}
12 changes: 6 additions & 6 deletions libs/dao/src/main/java/com/akto/dto/OriginalHttpRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,22 +83,22 @@ public void buildFromSampleMessage(String message) {
this.headers = buildHeadersMap(json, "requestHeaders");
}

public void buildFromSampleMessageNew(JSONObject json) {
String rawUrl = (String) json.get("path");
public void buildFromSampleMessageNew(HttpResponseParams responseParam) {
String rawUrl = responseParam.getRequestParams().getURL();
String[] rawUrlArr = rawUrl.split("\\?");
this.url = rawUrlArr[0];
if (rawUrlArr.length > 1) {
this.queryParams = rawUrlArr[1];
}

this.type = (String) json.get("type");
this.type = responseParam.getRequestParams().type;

this.method = (String) json.get("method");
this.method = responseParam.getRequestParams().getMethod();

String requestPayload = (String) json.get("requestPayload");
String requestPayload = responseParam.getRequestParams().getPayload();
this.body = requestPayload.trim();

this.headers = buildHeadersMap(json, "requestHeaders");
this.headers = responseParam.getRequestParams().getHeaders();
}

public String getJsonRequestBody() {
Expand Down
8 changes: 4 additions & 4 deletions libs/dao/src/main/java/com/akto/dto/OriginalHttpResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ public void buildFromSampleMessage(String message) {
this.statusCode = Integer.parseInt(json.get("statusCode").toString());
}

public void buildFromSampleMessageNew(JSONObject json) {
String responsePayload = (String) json.get("responsePayload");
public void buildFromSampleMessageNew(HttpResponseParams responseParam) {
String responsePayload = responseParam.getPayload();
this.body = responsePayload != null ? responsePayload.trim() : null;
this.headers = OriginalHttpRequest.buildHeadersMap(json, "responseHeaders");
this.statusCode = Integer.parseInt(json.get("statusCode").toString());
this.headers = responseParam.getHeaders();
this.statusCode = responseParam.getStatusCode();
}

public void addHeaderFromLine(String line) {
Expand Down
9 changes: 4 additions & 5 deletions libs/dao/src/main/java/com/akto/dto/RawApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,14 @@ public static RawApi buildFromMessage(String message) {
return new RawApi(request, response, message);
}

public static RawApi buildFromMessageNew(String message) {
JSONObject json = JSON.parseObject(message);
public static RawApi buildFromMessageNew(HttpResponseParams responseParam) {
OriginalHttpRequest request = new OriginalHttpRequest();
request.buildFromSampleMessageNew(json);
request.buildFromSampleMessageNew(responseParam);

OriginalHttpResponse response = new OriginalHttpResponse();
response.buildFromSampleMessageNew(json);
response.buildFromSampleMessageNew(responseParam);

return new RawApi(request, response, message);
return new RawApi(request, response, "");
}

public BasicDBObject fetchReqPayload() {
Expand Down
Loading

0 comments on commit 0d2bb58

Please sign in to comment.