From 56ce22adca7d884a991ac0251e3c12df42a586a0 Mon Sep 17 00:00:00 2001
From: Ajinkya <109141486+ag060@users.noreply.github.com>
Date: Thu, 16 Jan 2025 12:05:21 +0530
Subject: [PATCH] using local redis instead of centralized redis (#1979)
Earlier we were using local caffine cache and syncing the changes to
centralized redis.
Now we are using local redis only
Assumption is now that we get all the data for particular user key on
one machine.
This will be handled by producer
---
apps/threat-detection/pom.xml | 5 -
.../java/com/akto/threat/detection/Main.java | 8 +-
.../cache/RedisBackedCounterCache.java | 126 ------------------
.../detection/cache/RedisCounterCache.java | 64 +++++++++
.../tasks/MaliciousTrafficDetectorTask.java | 6 +-
5 files changed, 72 insertions(+), 137 deletions(-)
delete mode 100644 apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisBackedCounterCache.java
create mode 100644 apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisCounterCache.java
diff --git a/apps/threat-detection/pom.xml b/apps/threat-detection/pom.xml
index bd143c2dc5..e5e5cd3e2c 100644
--- a/apps/threat-detection/pom.xml
+++ b/apps/threat-detection/pom.xml
@@ -95,11 +95,6 @@
6.4.0.RELEASE
-
- com.github.ben-manes.caffeine
- caffeine
- 2.9.3
-
com.fasterxml.jackson.core
jackson-databind
diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/Main.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/Main.java
index 0ea77e9470..888b2eba3b 100644
--- a/apps/threat-detection/src/main/java/com/akto/threat/detection/Main.java
+++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/Main.java
@@ -57,7 +57,9 @@ public static void main(String[] args) {
.setValueSerializer(Serializer.BYTE_ARRAY)
.build();
- new MaliciousTrafficDetectorTask(trafficKafka, internalKafka, createRedisClient()).run();
+ RedisClient localRedis = createLocalRedisClient();
+
+ new MaliciousTrafficDetectorTask(trafficKafka, internalKafka, localRedis).run();
new FlushSampleDataTask(
sessionFactory, internalKafka, KafkaTopic.ThreatDetection.MALICIOUS_EVENTS)
.run();
@@ -67,8 +69,8 @@ public static void main(String[] args) {
new CleanupTask(sessionFactory).run();
}
- public static RedisClient createRedisClient() {
- return RedisClient.create(System.getenv("AKTO_THREAT_DETECTION_REDIS_URI"));
+ public static RedisClient createLocalRedisClient() {
+ return RedisClient.create(System.getenv("AKTO_THREAT_DETECTION_LOCAL_REDIS_URI"));
}
public static void runMigrations() {
diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisBackedCounterCache.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisBackedCounterCache.java
deleted file mode 100644
index 2182be09c6..0000000000
--- a/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisBackedCounterCache.java
+++ /dev/null
@@ -1,126 +0,0 @@
-package com.akto.threat.detection.cache;
-
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import io.lettuce.core.ExpireArgs;
-import io.lettuce.core.RedisClient;
-import io.lettuce.core.api.StatefulRedisConnection;
-import java.util.HashSet;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.*;
-
-public class RedisBackedCounterCache implements CounterCache {
-
- static class Op {
- private final String key;
- private final long value;
-
- public Op(String key, long value) {
- this.key = key;
- this.value = value;
- }
-
- public String getKey() {
- return key;
- }
-
- public long getValue() {
- return value;
- }
- }
-
- private final StatefulRedisConnection redis;
-
- private final Cache localCache;
-
- private final ConcurrentLinkedQueue pendingIncOps;
- private final ConcurrentMap deletedKeys;
- private final String prefix;
-
- public RedisBackedCounterCache(RedisClient redisClient, String prefix) {
- this.prefix = prefix;
- this.redis = redisClient.connect(new LongValueCodec());
- this.localCache = Caffeine.newBuilder().maximumSize(100000).expireAfterWrite(3, TimeUnit.HOURS).build();
-
- ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
- executor.scheduleAtFixedRate(this::syncToRedis, 60, 5, TimeUnit.SECONDS);
-
- this.pendingIncOps = new ConcurrentLinkedQueue<>();
- this.deletedKeys = new ConcurrentHashMap<>();
- }
-
- private String addPrefixToKey(String key) {
- return new StringBuilder().append(prefix).append("|").append(key).toString();
- }
-
- @Override
- public void increment(String key) {
- incrementBy(key, 1);
- }
-
- @Override
- public void incrementBy(String key, long val) {
- String _key = addPrefixToKey(key);
- localCache.asMap().merge(_key, val, Long::sum);
- pendingIncOps.add(new Op(_key, val));
- }
-
- @Override
- public long get(String key) {
- return Optional.ofNullable(this.localCache.getIfPresent(addPrefixToKey(key))).orElse(0L);
- }
-
- @Override
- public boolean exists(String key) {
- return localCache.asMap().containsKey(addPrefixToKey(key));
- }
-
- @Override
- public void clear(String key) {
- String _key = addPrefixToKey(key);
- localCache.invalidate(_key);
- this.deletedKeys.put(_key, true);
- redis.async().del(_key);
- }
-
- private void setExpiryIfNotSet(String key, long seconds) {
- // We only set expiry for redis entry. For local cache we have lower expiry for
- // all entries.
- ExpireArgs args = ExpireArgs.Builder.nx();
- redis.async().expire(addPrefixToKey(key), seconds, args);
- }
-
- private void syncToRedis() {
- Set _keys = new HashSet<>();
- while (!pendingIncOps.isEmpty()) {
- Op op = pendingIncOps.poll();
- String key = op.getKey();
- long val = op.getValue();
-
- if (this.deletedKeys.containsKey(key)) {
- continue;
- }
-
- redis
- .async()
- .incrby(key, val)
- .whenComplete(
- (result, ex) -> {
- if (ex != null) {
- ex.printStackTrace();
- }
-
- _keys.add(key);
-
- if (result != null) {
- localCache.asMap().put(key, result);
- }
- });
- }
-
- _keys.forEach(key -> setExpiryIfNotSet(key, 3 * 60 * 60));
-
- this.deletedKeys.clear();
- }
-}
diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisCounterCache.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisCounterCache.java
new file mode 100644
index 0000000000..63d20312d8
--- /dev/null
+++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisCounterCache.java
@@ -0,0 +1,64 @@
+package com.akto.threat.detection.cache;
+
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.api.StatefulRedisConnection;
+
+public class RedisCounterCache implements CounterCache {
+
+ static class Op {
+ private final String key;
+ private final long value;
+
+ public Op(String key, long value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public long getValue() {
+ return value;
+ }
+ }
+
+ private final StatefulRedisConnection redis;
+
+ private final String prefix;
+
+ public RedisCounterCache(RedisClient redisClient, String prefix) {
+ this.prefix = prefix;
+ this.redis = redisClient.connect(new LongValueCodec());
+ }
+
+ private String addPrefixToKey(String key) {
+ return new StringBuilder().append(prefix).append("|").append(key).toString();
+ }
+
+ @Override
+ public void increment(String key) {
+ incrementBy(key, 1);
+ }
+
+ @Override
+ public void incrementBy(String key, long val) {
+ redis.async().incrby(addPrefixToKey(key), val);
+ }
+
+ @Override
+ public long get(String key) {
+ return redis.sync().get(addPrefixToKey(key));
+ }
+
+ @Override
+ public boolean exists(String key) {
+ return redis.sync().exists(addPrefixToKey(key)) > 0;
+ }
+
+ @Override
+ public void clear(String key) {
+ redis.async().del(addPrefixToKey(key));
+ }
+
+}
diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java
index 37672bd831..8905338ab1 100644
--- a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java
+++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java
@@ -26,7 +26,7 @@
import com.akto.test_editor.execution.VariableResolver;
import com.akto.test_editor.filter.data_operands_impl.ValidationResult;
import com.akto.threat.detection.actor.SourceIPActorGenerator;
-import com.akto.threat.detection.cache.RedisBackedCounterCache;
+import com.akto.threat.detection.cache.RedisCounterCache;
import com.akto.threat.detection.constants.KafkaTopic;
import com.akto.threat.detection.kafka.KafkaProtoProducer;
import com.akto.threat.detection.smart_event_detector.window_based.WindowBasedThresholdNotifier;
@@ -82,14 +82,14 @@ public MaliciousTrafficDetectorTask(
this.windowBasedThresholdNotifier =
new WindowBasedThresholdNotifier(
- new RedisBackedCounterCache(redisClient, "wbt"),
+ new RedisCounterCache(redisClient, "wbt"),
new WindowBasedThresholdNotifier.Config(100, 10 * 60));
this.internalKafka = new KafkaProtoProducer(internalConfig);
}
public void run() {
- this.kafkaConsumer.subscribe(Collections.singletonList("akto.api.logs"));
+ this.kafkaConsumer.subscribe(Collections.singletonList("akto.api.logs2"));
ExecutorService pollingExecutor = Executors.newSingleThreadExecutor();
pollingExecutor.execute(
() -> {