Skip to content

Commit

Permalink
refactored code
Browse files Browse the repository at this point in the history
  • Loading branch information
ag060 committed Nov 29, 2024
1 parent 3605d1b commit 2c60412
Show file tree
Hide file tree
Showing 31 changed files with 189 additions and 5,742 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
**/.settings
**/dist
**/gen
libs/protobuf/src/main/java/com/akto/proto
libs/protobuf/src
libawesome.dylib
temp_*
*.templates-config.json
Expand Down
10 changes: 7 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
proto-gen:
buf lint protobuf && \
rm -rf ./libs/protobuf/src/main/java/com/akto && \
buf generate protobuf
sh ./scripts/proto-gen.sh

build: proto-gen
mvn install -DskipTests

build-clean: proto-gen
mvn clean install -DskipTests
266 changes: 95 additions & 171 deletions apps/dashboard/src/main/java/com/akto/utils/jobs/CleanInventory.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.akto.hybrid_parsers;

import com.akto.dao.ApiCollectionsDao;
import com.akto.dao.billing.OrganizationsDao;
import com.akto.dao.context.Context;
import com.akto.dao.traffic_metrics.TrafficMetricsDao;
import com.akto.hybrid_dependency.DependencyAnalyser;
Expand Down Expand Up @@ -29,7 +27,6 @@
import com.mongodb.client.model.*;
import okhttp3.*;
import org.apache.commons.lang3.math.NumberUtils;
import org.bson.conversions.Bson;
import com.alibaba.fastjson2.*;

import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.akto.threat.detection.config.kafka.KafkaConfig;
import com.akto.threat.detection.config.kafka.KafkaConsumerConfig;
import com.akto.threat.detection.config.kafka.KafkaProducerConfig;
import com.akto.threat.detection.constants.KafkaTopic;
import com.akto.threat.detection.tasks.FlushSampleDataTask;
import com.akto.threat.detection.tasks.MaliciousTrafficDetectorTask;
import com.akto.threat.detection.tasks.SendAlertsToBackend;
Expand All @@ -18,13 +19,15 @@

