feat: adding required publisher fields
Signed-off-by: nkoppisetty <nandita.iitkgp@gmail.com>
nkoppisetty committed Jul 21, 2023
1 parent 6fbff1c commit fe5f48b
Showing 21 changed files with 241 additions and 288 deletions.
188 changes: 94 additions & 94 deletions udf/anomaly-detection/poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion udf/anomaly-detection/src/_config.py
@@ -36,7 +36,7 @@ class DataSource(str, Enum):
 
 @dataclass
 class StreamConf:
-    name: str = "default"
+    config_id: str = "default"
     source: str = DataSource.PROMETHEUS.value
     window_size: int = 12
    composite_keys: List[str] = field(default_factory=list)

Check failure on line 42 in udf/anomaly-detection/src/_config.py
GitHub Actions / ruff: udf/anomaly-detection/src/_config.py:42:21: UP006 Use `list` instead of `List` for type annotation
2 changes: 1 addition & 1 deletion udf/anomaly-detection/src/connectors/sentinel.py
@@ -73,7 +73,7 @@ def get_redis_client(
     else:
         SENTINEL_CLIENT = sentinel.slave_for(mastername)
     _LOGGER.info(
-        "Sentinel redis params: %S, master_node: %s",conn_kwargs,master_node)
+        "Sentinel redis params: %s, master_node: %s", conn_kwargs, master_node)
     return SENTINEL_CLIENT


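A note on the logging fix above: `%S` is not a valid %-style conversion specifier, so the old call failed at interpolation time and logging printed a formatting-error report instead of the message. A minimal, self-contained sketch of the corrected call (connection values invented for illustration):

```python
import logging

logging.basicConfig(level=logging.INFO)
_LOGGER = logging.getLogger(__name__)

# Placeholder values standing in for the real sentinel connection settings.
conn_kwargs = {"socket_timeout": 0.3, "decode_responses": True}
master_node = False

# Lazy %-style formatting: args are interpolated only if the record is emitted.
# With "%S" the interpolation itself raised, and logging emitted a
# "--- Logging error ---" report on stderr instead of the intended message.
_LOGGER.info("Sentinel redis params: %s, master_node: %s", conn_kwargs, master_node)
```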
5 changes: 5 additions & 0 deletions udf/anomaly-detection/src/entities.py
@@ -51,6 +51,7 @@ def to_json(self):
 
 @dataclass(repr=False)
 class StreamPayload(_BasePayload):
+    config_id: str
     data: Matrix
     raw_data: Matrix
     metrics: List[str]
@@ -102,6 +103,8 @@ def to_json(self):
 
 @dataclass
 class InputPayload:
+    uuid: str
+    config_id: str
     start_time: int
     end_time: int
     data: list[dict[str, Any]]
@@ -113,6 +116,8 @@ def to_json(self):
 
 @dataclass
 class OutputPayload:
+    uuid: str
+    config_id: str
     timestamp: int
     unified_anomaly: float
     data: dict[str, Any]
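With `uuid` and `config_id` now required on every payload, the publisher must generate the uuid and name the target config up front. A sketch of what a conforming input message might look like; the `start_time`, `end_time`, and `data` field names come from the `InputPayload` dataclass above, while all values and the inner data keys are invented for illustration:

```python
import json
import uuid

# Hypothetical publisher-side message. "uuid" and "config_id" are the two
# fields this commit makes required; everything else is illustrative.
msg = {
    "uuid": uuid.uuid4().hex,    # generated by the publisher, not by preprocess
    "config_id": "app1-config",  # selects which stream config applies
    "start_time": 1689900000000,
    "end_time": 1689900300000,
    "data": [{"timestamp": 1689900000000, "error_rate": 0.2}],
}
payload_bytes = json.dumps(msg).encode("utf-8")
```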
6 changes: 3 additions & 3 deletions udf/anomaly-detection/src/udf/inference.py
@@ -37,7 +37,7 @@ def _run_inference(
     artifact_data: ArtifactData,
 ) -> np.ndarray:
     model = artifact_data.artifact
-    win_size = ConfigManager.get_stream_config(config_name=keys[0]).window_size
+    win_size = ConfigManager.get_stream_config(config_id=payload.config_id).window_size
     data_arr = payload.get_data()
     stream_loader = DataLoader(StreamingDataset(data_arr, win_size))
@@ -71,8 +71,8 @@ def inference(
         return static_response
 
     # Load config
-    retrain_config = ConfigManager.get_retrain_config(config_name=keys[0])
-    numalogic_conf = ConfigManager.get_numalogic_config(config_name=keys[0])
+    retrain_config = ConfigManager.get_retrain_config(config_id=payload.config_id)
+    numalogic_conf = ConfigManager.get_numalogic_config(config_id=payload.config_id)
 
     # Load inference artifact
     try:
19 changes: 13 additions & 6 deletions udf/anomaly-detection/src/udf/postprocess.py
@@ -16,8 +16,8 @@
 class Postprocess:
     @classmethod
     def postprocess(cls, keys: list[str], payload: StreamPayload) -> (float, dict):
-        static_thresh = ConfigManager.get_static_threshold_config(config_name=keys[0])
-        postprocess_conf = ConfigManager.get_postprocess_config(config_name=keys[0])
+        static_thresh = ConfigManager.get_static_threshold_config(config_id=payload.config_id)
+        postprocess_conf = ConfigManager.get_postprocess_config(config_id=payload.config_id)
 
         # Compute static threshold score if header is static inference
         metric_arr = payload.get_data()
@@ -49,13 +49,13 @@ def postprocess(cls, keys: list[str], payload: StreamPayload) -> (float, dict):
         for i in range(len(payload.metrics)):
             metric_scores[payload.metrics[i]] = final_score[i]
 
-        return cls.get_unified_anomaly(keys, final_score.tolist(), payload), metric_scores
+        return cls.get_unified_anomaly(final_score.tolist(), payload), metric_scores
 
     @classmethod
     def get_unified_anomaly(
-        cls, keys: List[str], scores: list[float], payload: StreamPayload
+        cls, scores: list[float], payload: StreamPayload
     ) -> float:
-        unified_config = ConfigManager.get_stream_config(config_name=keys[0]).unified_config
+        unified_config = ConfigManager.get_stream_config(config_id=payload.config_id).unified_config
         unified_weights = unified_config.weights
         if unified_weights:
             weighted_anomalies = np.multiply(scores, unified_weights)
@@ -91,11 +91,18 @@ def run(self, keys: List[str], datum: Datum) -> Messages:
         # Perform postprocess
         final_score, metric_scores = self.postprocess(keys, payload)
 
+        stream_conf = ConfigManager.get_stream_config(config_id=payload.config_id)
+        metadata = payload.metadata
+        metadata["composite_keys"] = list(stream_conf.composite_keys)
+
         # Construct output payload
         out_payload = OutputPayload(
+            uuid=payload.uuid,
+            config_id=payload.config_id,
             timestamp=payload.timestamps[-1],
             unified_anomaly=final_score,
             data=metric_scores,
-            metadata=payload.metadata,
+            metadata=metadata,
         )
 
         _LOGGER.info("%s - Sending Msg: { Keys: %s, Payload: %s }", payload.uuid, keys, out_payload)
13 changes: 6 additions & 7 deletions udf/anomaly-detection/src/udf/preprocess.py
@@ -1,7 +1,6 @@
 import json
 import os
 import time
-import uuid
 
 import numpy as np
 import pandas as pd
@@ -48,7 +47,7 @@ def get_df(cls, data_payload: dict, features: List[str]) -> (pd.DataFrame, List[
     def preprocess(
         self, keys: List[str], payload: StreamPayload
     ) -> (np.ndarray, Status):
-        preprocess_cfgs = ConfigManager.get_preprocess_config(config_name=keys[0])
+        preprocess_cfgs = ConfigManager.get_preprocess_config(config_id=keys[0])
 
         local_cache = LocalLRUCache(ttl=LOCAL_CACHE_TTL)
         model_registry = RedisRegistry(client=get_redis_client_from_conf(master_node=False), cache_registry=local_cache)
@@ -95,25 +94,25 @@ def run(self, keys: List[str], datum: Datum) -> Messages:
         _start_time = time.perf_counter()
         _ = datum.event_time
         _ = datum.watermark
-        _uuid = uuid.uuid4().hex
-        _LOGGER.info("%s - Received Msg: { Keys: %s, Value: %s }", _uuid, keys, datum.value)
+        _LOGGER.info("Received Msg: { Keys: %s, Value: %s }", keys, datum.value)
 
         messages = Messages()
         try:
             data_payload = json.loads(datum.value)
-            _LOGGER.info("%s - Data payload: %s", _uuid, data_payload)
+            _LOGGER.info("%s - Data payload: %s", data_payload["uuid"], data_payload)
         except Exception as e:
             _LOGGER.error("%s - Error while reading input json %r", e)
             messages.append(Message.to_drop())
             return messages
 
         # Load config
-        stream_conf = ConfigManager.get_stream_config(config_name=keys[0])
+        stream_conf = ConfigManager.get_stream_config(config_id=data_payload["config_id"])
         raw_df, timestamps = self.get_df(data_payload, stream_conf.metrics)
 
         # Prepare payload for forwarding
         payload = StreamPayload(
-            uuid=_uuid,
+            uuid=data_payload["uuid"],
+            config_id=data_payload["config_id"],
             composite_keys=keys,
             data=np.asarray(raw_df.values.tolist()),
             raw_data=np.asarray(raw_df.values.tolist()),
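A consequence of the preprocess changes: the vertex no longer mints its own uuid, so a message arriving without `uuid` or `config_id` fails at parse time. A simplified stand-in (not the project code) showing the new input contract:

```python
import json

def parse_required_fields(raw: bytes) -> tuple[str, str]:
    """Return (uuid, config_id) from an incoming message.

    A KeyError here means the publisher did not send the fields that
    this commit makes required. Simplified illustration only.
    """
    data_payload = json.loads(raw)
    return data_payload["uuid"], data_payload["config_id"]

# Example usage with a well-formed message:
u, cid = parse_required_fields(b'{"uuid": "abc123", "config_id": "app1-config"}')
assert (u, cid) == ("abc123", "app1-config")
```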
4 changes: 2 additions & 2 deletions udf/anomaly-detection/src/udf/threshold.py
@@ -32,8 +32,8 @@ def threshold(
         metric_arr = payload.get_data()
 
         # Load config
-        static_thresh = ConfigManager.get_static_threshold_config(config_name=keys[0])
-        thresh_cfg = ConfigManager.get_threshold_config(config_name=keys[0])
+        static_thresh = ConfigManager.get_static_threshold_config(config_id=payload.config_id)
+        thresh_cfg = ConfigManager.get_threshold_config(config_id=payload.config_id)
 
         # Check if metric needs static inference
         if payload.header == Header.STATIC_INFERENCE:
4 changes: 1 addition & 3 deletions udf/anomaly-detection/src/udsink/train.py
@@ -1,7 +1,5 @@
-import os
-import time
 
 import numpy as np
 import orjson
 import pandas as pd
 from typing import List, Iterator
@@ -70,7 +68,7 @@ def fetch_druid_data(cls, payload: TrainerPayload) -> pd.DataFrame:
         return data_fetcher.fetch_data(
             datasource=fetcher_conf.datasource,
             filter_keys=stream_config.composite_keys,
-            filter_values=payload.composite_keys[1:],  # skip config name and add metric name
+            filter_values=payload.composite_keys,
             dimensions=OmegaConf.to_container(fetcher_conf.dimensions),
             granularity=fetcher_conf.granularity,
             aggregations=OmegaConf.to_container(fetcher_conf.aggregations),
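The dropped `[1:]` reflects the new key layout: since the config name is no longer prepended to the composite keys, `filter_keys` from the stream config and `filter_values` from the payload now align one-to-one. A small sketch of the pairing the Druid fetcher can rely on (values invented):

```python
# filter_keys comes from the stream config, filter_values from the payload;
# after this commit they align positionally with no offset.
filter_keys = ["namespace", "name", "rollout_pod_template_hash"]
filter_values = ["my-namespace", "my-app", "5f6d8c9b4"]

filters = dict(zip(filter_keys, filter_values))
assert filters == {
    "namespace": "my-namespace",
    "name": "my-app",
    "rollout_pod_template_hash": "5f6d8c9b4",
}
```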
46 changes: 23 additions & 23 deletions udf/anomaly-detection/src/watcher.py
@@ -47,29 +47,29 @@ def update_configs(cls):
 
         cls.config["user_configs"] = dict()
         for _config in user_configs:
-            cls.config["user_configs"][_config.name] = _config
+            cls.config["user_configs"][_config.config_id] = _config
 
-        cls.config["default_configs"] = dict(map(lambda c: (c.name, c), default_configs))
+        cls.config["default_configs"] = dict(map(lambda c: (c.config_id, c), default_configs))
         cls.config["default_numalogic"] = default_numalogic
         cls.config["pipeline_config"] = pipeline_config
 
         _LOGGER.info("Successfully updated configs - %s", cls.config)
         return cls.config

     @classmethod
-    def get_stream_config(cls, config_name: str) -> StreamConf:
+    def get_stream_config(cls, config_id: str) -> StreamConf:
         if not cls.config:
             cls.update_configs()
 
         stream_conf = None
 
         # search and load from user configs
-        if config_name in cls.config["user_configs"]:
-            stream_conf = cls.config["user_configs"][config_name]
+        if config_id in cls.config["user_configs"]:
+            stream_conf = cls.config["user_configs"][config_id]
 
         # if not search and load from default configs
-        if not stream_conf and config_name in cls.config["default_configs"]:
-            stream_conf = cls.config["default_configs"][config_name]
+        if not stream_conf and config_id in cls.config["default_configs"]:
+            stream_conf = cls.config["default_configs"][config_id]
 
         # if not in default configs, initialize conf with default values
         if not stream_conf:
@@ -82,8 +82,8 @@ def get_stream_config(cls, config_name: str) -> StreamConf:
         return stream_conf
 
     @classmethod
-    def get_unified_config(cls, config_name: str) -> UnifiedConf:
-        stream_conf = cls.get_stream_config(config_name)
+    def get_unified_config(cls, config_id: str) -> UnifiedConf:
+        stream_conf = cls.get_stream_config(config_id)
         return stream_conf.unified_config

@classmethod
Expand All @@ -105,32 +105,32 @@ def get_druid_config(cls) -> Optional[DruidConf]:
return None

@classmethod
def get_numalogic_config(cls, config_name: str):
return cls.get_stream_config(config_name=config_name).numalogic_conf
def get_numalogic_config(cls, config_id: str):
return cls.get_stream_config(config_id=config_id).numalogic_conf

@classmethod
def get_preprocess_config(cls, config_name: str):
return cls.get_numalogic_config(config_name=config_name).preprocess
def get_preprocess_config(cls, config_id: str):
return cls.get_numalogic_config(config_id=config_id).preprocess

@classmethod
def get_retrain_config(cls, config_name: str, ):
return cls.get_stream_config(config_name=config_name).retrain_conf
def get_retrain_config(cls, config_id: str, ):
return cls.get_stream_config(config_id=config_id).retrain_conf

@classmethod
def get_static_threshold_config(cls, config_name: str):
return cls.get_stream_config(config_name=config_name).static_threshold
def get_static_threshold_config(cls, config_id: str):
return cls.get_stream_config(config_id=config_id).static_threshold

@classmethod
def get_threshold_config(cls, config_name: str):
return cls.get_stream_config(config_name=config_name).numalogic_conf.threshold
def get_threshold_config(cls, config_id: str):
return cls.get_stream_config(config_id=config_id).numalogic_conf.threshold

@classmethod
def get_postprocess_config(cls, config_name: str):
return cls.get_numalogic_config(config_name=config_name).postprocess
def get_postprocess_config(cls, config_id: str):
return cls.get_numalogic_config(config_id=config_id).postprocess

@classmethod
def get_trainer_config(cls, config_name: str):
return cls.get_numalogic_config(config_name=config_name).trainer
def get_trainer_config(cls, config_id: str):
return cls.get_numalogic_config(config_id=config_id).trainer


class ConfigHandler(FileSystemEventHandler):
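The resolution order in `get_stream_config` is unchanged; only the lookup key is renamed: user configs win, then shipped defaults, then a freshly constructed `StreamConf`. A self-contained sketch of that fallback logic, using stand-in types that mirror the names in the diff:

```python
from dataclasses import dataclass

@dataclass
class StreamConf:
    config_id: str = "default"

def resolve_stream_conf(config_id: str, user_configs: dict, default_configs: dict) -> StreamConf:
    # 1. user-supplied configs win
    if config_id in user_configs:
        return user_configs[config_id]
    # 2. then the shipped defaults
    if config_id in default_configs:
        return default_configs[config_id]
    # 3. otherwise fall back to a StreamConf with default values
    return StreamConf()

conf = resolve_stream_conf(
    "app1-config",
    user_configs={"app1-config": StreamConf(config_id="app1-config")},
    default_configs={"druid-config": StreamConf(config_id="druid-config")},
)
assert conf.config_id == "app1-config"
```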
29 changes: 9 additions & 20 deletions udf/anomaly-detection/tests/resources/configs/default_config.yaml
@@ -1,33 +1,22 @@
 configs:
-  - name: "fciAsset"
+  - config_id: "druid-config"
     source: "druid"
     composite_keys: [ "assetId" ]
     metrics: [ "failed" , "degraded" ]
+    static_threshold:
+      weight: [ 0.7,0.3 ]
+      upper_limit: [ 3, 3 ]
     druid_fetcher:
       dimensions: [ "ciStatus" ]
-      datasource: "tech-ip-customer-interaction-metrics"
+      datasource: "druid-metrics-table-name"
       group_by: [ "timestamp", "ciStatus" ]
       pivot:
         columns:
           - "ciStatus"
-    static_threshold:
-      weight: [ 0.7,0.3 ]
-      upper_limit: [ 3, 3 ]
-  - name: "service-mesh"
-    composite_keys: [ "destination_asset_id" ]
+  - config_id: "prometheus-config"
+    source: "prometheus"
+    composite_keys: [ "namespace", "name", "rollout_pod_template_hash" ]
     metrics: [ "error_rate" , "error_count" ]
     static_threshold:
       weight: [ 0.7,0.3 ]
-      upper_limit: [ 3, 3 ]
-  - name: "service-mesh-s2s"
-    composite_keys: ["source_asset_id, destination_asset_id"]
-    metrics: [ "error_rate", "error_count" ]
-    static_threshold:
-      weight: [ 0.7,0.3 ]
-      upper_limit: [ 3, 3 ]
-  - name: "argo-rollouts"
-    composite_keys: [ "namespace", "name", "app", "rollouts_pod_template_hash" ]
-    metrics: [ "namespace_app_rollouts_http_request_error_rate"]
-    static_threshold:
-      weight: [0.7]
-      upper_limit: [3]
+      upper_limit: [ 3, 3 ]
17 changes: 10 additions & 7 deletions udf/anomaly-detection/tests/resources/configs/users_config.yaml
@@ -1,19 +1,22 @@
 configs:
-  - name: "sandbox_numalogic_demo1"
-    composite_keys: [ "namespace", "name", "hash_id" ]
-    metrics: ["rollout_error_rate"]
+  - config_id: "app1-config"
+    source: "prometheus"
+    composite_keys: [ "namespace", "name", "rollout_pod_template_hash" ]
+    metrics: ["error_rate"]
     static_threshold:
       weight: [0.6]
       upper_limit: [3]
-  - name: "sandbox_numalogic_demo2"
-    composite_keys: [ "namespace", "name", "hash_id" ]
-    metrics: ["rollout_error_rate"]
+  - config_id: "app2-config"
+    source: "prometheus"
+    composite_keys: [ "namespace", "name", "rollout_pod_template_hash" ]
+    metrics: ["error_rate", "error_count"]
     static_threshold:
       weight: [0.7, 0.3]
       upper_limit: [3, 3]
     unified_config:
       weights: [0.7, 0.3]
-  - name: "sandbox_numalogic_demo3"
+  - config_id: "app2-config"
+    source: "prometheus"
     metrics: ["namespace_app_rollouts_http_request_error_rate", "namespace_app_rollouts_http_request_latency"]
     composite_keys: [ "namespace", "name", "app", "rollouts_pod_template_hash" ]
     static_threshold:
