diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/Main.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/Main.java index b59fd2b897..c11ed062a1 100644 --- a/apps/threat-detection/src/main/java/com/akto/threat/detection/Main.java +++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/Main.java @@ -5,6 +5,7 @@ import com.akto.threat.detection.config.kafka.KafkaConsumerConfig; import com.akto.threat.detection.config.kafka.KafkaProducerConfig; import com.akto.threat.detection.constants.KafkaTopic; +import com.akto.threat.detection.tasks.CleanupTask; import com.akto.threat.detection.tasks.FlushSampleDataTask; import com.akto.threat.detection.tasks.MaliciousTrafficDetectorTask; import com.akto.threat.detection.tasks.SendAlertsToBackend; @@ -54,6 +55,7 @@ public static void main(String[] args) throws Exception { new MaliciousTrafficDetectorTask(trafficKafka, internalKafka, createRedisClient()).run(); new FlushSampleDataTask(postgres, internalKafka, KafkaTopic.ThreatDetection.MALICIOUS_EVENTS).run(); new SendAlertsToBackend(postgres, internalKafka, KafkaTopic.ThreatDetection.ALERTS).run(); + new CleanupTask(postgres).run(); } public static RedisClient createRedisClient() { diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/db/malicious_event/MaliciousEventDao.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/db/malicious_event/MaliciousEventDao.java index 906c0785a4..2c88d3d25a 100644 --- a/apps/threat-detection/src/main/java/com/akto/threat/detection/db/malicious_event/MaliciousEventDao.java +++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/db/malicious_event/MaliciousEventDao.java @@ -6,6 +6,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.time.LocalDate; import java.util.ArrayList; import java.util.List; @@ -88,4 +89,11 @@ public int countTotalMaliciousEventGivenActorAndFilterId(String actor, String fi } return 0; } + + public void deleteEventsBefore(LocalDate date) throws SQLException { + String sql = "DELETE FROM threat_detection.malicious_event WHERE created_at < ?"; + PreparedStatement stmt = this.conn.prepareStatement(sql); + stmt.setDate(1, java.sql.Date.valueOf(date)); + stmt.executeUpdate(); + } } diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/CleanupTask.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/CleanupTask.java new file mode 100644 index 0000000000..2b30eaf8ad --- /dev/null +++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/CleanupTask.java @@ -0,0 +1,35 @@ +package com.akto.threat.detection.tasks; + +import java.sql.Connection; +import java.sql.SQLException; +import java.time.LocalDate; +import java.time.ZoneOffset; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import com.akto.threat.detection.db.malicious_event.MaliciousEventDao; + +public class CleanupTask implements Task { + + private final MaliciousEventDao maliciousEventDao; + private final ScheduledExecutorService cronExecutorService = Executors.newScheduledThreadPool(1); + + public CleanupTask(Connection conn) { + this.maliciousEventDao = new MaliciousEventDao(conn); + } + + @Override + public void run() { + this.cronExecutorService.scheduleAtFixedRate(this::cleanup, 5, 10 * 60, TimeUnit.SECONDS); + } + + private void cleanup() { + // Delete all records older than 7 days + try { + this.maliciousEventDao.deleteEventsBefore(LocalDate.now(ZoneOffset.UTC).minusDays(7)); + } catch (SQLException e) { + e.printStackTrace(); + } + } +} diff --git a/apps/threat-detection/src/main/resources/db/migration/V2__create_malicious_event_table.sql b/apps/threat-detection/src/main/resources/db/migration/V2__create_malicious_event_table.sql index a860ee20b1..a346d0bab5 100644 --- a/apps/threat-detection/src/main/resources/db/migration/V2__create_malicious_event_table.sql +++ b/apps/threat-detection/src/main/resources/db/migration/V2__create_malicious_event_table.sql @@ -1,6 +1,6 @@ -- create schema and table for malicious events create schema if not exists threat_detection; -create table if not exists threat_detection.malicious_events ( +create table if not exists threat_detection.malicious_event ( id uuid primary key default uuid_generate_v4(), actor varchar(255) not null, filter_id varchar(255) not null, @@ -13,4 +13,4 @@ create table if not exists threat_detection.malicious_events ( ); -- add index on actor and filter_id and sort data by timestamp -create index malicious_events_actor_filter_id_timestamp_idx on threat_detection.malicious_events(actor, filter_id, timestamp desc); +create index malicious_events_actor_filter_id_timestamp_idx on threat_detection.malicious_event(actor, filter_id, timestamp desc);