Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add test cases for payload Transformer #397

Merged
merged 3 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion numalogic/udfs/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
BASE_CONF_FILE_PATH: Final[str] = os.getenv("BASE_CONF_PATH", default=DEFAULT_BASE_CONF_PATH)
APP_CONF_FILE_PATH: Final[str] = os.getenv("APP_CONF_PATH", default=DEFAULT_APP_CONF_PATH)
METRICS_PORT: Final[int] = int(os.getenv("METRICS_PORT", default=DEFAULT_METRICS_PORT))
METRICS_ENABLED: Final[bool] = bool(int(os.getenv("METRICS_ENABLED", default="1")))
METRICS_ENABLED: Final[bool] = bool(int(os.getenv("METRICS_ENABLED", default="0")))
METRICS_CONF_PATH: Final[str] = os.getenv("METRICS_CONF_PATH", default=DEFAULT_METRICS_CONF_PATH)


Expand Down
2 changes: 1 addition & 1 deletion numalogic/udfs/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
LOCAL_CACHE_TTL = int(os.getenv("LOCAL_CACHE_TTL", "3600"))
LOCAL_CACHE_SIZE = int(os.getenv("LOCAL_CACHE_SIZE", "10000"))
LOAD_LATEST = os.getenv("LOAD_LATEST", "false").lower() == "true"
METRICS_ENABLED = bool(int(os.getenv("METRICS_ENABLED", default="1")))
METRICS_ENABLED = bool(int(os.getenv("METRICS_ENABLED", default="0")))
nkoppisetty marked this conversation as resolved.
Show resolved Hide resolved


class InferenceUDF(NumalogicUDF):
Expand Down
8 changes: 4 additions & 4 deletions numalogic/udfs/payloadtx.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from numalogic.udfs._logger import configure_logger, log_data_payload_values
from numalogic.udfs._metrics_utility import _increment_counter

METRICS_ENABLED = bool(int(os.getenv("METRICS_ENABLED", default="1")))
METRICS_ENABLED = bool(int(os.getenv("METRICS_ENABLED", default="0")))

_struct_log = configure_logger()

Expand Down Expand Up @@ -58,19 +58,19 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
logger.exception("Error while decoding input json")
return Messages(Message.to_drop())

_stream_conf = self.get_stream_conf(data_payload["config_id"])

_metric_label_values = {
"vertex": self._vtx,
"composite_key": ":".join(keys),
"config_id": data_payload["config_id"],
"pipeline_id": data_payload["pipeline_id"],
}

_increment_counter(
counter="MSG_IN_COUNTER",
labels=_metric_label_values,
is_enabled=METRICS_ENABLED,
)
_stream_conf = self.get_stream_conf(data_payload["config_id"])

# create a new message for each ML pipeline
messages = Messages()
for pipeline in _stream_conf.ml_pipelines:
Expand Down
2 changes: 1 addition & 1 deletion numalogic/udfs/postprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
LOCAL_CACHE_TTL = int(os.getenv("LOCAL_CACHE_TTL", "3600"))
LOCAL_CACHE_SIZE = int(os.getenv("LOCAL_CACHE_SIZE", "10000"))
LOAD_LATEST = os.getenv("LOAD_LATEST", "false").lower() == "true"
METRICS_ENABLED = bool(int(os.getenv("METRICS_ENABLED", default="1")))
METRICS_ENABLED = bool(int(os.getenv("METRICS_ENABLED", default="0")))
SCORE_PREFIX = os.getenv("SCORE_PREFIX", "unified")

_struct_log = configure_logger()
Expand Down
2 changes: 1 addition & 1 deletion numalogic/udfs/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
LOCAL_CACHE_TTL = int(os.getenv("LOCAL_CACHE_TTL", "3600"))
LOCAL_CACHE_SIZE = int(os.getenv("LOCAL_CACHE_SIZE", "10000"))
LOAD_LATEST = os.getenv("LOAD_LATEST", "false").lower() == "true"
METRICS_ENABLED = bool(int(os.getenv("METRICS_ENABLED", default="1")))
METRICS_ENABLED = bool(int(os.getenv("METRICS_ENABLED", default="0")))
_struct_log = configure_logger()


Expand Down
2 changes: 1 addition & 1 deletion numalogic/udfs/staticthresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from numalogic.udfs.entities import StreamPayload, OutputPayload
from numalogic.udfs.tools import _update_gauge_metric

METRICS_ENABLED = bool(int(os.getenv("METRICS_ENABLED", default="1")))
METRICS_ENABLED = bool(int(os.getenv("METRICS_ENABLED", default="0")))
SCORE_PREFIX = os.getenv("SCORE_PREFIX", "unified")
_struct_log = configure_logger()

Expand Down
2 changes: 1 addition & 1 deletion numalogic/udfs/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from numalogic.udfs._metrics_utility import _set_gauge, _increment_counter, _add_info
from numalogic.udfs.entities import StreamPayload, TrainerPayload

