Skip to content

Commit

Permalink
fix: config on trainer
Browse files Browse the repository at this point in the history
Signed-off-by: nkoppisetty <nandita.iitkgp@gmail.com>
  • Loading branch information
nkoppisetty committed Jul 24, 2023
1 parent 602a852 commit 86dbfc2
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 9 deletions.
2 changes: 1 addition & 1 deletion udf/anomaly-detection/src/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class Header(str, Enum):
@dataclass
class _BasePayload:
uuid: str
config_id: str
composite_keys: List[str]


Expand All @@ -51,7 +52,6 @@ def to_json(self):

@dataclass(repr=False)
class StreamPayload(_BasePayload):
config_id: str
data: Matrix
raw_data: Matrix
metrics: List[str]
Expand Down
5 changes: 4 additions & 1 deletion udf/anomaly-detection/src/udf/threshold.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ def run(self, keys: List[str], datum: Datum) -> Messages:

if y_score is None or header == Header.MODEL_STALE or status == Status.ARTIFACT_NOT_FOUND:
train_payload = TrainerPayload(
uuid=payload.uuid, composite_keys=keys, metrics=payload.metrics
uuid=payload.uuid,
composite_keys=keys,
metrics=payload.metrics,
config_id=payload.config_id,
)
_LOGGER.info(
"%s - Sending Msg: { Keys: %s, Tags:%s, Payload: %s }",
Expand Down
8 changes: 4 additions & 4 deletions udf/anomaly-detection/src/udsink/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def fetch_prometheus_data(cls, payload: TrainerPayload) -> pd.DataFrame:

@classmethod
def fetch_druid_data(cls, payload: TrainerPayload) -> pd.DataFrame:
stream_config = ConfigManager.get_stream_config(payload.composite_keys[0])
stream_config = ConfigManager.get_stream_config(payload.config_id)
druid_conf = ConfigManager.get_druid_config()
fetcher_conf = stream_config.druid_fetcher
if druid_conf is None:
Expand All @@ -79,7 +79,7 @@ def fetch_druid_data(cls, payload: TrainerPayload) -> pd.DataFrame:

@classmethod
def fetch_data(cls, payload: TrainerPayload) -> pd.DataFrame:
stream_config = ConfigManager.get_stream_config(payload.composite_keys[0])
stream_config = ConfigManager.get_stream_config(payload.config_id)
if stream_config.source == DataSource.PROMETHEUS:
return cls.fetch_prometheus_data(payload)
elif stream_config.source == DataSource.DRUID:
Expand Down Expand Up @@ -247,8 +247,8 @@ def run(self, datums: Iterator[Datum]) -> Responses:
responses.append(Response.as_success(_datum.id))
continue

retrain_config = ConfigManager.get_retrain_config(payload.composite_keys[0])
numalogic_config = ConfigManager.get_numalogic_config(payload.composite_keys[0])
retrain_config = ConfigManager.get_retrain_config(payload.config_id)
numalogic_config = ConfigManager.get_numalogic_config(payload.config_id)

try:
df = self.fetch_data(payload)
Expand Down
5 changes: 3 additions & 2 deletions udf/anomaly-detection/src/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,9 @@ def __init__(self, directories=None, handler=FileSystemEventHandler()):

def run(self):
for directory in self.directories:
self.observer.schedule(self.handler, directory, recursive=True)
_LOGGER.info("\nWatcher Running in {}/\n".format(directory))
if os.path.exists(directory):
self.observer.schedule(self.handler, directory, recursive=True)
_LOGGER.info("\nWatcher Running in {}/\n".format(directory))

self.observer.start()
try:
Expand Down
4 changes: 3 additions & 1 deletion udf/anomaly-detection/tests/test_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def as_datum(data: str | bytes | dict, msg_id="1") -> Datum:
class TestTrainer(unittest.TestCase):
train_payload = {
"uuid": "1",
"config_id": "prometheus-config",
"composite_keys": [
"sandbox_numalogic_demo",
"metric_1",
Expand All @@ -44,7 +45,8 @@ class TestTrainer(unittest.TestCase):

train_payload2 = {
"uuid": "2",
"composite_keys": ["fciAsset", "5984175597303660107"],
"config_id": "druid-config",
"composite_keys": ["5984175597303660107"],
"metrics": ["failed", "degraded"],
}

Expand Down

0 comments on commit 86dbfc2

Please sign in to comment.