public class Main {

private static final String CONSUMER_GROUP_ID = "akto.threat_detection";

public static void main(String[] args) throws Exception {
runMigrations();

DaoInit.init(new ConnectionString(System.getenv("AKTO_MONGO_CONN")));
KafkaConfig trafficKafka = KafkaConfig.newBuilder()
.setGroupId("akto.threat.detection")
.setBootstrapServers("localhost:29092")
.setGroupId(CONSUMER_GROUP_ID)
.setBootstrapServers(System.getenv("AKTO_TRAFFIC_KAFKA_BOOTSTRAP_SERVER"))
.setConsumerConfig(
KafkaConsumerConfig.newBuilder()
.setMaxPollRecords(100)
Expand All @@ -35,8 +38,8 @@ public static void main(String[] args) throws Exception {
.build();

KafkaConfig internalKafka = KafkaConfig.newBuilder()
.setGroupId("akto.threat.detection")
.setBootstrapServers("localhost:29092")
.setGroupId(CONSUMER_GROUP_ID)
.setBootstrapServers(System.getenv("AKTO_INTERNAL_KAFKA_BOOTSTRAP_SERVER"))
.setConsumerConfig(
KafkaConsumerConfig.newBuilder()
.setMaxPollRecords(100)
Expand All @@ -49,8 +52,8 @@ public static void main(String[] args) throws Exception {
Connection postgres = createPostgresConnection();

new MaliciousTrafficDetectorTask(trafficKafka, internalKafka, createRedisClient()).run();
new FlushSampleDataTask(postgres, internalKafka, "akto.threat_detection.malicious").run();
new SendAlertsToBackend(postgres, internalKafka, "akto.threat_detection.alerts").run();
new FlushSampleDataTask(postgres, internalKafka, KafkaTopic.ThreatDetection.MALICIOUS_EVENTS).run();
new SendAlertsToBackend(postgres, internalKafka, KafkaTopic.ThreatDetection.ALERTS).run();
}

public static RedisClient createRedisClient() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.akto.threat.detection.constants;

public class KafkaTopic {
public static class ThreatDetection {
public static final String MALICIOUS_EVENTS = "akto.threat_detection.malicious_events";
public static final String ALERTS = "akto.threat_detection.alerts";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public MaliciousEventDao(Connection conn) {

public void batchInsert(List<MaliciousEventModel> events) throws SQLException {
String sql =
"INSERT INTO threat_detection.malicious_event (id, actor, filter_id, url, method, timestamp, data, ip, country) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
"INSERT INTO threat_detection.malicious_event (id, actor, filter_id, url, method, timestamp, orig, ip, country) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
conn.setAutoCommit(false);
for (int i = 0; i < events.size(); i++) {
MaliciousEventModel event = events.get(i);
Expand Down Expand Up @@ -64,8 +64,9 @@ public List<MaliciousEventModel> findGivenActorAndFilterId(
.setUrl(rs.getString("url"))
.setMethod(URLMethods.Method.fromString(rs.getString("method")))
.setTimestamp(rs.getLong("timestamp"))
.setOrig(rs.getString("data"))
.setOrig(rs.getString("orig"))
.setIp(rs.getString("ip"))
.setCreatedAt(rs.getDate("created_at").toLocalDate())
.build();
models.add(model);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.akto.dto.type.URLMethods;

import java.util.UUID;
import java.time.LocalDate;

public class MaliciousEventModel {

Expand All @@ -17,17 +17,20 @@ public class MaliciousEventModel {
// Geo location data
private String ip;

private LocalDate createdAt;

public MaliciousEventModel() {}

public MaliciousEventModel(Builder builder) {
this.id = builder.id == null ? UUID.randomUUID().toString() : builder.id;
this.id = builder.id;
this.actor = builder.actorId;
this.filterId = builder.filterId;
this.url = builder.url;
this.method = builder.method;
this.timestamp = builder.timestamp;
this.orig = builder.orig;
this.ip = builder.ip;
this.createdAt = builder.createdAt;
}

public static class Builder {
Expand All @@ -39,6 +42,7 @@ public static class Builder {
private long timestamp;
private String orig;
private String ip;
private LocalDate createdAt;

public Builder setId(String id) {
this.id = id;
Expand Down Expand Up @@ -83,6 +87,11 @@ public Builder setIp(String ip) {
public MaliciousEventModel build() {
return new MaliciousEventModel(this);
}

public Builder setCreatedAt(LocalDate createdAt) {
this.createdAt = createdAt;
return this;
}
}

public static Builder newBuilder() {
Expand Down Expand Up @@ -121,6 +130,10 @@ public String getIp() {
return ip;
}

public LocalDate getCreatedAt() {
return createdAt;
}

@Override
public String toString() {
return "MaliciousEventModel{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.akto.threat.detection.actor.SourceIPActorGenerator;
import com.akto.threat.detection.cache.RedisBackedCounterCache;
import com.akto.threat.detection.config.kafka.KafkaConfig;
import com.akto.threat.detection.constants.KafkaTopic;
import com.akto.dao.monitoring.FilterYamlTemplateDao;
import com.akto.data_actor.DataActor;
import com.akto.data_actor.DataActorFactory;
Expand Down Expand Up @@ -216,7 +217,7 @@ private void processRecord(ConsumerRecord<String, String> record) {
.marshal()
.ifPresent(
data -> {
internalKafka.send(data, "akto.threat_detection.alerts");
internalKafka.send(data, KafkaTopic.ThreatDetection.ALERTS);
});
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
Expand All @@ -236,7 +237,7 @@ private void processRecord(ConsumerRecord<String, String> record) {
.marshal()
.ifPresent(
data -> {
internalKafka.send(data, "akto.threat_detection.malicious");
internalKafka.send(data, KafkaTopic.ThreatDetection.MALICIOUS_EVENTS);
});
});
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package com.akto.threat.detection.tasks;

import com.akto.proto.threat_protection.message.smart_event.v1.SmartEvent;
import com.akto.proto.threat_protection.service.consumer_service.v1.ConsumerServiceGrpc;
import com.akto.proto.threat_protection.service.consumer_service.v1.RecordAlertRequest;
import com.akto.proto.threat_protection.service.consumer_service.v1.RecordAlertResponse;
import com.akto.proto.threat_protection.service.consumer_service.v1.SampleMaliciousEvent;
import com.akto.proto.threat_protection.service.malicious_alert_service.v1.MaliciousAlertServiceGrpc;
import com.akto.proto.threat_protection.service.malicious_alert_service.v1.MaliciousAlertServiceGrpc.MaliciousAlertServiceStub;
import com.akto.proto.threat_protection.service.malicious_alert_service.v1.RecordAlertRequest;
import com.akto.proto.threat_protection.service.malicious_alert_service.v1.RecordAlertResponse;
import com.akto.proto.threat_protection.service.malicious_alert_service.v1.SampleMaliciousEvent;
import com.akto.threat.detection.config.kafka.KafkaConfig;
import com.akto.threat.detection.db.malicious_event.MaliciousEventDao;
import com.akto.threat.detection.db.malicious_event.MaliciousEventModel;
Expand All @@ -31,7 +32,7 @@ public class SendAlertsToBackend extends AbstractKafkaConsumerTask {

private final MaliciousEventDao maliciousEventDao;

private final ConsumerServiceGrpc.ConsumerServiceStub consumerServiceStub;
private final MaliciousAlertServiceStub consumerServiceStub;

public SendAlertsToBackend(Connection conn, KafkaConfig trafficConfig, String topic) {
super(trafficConfig, topic);
Expand All @@ -41,7 +42,7 @@ public SendAlertsToBackend(Connection conn, KafkaConfig trafficConfig, String to
ManagedChannel channel =
Grpc.newChannelBuilder(target, InsecureChannelCredentials.create()).build();
this.consumerServiceStub =
ConsumerServiceGrpc.newStub(channel)
MaliciousAlertServiceGrpc.newStub(channel)
.withCallCredentials(
new AuthToken(System.getenv("AKTO_THREAT_PROTECTION_BACKEND_TOKEN")));
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- enabling uuid extension
create extension if not exists "uuid-ossp";
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- create schema and table for malicious events
create schema if not exists threat_detection;
create table if not exists threat_detection.malicious_events (
id uuid primary key default uuid_generate_v4(),
actor varchar(255) not null,
filter_id varchar(255) not null,
url varchar(1024),
ip varchar(255),
method varchar(255),
timestamp bigint not null,
orig text not null,
created_at timestamp default (timezone('utc', now()))
);

-- add index on actor and filter_id and sort data by timestamp
create index malicious_events_actor_filter_id_timestamp_idx on threat_detection.malicious_events(actor, filter_id, timestamp desc);
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public BackendServer(int port, MongoClient mongoClient) {
this.port = port;
this.server =
ServerBuilder.forPort(port)
.addService(new ConsumerMaliciousEventService(mongoClient))
.addService(new MaliciousAlertService(mongoClient))
.intercept(new AuthenticationInterceptor())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import java.util.ArrayList;
import java.util.List;

import com.akto.proto.threat_protection.service.consumer_service.v1.ConsumerServiceGrpc;
import com.akto.proto.threat_protection.service.consumer_service.v1.RecordAlertRequest;
import com.akto.proto.threat_protection.service.consumer_service.v1.RecordAlertResponse;
import com.akto.proto.threat_protection.service.malicious_alert_service.v1.MaliciousAlertServiceGrpc.MaliciousAlertServiceImplBase;
import com.akto.proto.threat_protection.service.malicious_alert_service.v1.RecordAlertRequest;
import com.akto.proto.threat_protection.service.malicious_alert_service.v1.RecordAlertResponse;
import com.akto.threat.protection.db.MaliciousEventModel;
import com.akto.threat.protection.db.SmartEventModel;
import com.akto.threat.protection.interceptors.Constants;
Expand All @@ -16,11 +16,11 @@

import io.grpc.stub.StreamObserver;

public class ConsumerMaliciousEventService extends ConsumerServiceGrpc.ConsumerServiceImplBase {
public class MaliciousAlertService extends MaliciousAlertServiceImplBase {

private final MongoClient mongoClient;

public ConsumerMaliciousEventService(MongoClient mongoClient) {
public MaliciousAlertService(MongoClient mongoClient) {
this.mongoClient = mongoClient;
}

Expand Down
Loading

0 comments on commit 2c60412

Please sign in to comment.