From 86dbfc25c314b4512b963c28259c3c1f0e35eb8b Mon Sep 17 00:00:00 2001 From: nkoppisetty Date: Mon, 24 Jul 2023 11:19:37 -0700 Subject: [PATCH] fix: config on trainer Signed-off-by: nkoppisetty --- udf/anomaly-detection/src/entities.py | 2 +- udf/anomaly-detection/src/udf/threshold.py | 5 ++++- udf/anomaly-detection/src/udsink/train.py | 8 ++++---- udf/anomaly-detection/src/watcher.py | 5 +++-- udf/anomaly-detection/tests/test_trainer.py | 4 +++- 5 files changed, 15 insertions(+), 9 deletions(-) diff --git a/udf/anomaly-detection/src/entities.py b/udf/anomaly-detection/src/entities.py index 865a5dcb..309baf42 100644 --- a/udf/anomaly-detection/src/entities.py +++ b/udf/anomaly-detection/src/entities.py @@ -34,6 +34,7 @@ class Header(str, Enum): @dataclass class _BasePayload: uuid: str + config_id: str composite_keys: List[str] @@ -51,7 +52,6 @@ def to_json(self): @dataclass(repr=False) class StreamPayload(_BasePayload): - config_id: str data: Matrix raw_data: Matrix metrics: List[str] diff --git a/udf/anomaly-detection/src/udf/threshold.py b/udf/anomaly-detection/src/udf/threshold.py index 7417c908..733ff1e9 100644 --- a/udf/anomaly-detection/src/udf/threshold.py +++ b/udf/anomaly-detection/src/udf/threshold.py @@ -109,7 +109,10 @@ def run(self, keys: List[str], datum: Datum) -> Messages: if y_score is None or header == Header.MODEL_STALE or status == Status.ARTIFACT_NOT_FOUND: train_payload = TrainerPayload( - uuid=payload.uuid, composite_keys=keys, metrics=payload.metrics + uuid=payload.uuid, + composite_keys=keys, + metrics=payload.metrics, + config_id=payload.config_id, ) _LOGGER.info( "%s - Sending Msg: { Keys: %s, Tags:%s, Payload: %s }", diff --git a/udf/anomaly-detection/src/udsink/train.py b/udf/anomaly-detection/src/udsink/train.py index e93a9ff4..bebd4199 100644 --- a/udf/anomaly-detection/src/udsink/train.py +++ b/udf/anomaly-detection/src/udsink/train.py @@ -57,7 +57,7 @@ def fetch_prometheus_data(cls, payload: TrainerPayload) -> pd.DataFrame: @classmethod def fetch_druid_data(cls, payload: TrainerPayload) -> pd.DataFrame: - stream_config = ConfigManager.get_stream_config(payload.composite_keys[0]) + stream_config = ConfigManager.get_stream_config(payload.config_id) druid_conf = ConfigManager.get_druid_config() fetcher_conf = stream_config.druid_fetcher if druid_conf is None: @@ -79,7 +79,7 @@ def fetch_druid_data(cls, payload: TrainerPayload) -> pd.DataFrame: @classmethod def fetch_data(cls, payload: TrainerPayload) -> pd.DataFrame: - stream_config = ConfigManager.get_stream_config(payload.composite_keys[0]) + stream_config = ConfigManager.get_stream_config(payload.config_id) if stream_config.source == DataSource.PROMETHEUS: return cls.fetch_prometheus_data(payload) elif stream_config.source == DataSource.DRUID: @@ -247,8 +247,8 @@ def run(self, datums: Iterator[Datum]) -> Responses: responses.append(Response.as_success(_datum.id)) continue - retrain_config = ConfigManager.get_retrain_config(payload.composite_keys[0]) - numalogic_config = ConfigManager.get_numalogic_config(payload.composite_keys[0]) + retrain_config = ConfigManager.get_retrain_config(payload.config_id) + numalogic_config = ConfigManager.get_numalogic_config(payload.config_id) try: df = self.fetch_data(payload) diff --git a/udf/anomaly-detection/src/watcher.py b/udf/anomaly-detection/src/watcher.py index d56ef8dd..ccfd407d 100644 --- a/udf/anomaly-detection/src/watcher.py +++ b/udf/anomaly-detection/src/watcher.py @@ -156,8 +156,9 @@ def __init__(self, directories=None, handler=FileSystemEventHandler()): def run(self): for directory in self.directories: - self.observer.schedule(self.handler, directory, recursive=True) - _LOGGER.info("\nWatcher Running in {}/\n".format(directory)) + if os.path.exists(directory): + self.observer.schedule(self.handler, directory, recursive=True) + _LOGGER.info("\nWatcher Running in {}/\n".format(directory)) self.observer.start() try: diff --git a/udf/anomaly-detection/tests/test_trainer.py b/udf/anomaly-detection/tests/test_trainer.py index 7dd87eb2..76d8254b 100644 --- a/udf/anomaly-detection/tests/test_trainer.py +++ b/udf/anomaly-detection/tests/test_trainer.py @@ -34,6 +34,7 @@ def as_datum(data: str | bytes | dict, msg_id="1") -> Datum: class TestTrainer(unittest.TestCase): train_payload = { "uuid": "1", + "config_id": "prometheus-config", "composite_keys": [ "sandbox_numalogic_demo", "metric_1", @@ -44,7 +45,8 @@ class TestTrainer(unittest.TestCase): train_payload2 = { "uuid": "2", - "composite_keys": ["fciAsset", "5984175597303660107"], + "config_id": "druid-config", + "composite_keys": ["5984175597303660107"], "metrics": ["failed", "degraded"], }