METRICS_ENABLED = bool(int(os.getenv("METRICS_ENABLED", default="1")))
METRICS_ENABLED = bool(int(os.getenv("METRICS_ENABLED", default="0")))

_struct_log = configure_logger()

Expand Down
2 changes: 1 addition & 1 deletion numalogic/udfs/trainer/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from numalogic.udfs.tools import TrainMsgDeduplicator
import torch

METRICS_ENABLED = bool(int(os.getenv("METRICS_ENABLED", default="1")))
METRICS_ENABLED = bool(int(os.getenv("METRICS_ENABLED", default="0")))
_struct_log = configure_logger()


Expand Down
2 changes: 1 addition & 1 deletion numalogic/udfs/trainer/_druid.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from numalogic.udfs.trainer._base import TrainerUDF
from numalogic.udfs._metrics_utility import _increment_counter, _add_summary

METRICS_ENABLED = bool(int(os.getenv("METRICS_ENABLED", default="1")))
METRICS_ENABLED = bool(int(os.getenv("METRICS_ENABLED", default="0")))
_struct_log = configure_logger()


Expand Down
2 changes: 1 addition & 1 deletion numalogic/udfs/trainer/_prom.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from numalogic.udfs._metrics_utility import _increment_counter, _add_summary
from numalogic.udfs.trainer._base import TrainerUDF

METRICS_ENABLED = bool(int(os.getenv("METRICS_ENABLED", default="1")))
METRICS_ENABLED = bool(int(os.getenv("METRICS_ENABLED", default="0")))
_struct_log = configure_logger()


Expand Down
2 changes: 1 addition & 1 deletion numalogic/udfs/trainer/_rds.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from datetime import datetime, timedelta
import pytz

METRICS_ENABLED = bool(int(os.getenv("METRICS_ENABLED", default="1")))
METRICS_ENABLED = bool(int(os.getenv("METRICS_ENABLED", default="0")))
_LOGGER = logging.getLogger(__name__)


Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "numalogic"
version = "0.12.0"
version = "0.12.1"
description = "Collection of operational Machine Learning models and tools."
authors = ["Numalogic Developers"]
packages = [{ include = "numalogic" }]
Expand Down
97 changes: 97 additions & 0 deletions tests/udfs/test_payloadtx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import logging
import os
from datetime import datetime

import pytest
from fakeredis import FakeServer, FakeStrictRedis
from omegaconf import OmegaConf
from orjson import orjson

from pynumaflow.mapper import Datum

from numalogic._constants import TESTS_DIR
from numalogic.udfs import MetricsLoader, PayloadTransformer
from numalogic.udfs._config import PipelineConf
from tests.udfs.utility import input_json_from_file

logging.basicConfig(level=logging.DEBUG)
REDIS_CLIENT = FakeStrictRedis(server=FakeServer())
KEYS = ["service-mesh", "1", "2"]
DATUM = input_json_from_file(os.path.join(TESTS_DIR, "udfs", "resources", "data", "stream.json"))

DATUM_KW = {
"event_time": datetime.now(),
"watermark": datetime.now(),
}
MetricsLoader().load_metrics(
config_file_path=f"{TESTS_DIR}/udfs/resources/numalogic_udf_metrics.yaml"
)

DATA = {
"uuid": "dd7dfb43-532b-49a3-906e-f78f82ad9c4b",
"config_id": "druid-config",
"composite_keys": ["service-mesh", "1", "2"],
"data": [],
"raw_data": [
[17.0, 4.0],
[22.0, 13.0],
[17.0, 7.0],
[23.0, 18.0],
[15.0, 15.0],
[16.0, 9.0],
[10.0, 10.0],
[3.0, 12.0],
[6.0, 21.0],
[5.0, 7.0],
[10.0, 8.0],
[0.0, 0.0],
],
"metrics": ["failed", "degraded"],
"timestamps": [
1691623200000,
1691623260000,
1691623320000,
1691623380000,
1691623440000,
1691623500000,
1691623560000,
1691623620000,
1691623680000,
1691623740000,
1691623800000,
1691623860000,
],
"metadata": {
"tags": {
"asset_alias": "some-alias",
"asset_id": "362557362191815079",
"env": "prd",
},
},
}


@pytest.fixture()
def udf_args():
return KEYS, Datum(
keys=KEYS,
value=orjson.dumps(DATA),
**DATUM_KW,
)


@pytest.fixture
def udf():
_given_conf = OmegaConf.load(os.path.join(TESTS_DIR, "udfs", "resources", "_config.yaml"))
schema = OmegaConf.structured(PipelineConf)
pl_conf = PipelineConf(**OmegaConf.merge(schema, _given_conf))
udf = PayloadTransformer(pl_conf=pl_conf)
udf.register_conf("druid-config", pl_conf.stream_confs["druid-config"])
yield udf


def test_payloadtx(udf, udf_args):
msgs = udf(*udf_args)
assert len(msgs) == 2
assert orjson.loads(msgs[0].value)["pipeline_id"] == "pipeline1"
assert orjson.loads(msgs[1].value)["pipeline_id"] == "pipeline2"
Loading