Skip to content

Commit

Permalink
refactored dashboard proto messages
Browse files Browse the repository at this point in the history
  • Loading branch information
ag060 committed Dec 12, 2024
1 parent 993bd9d commit 0c5975e
Show file tree
Hide file tree
Showing 17 changed files with 145 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,26 @@
import com.akto.dto.type.URLMethods;
import com.akto.dto.type.URLMethods.Method;

public class DashboardMaliciousRequest {
public class DashboardMaliciousEvent {
private String id;
private String actor;
private String filter_id;
private String url;
private URLMethods.Method method;
private int apiCollectionId;
private String ip;
private String country;
private long timestamp;

public DashboardMaliciousRequest() {}
public DashboardMaliciousEvent() {}

public DashboardMaliciousRequest(
public DashboardMaliciousEvent(
String id,
String actor,
String filter,
String url,
Method method,
int apiCollectionId,
String ip,
String country,
long timestamp) {
Expand All @@ -29,6 +31,7 @@ public DashboardMaliciousRequest(
this.filter_id = filter;
this.url = url;
this.method = method;
this.apiCollectionId = apiCollectionId;
this.ip = ip;
this.country = country;
this.timestamp = timestamp;
Expand Down Expand Up @@ -97,4 +100,12 @@ public long getTimestamp() {
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}

public int getApiCollectionId() {
return apiCollectionId;
}

public void setApiCollectionId(int apiCollectionId) {
this.apiCollectionId = apiCollectionId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
import com.akto.dto.traffic.SuspectSampleData;
import com.akto.dto.type.URLMethods;
import com.akto.grpc.AuthToken;
import com.akto.proto.threat_protection.message.malicious_event.dashboard.v1.DashboardMaliciousEventMessage;
import com.akto.proto.threat_protection.service.dashboard_service.v1.DashboardServiceGrpc;
import com.akto.proto.threat_protection.service.dashboard_service.v1.DashboardServiceGrpc.DashboardServiceBlockingStub;
import com.akto.proto.threat_protection.service.dashboard_service.v1.FetchAlertFiltersRequest;
import com.akto.proto.threat_protection.service.dashboard_service.v1.FetchAlertFiltersResponse;
import com.akto.proto.threat_protection.service.dashboard_service.v1.ListMaliciousRequestsRequest;
import com.akto.proto.threat_protection.service.dashboard_service.v1.MaliciousRequest;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
Expand All @@ -20,7 +20,7 @@
public class SuspectSampleDataAction extends UserAction {

List<SuspectSampleData> sampleData;
List<DashboardMaliciousRequest> maliciousRequests;
List<DashboardMaliciousEvent> maliciousRequests;
int skip;
static final int LIMIT = 50;
List<String> ips;
Expand All @@ -45,25 +45,26 @@ public SuspectSampleDataAction() {
}

public String fetchSampleDataV2() {
List<MaliciousRequest> maliciousRequests =
List<DashboardMaliciousEventMessage> maliciousRequests =
this.dsServiceStub
.listMaliciousRequests(
ListMaliciousRequestsRequest.newBuilder().setPage(0).setLimit(500).build())
.getMaliciousRequestsList();
.getMaliciousEventsList();

this.maliciousRequests =
maliciousRequests.stream()
.map(
mr ->
new DashboardMaliciousRequest(
new DashboardMaliciousEvent(
mr.getId(),
mr.getActor(),
mr.getFilterId(),
mr.getUrl(),
mr.getEndpoint(),
URLMethods.Method.fromString(mr.getMethod()),
mr.getApiCollectionId(),
mr.getIp(),
mr.getCountry(),
mr.getTimestamp()))
mr.getDetectedAt()))
.collect(Collectors.toList());

return SUCCESS.toUpperCase();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

import com.akto.dao.context.Context;
import com.akto.kafka.KafkaConfig;
import com.akto.proto.threat_protection.message.malicious_event.v1.MaliciousEvent;
import com.akto.proto.threat_protection.message.malicious_event.event_type.v1.EventType;
import com.akto.proto.threat_protection.message.malicious_event.v1.MaliciousEventMessage;
import com.akto.proto.threat_protection.message.sample_request.v1.SampleMaliciousRequest;
import com.akto.threat.detection.actor.SourceIPActorGenerator;
import com.akto.threat.detection.cache.RedisBackedCounterCache;
Expand Down Expand Up @@ -202,11 +203,7 @@ private void processRecord(ConsumerRecord<String, String> record) {

if (!isAggFilter) {
generateAndPushMaliciousEventRequest(
apiFilter,
actor,
responseParam,
maliciousReq,
MaliciousEvent.EventType.EVENT_TYPE_SINGLE);
apiFilter, actor, responseParam, maliciousReq, EventType.EVENT_TYPE_SINGLE);
return;
}

Expand All @@ -221,7 +218,7 @@ private void processRecord(ConsumerRecord<String, String> record) {
actor,
responseParam,
maliciousReq,
MaliciousEvent.EventType.EVENT_TYPE_AGGREGATED);
EventType.EVENT_TYPE_AGGREGATED);
}
}
});
Expand Down Expand Up @@ -250,9 +247,9 @@ private void generateAndPushMaliciousEventRequest(
String actor,
HttpResponseParams responseParam,
SampleMaliciousRequest maliciousReq,
MaliciousEvent.EventType eventType) {
MaliciousEvent maliciousEvent =
MaliciousEvent.newBuilder()
EventType eventType) {
MaliciousEventMessage maliciousEvent =
MaliciousEventMessage.newBuilder()
.setFilterId(apiFilter.getId())
.setActor(actor)
.setDetectedAt(responseParam.getTime())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.akto.threat.detection.tasks;

import com.akto.kafka.KafkaConfig;
import com.akto.proto.threat_protection.message.malicious_event.v1.MaliciousEvent;
import com.akto.proto.threat_protection.message.malicious_event.event_type.v1.EventType;
import com.akto.proto.threat_protection.message.malicious_event.v1.MaliciousEventMessage;
import com.akto.proto.threat_protection.message.sample_request.v1.SampleMaliciousRequest;
import com.akto.proto.threat_protection.service.malicious_alert_service.v1.MaliciousEventServiceGrpc;
import com.akto.proto.threat_protection.service.malicious_alert_service.v1.RecordMaliciousEventRequest;
Expand Down Expand Up @@ -81,7 +82,7 @@ protected void processRecords(ConsumerRecords<String, String> records) {
records.forEach(
r -> {
String message = r.value();
MaliciousEvent.Builder builder = MaliciousEvent.newBuilder();
MaliciousEventMessage.Builder builder = MaliciousEventMessage.newBuilder();
MessageEnvelope m = MessageEnvelope.unmarshal(message).orElse(null);
if (m == null) {
return;
Expand All @@ -94,13 +95,13 @@ protected void processRecords(ConsumerRecords<String, String> records) {
return;
}

MaliciousEvent evt = builder.build();
MaliciousEventMessage evt = builder.build();

// Get sample data from postgres for this alert
try {
RecordMaliciousEventRequest.Builder reqBuilder =
RecordMaliciousEventRequest.newBuilder().setMaliciousEvent(evt);
if (MaliciousEvent.EventType.EVENT_TYPE_AGGREGATED.equals(evt.getEventType())) {
if (EventType.EVENT_TYPE_AGGREGATED.equals(evt.getEventType())) {
List<MaliciousEventEntity> sampleData =
this.getSampleMaliciousRequests(evt.getActor(), evt.getFilterId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ public static void main(String[] args) throws Exception {
KafkaProducerConfig.newBuilder().setBatchSize(100).setLingerMs(1000).build())
.build();

new FlushMessagesToDB(
internalKafkaConfig, "akto.threat_protection.flush_events_db", threatProtectionMongo)
.run();
new FlushMessagesToDB(internalKafkaConfig, threatProtectionMongo).run();

int port =
Integer.parseInt(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.akto.threat.protection.constants;

public class KafkaTopic {
public static class ThreatDetection {
public static final String INTERNAL_DB_MESSAGES = "akto.threat_detection.internal_db_messages";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.akto.threat.protection.constants;

public class MongoDBCollection {
public static class ThreatDetection {
public static final String MALICIOUS_EVENTS = "malicious_events";
public static final String AGGREGATE_SAMPLE_MALICIOUS_REQUESTS =
"aggregate_sample_malicious_requests";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public class MaliciousEventModel {
private String id;
private String filterId;
private String actor;
private String ip;
private String latestIp;
private String latestApiEndpoint;
private String country;
private URLMethods.Method latestApiMethod;
Expand All @@ -29,7 +29,7 @@ public MaliciousEventModel(Builder builder) {
this.id = UUID.randomUUID().toString();
this.filterId = builder.filterId;
this.actor = builder.actor;
this.ip = builder.ip;
this.latestIp = builder.ip;
this.country = builder.country;
this.latestApiEndpoint = builder.latestApiEndpoint;
this.latestApiMethod = builder.latestApiMethod;
Expand Down Expand Up @@ -118,8 +118,8 @@ public String getActor() {
return actor;
}

public String getIp() {
return ip;
public String getLatestIp() {
return latestIp;
}

public String getLatestApiEndpoint() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package com.akto.threat.protection.service;

import com.akto.proto.threat_protection.message.malicious_event.dashboard.v1.DashboardMaliciousEventMessage;
import com.akto.proto.threat_protection.service.dashboard_service.v1.DashboardServiceGrpc.DashboardServiceImplBase;
import com.akto.proto.threat_protection.service.dashboard_service.v1.FetchAlertFiltersRequest;
import com.akto.proto.threat_protection.service.dashboard_service.v1.FetchAlertFiltersResponse;
import com.akto.proto.threat_protection.service.dashboard_service.v1.ListMaliciousRequestsRequest;
import com.akto.proto.threat_protection.service.dashboard_service.v1.ListMaliciousRequestsResponse;
import com.akto.proto.threat_protection.service.dashboard_service.v1.MaliciousRequest;
import com.akto.threat.protection.constants.MongoDBCollection;
import com.akto.threat.protection.db.AggregateSampleMaliciousEventModel;
import com.akto.threat.protection.db.MaliciousEventModel;
import com.akto.threat.protection.interceptors.Constants;
import com.mongodb.BasicDBObject;
import com.mongodb.client.DistinctIterable;
Expand Down Expand Up @@ -73,33 +75,38 @@ public void listMaliciousRequests(
int limit = request.getLimit();
int skip = (page - 1) * limit;

MongoCollection<AggregateSampleMaliciousEventModel> coll =
MongoCollection<MaliciousEventModel> coll =
this.mongoClient
.getDatabase(accountId + "")
.getCollection("malicious_events", AggregateSampleMaliciousEventModel.class);
.getCollection(
MongoDBCollection.ThreatDetection.MALICIOUS_EVENTS, MaliciousEventModel.class);

BasicDBObject query = new BasicDBObject();
try (MongoCursor<AggregateSampleMaliciousEventModel> cursor =
try (MongoCursor<MaliciousEventModel> cursor =
coll.find(query).skip(skip).limit(limit).cursor()) {
List<MaliciousRequest> alerts = new ArrayList<>();
List<DashboardMaliciousEventMessage> maliciousEvents = new ArrayList<>();
while (cursor.hasNext()) {
AggregateSampleMaliciousEventModel evt = cursor.next();
alerts.add(
MaliciousRequest.newBuilder()
MaliciousEventModel evt = cursor.next();
maliciousEvents.add(
DashboardMaliciousEventMessage.newBuilder()
.setActor(evt.getActor())
.setFilterId(evt.getFilterId())
.setFilterId(evt.getFilterId())
.setId(evt.getId())
.setIp(evt.getIp())
.setIp(evt.getLatestIp())
.setCountry(evt.getCountry())
.setOrig(evt.getOrig())
.setUrl(evt.getUrl())
.setMethod(evt.getMethod().name())
.setTimestamp(evt.getRequestTime())
.setPayload(evt.getLatestApiOrig())
.setEndpoint(evt.getLatestApiEndpoint())
.setMethod(evt.getLatestApiMethod().name())
.setDetectedAt(evt.getDetectedAt())
.build());
}
ListMaliciousRequestsResponse response =
ListMaliciousRequestsResponse.newBuilder().setPage(page).setTotal(alerts.size()).build();
ListMaliciousRequestsResponse.newBuilder()
.setPage(page)
.setTotal(maliciousEvents.size())
.addAllMaliciousEvents(maliciousEvents)
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,24 @@
import com.akto.dto.type.URLMethods;
import com.akto.kafka.Kafka;
import com.akto.kafka.KafkaConfig;
import com.akto.proto.threat_protection.message.malicious_event.v1.MaliciousEvent;
import com.akto.proto.threat_protection.message.malicious_event.event_type.v1.EventType;
import com.akto.proto.threat_protection.message.malicious_event.v1.MaliciousEventMessage;
import com.akto.proto.threat_protection.message.sample_request.v1.SampleMaliciousRequest;
import com.akto.proto.threat_protection.service.malicious_alert_service.v1.MaliciousEventServiceGrpc;
import com.akto.proto.threat_protection.service.malicious_alert_service.v1.RecordMaliciousEventRequest;
import com.akto.proto.threat_protection.service.malicious_alert_service.v1.RecordMaliciousEventResponse;
import com.akto.threat.protection.BackendServer;
import com.akto.threat.protection.constants.KafkaTopic;
import com.akto.threat.protection.constants.MongoDBCollection;
import com.akto.threat.protection.db.AggregateSampleMaliciousEventModel;
import com.akto.threat.protection.db.MaliciousEventModel;
import com.akto.threat.protection.interceptors.Constants;
import com.akto.threat.protection.utils.KafkaUtils;
import com.mongodb.BasicDBObject;
import com.mongodb.client.model.WriteModel;
import io.grpc.stub.StreamObserver;

public class MaliciousEventService extends MaliciousEventServiceGrpc.MaliciousEventServiceImplBase {

private final Kafka kafka;
private static final String kafkaTopic = "akto.threat_protection.flush_events_db";

public MaliciousEventService(KafkaConfig kafkaConfig) {
this.kafka = new Kafka(kafkaConfig);
Expand All @@ -34,16 +34,16 @@ public void recordMaliciousEvent(
RecordMaliciousEventRequest request,
StreamObserver<RecordMaliciousEventResponse> responseObserver) {

MaliciousEvent evt = request.getMaliciousEvent();
MaliciousEventMessage evt = request.getMaliciousEvent();
String actor = evt.getActor();
String filterId = evt.getFilterId();
List<WriteModel<AggregateSampleMaliciousEventModel>> bulkUpdates = new ArrayList<>();
int accountId = Constants.ACCOUNT_ID_CONTEXT_KEY.get();

MaliciousEvent.EventType eventType = evt.getEventType();
EventType eventType = evt.getEventType();

MaliciousEventModel.EventType maliciousEventType =
MaliciousEvent.EventType.EVENT_TYPE_AGGREGATED.equals(eventType)
EventType.EVENT_TYPE_AGGREGATED.equals(eventType)
? MaliciousEventModel.EventType.AGGREGATED
: MaliciousEventModel.EventType.SINGLE;

Expand Down Expand Up @@ -75,11 +75,18 @@ public void recordMaliciousEvent(
.build());
}

this.kafka.send(KafkaUtils.generateMsg(events, "maliciousEvents", accountId), kafkaTopic);
this.kafka.send(
KafkaUtils.generateMsg(
events,
MongoDBCollection.ThreatDetection.AGGREGATE_SAMPLE_MALICIOUS_REQUESTS,
accountId),
KafkaTopic.ThreatDetection.INTERNAL_DB_MESSAGES);
}

this.kafka.send(
KafkaUtils.generateMsg(maliciousEventModel, "smartEvent", accountId), kafkaTopic);
KafkaUtils.generateMsg(
maliciousEventModel, MongoDBCollection.ThreatDetection.MALICIOUS_EVENTS, accountId),
KafkaTopic.ThreatDetection.INTERNAL_DB_MESSAGES);

responseObserver.onNext(RecordMaliciousEventResponse.newBuilder().build());
responseObserver.onCompleted();
Expand Down
Loading

0 comments on commit 0c5975e

Please sign in to comment.