diff --git a/numalogic/udfs/__init__.py b/numalogic/udfs/__init__.py index 6c725846..5d5c4fdb 100644 --- a/numalogic/udfs/__init__.py +++ b/numalogic/udfs/__init__.py @@ -5,7 +5,7 @@ from numalogic._constants import BASE_DIR from numalogic.udfs._base import NumalogicUDF from numalogic.udfs._config import StreamConf, PipelineConf, MLPipelineConf, load_pipeline_conf -from numalogic.udfs._metrics_utility import MetricsSingleton +from numalogic.udfs._metrics_utility import MetricsLoader from numalogic.udfs.factory import UDFFactory, ServerFactory from numalogic.udfs.payloadtx import PayloadTransformer from numalogic.udfs.inference import InferenceUDF @@ -26,7 +26,7 @@ def set_logger() -> None: def set_metrics(conf_file: str) -> None: """Sets the metrics for the UDFs.""" - MetricsSingleton().load_metrics(config_file_path=conf_file) + MetricsLoader().load_metrics(config_file_path=conf_file) __all__ = [ @@ -47,5 +47,5 @@ def set_metrics(conf_file: str) -> None: "ServerFactory", "set_logger", "set_metrics", - "MetricsSingleton", + "MetricsLoader", ] diff --git a/numalogic/udfs/_metrics_utility.py b/numalogic/udfs/_metrics_utility.py index 5e212817..96ccc14f 100644 --- a/numalogic/udfs/_metrics_utility.py +++ b/numalogic/udfs/_metrics_utility.py @@ -1,8 +1,13 @@ -from typing import Optional, Any +from typing import Optional, Any, TypeVar +from numalogic import LOGGER +from numalogic.tools.types import Singleton +from numaprom.monitoring.metrics import BaseMetric from numaprom.monitoring.utility import get_metric from omegaconf import OmegaConf +metrics_t = TypeVar("metrics_t", bound=BaseMetric, covariant=True) + def create_metrics_from_config_file(config_file_path: str) -> dict[str, Any]: config = OmegaConf.load(config_file_path) @@ -20,7 +25,7 @@ def create_metrics_from_config_file(config_file_path: str) -> dict[str, Any]: return metrics -class MetricsSingleton: +class MetricsLoader(metaclass=Singleton): _instance = None _metrics = None @@ -35,10 +40,13 @@ def load_metrics(self, config_file_path: str): raise ValueError("file path is required to load metrics") self._metrics = create_metrics_from_config_file(config_file_path) - def get_metrics(self) -> dict: + def get_metrics(self) -> dict[str, metrics_t]: return self._metrics +_METRICS_LOADER = MetricsLoader() + + # helper functions def _increment_counter( counter: str, labels: Optional[dict], amount: int = 1, is_enabled=True @@ -51,9 +59,12 @@ def _increment_counter( labels: dict of label keys, value pair amount: Amount to increment the counter by """ - _metrics = MetricsSingleton().get_metrics() - if is_enabled and counter in _metrics: - _metrics[counter].increment_counter(labels=labels, amount=amount) + _metrics = _METRICS_LOADER.get_metrics() + if is_enabled: + try: + _metrics[counter].increment_counter(labels=labels, amount=amount) + except KeyError: + LOGGER.error(f"Metric {counter} not found in metrics") def _add_info(info: str, labels: Optional[dict], data: dict, is_enabled=True) -> None: @@ -65,9 +76,12 @@ def _add_info(info: str, labels: Optional[dict], data: dict, is_enabled=True) -> labels: dict of label keys, value pair data: Dictionary of data """ - _metrics = MetricsSingleton().get_metrics() - if is_enabled and info in _metrics: - _metrics[info].add_info(labels=labels, data=data) + _metrics = _METRICS_LOADER.get_metrics() + if is_enabled: + try: + _metrics[info].add_info(labels=labels, data=data) + except KeyError: + LOGGER.error(f"Metric {info} not found in metrics") def _add_summary(summary: str, labels: Optional[dict], data: float, is_enabled=True) -> None: @@ -79,9 +93,12 @@ def _add_summary(summary: str, labels: Optional[dict], data: float, is_enabled=T labels: dict of labels key, value pair data: Summary value """ - _metrics = MetricsSingleton().get_metrics() - if is_enabled and summary in _metrics: - _metrics[summary].add_observation(labels=labels, value=data) + _metrics = _METRICS_LOADER.get_metrics() + if is_enabled: + try: + _metrics[summary].add_observation(labels=labels, value=data) + except KeyError: + LOGGER.error(f"Metric {summary} not found in metrics") def _set_gauge(gauge: str, labels: Optional[dict], data: float, is_enabled=True) -> None: @@ -92,6 +109,9 @@ def _set_gauge(gauge: str, labels: Optional[dict], data: float, is_enabled=True) labels: dict of label keys, value pair data: data. """ - _metrics = MetricsSingleton().get_metrics() - if is_enabled and gauge in _metrics: - _metrics[gauge].set_gauge(labels=labels, data=data) + _metrics = _METRICS_LOADER.get_metrics() + if is_enabled: + try: + _metrics[gauge].set_gauge(labels=labels, data=data) + except KeyError: + LOGGER.error(f"Metric {gauge} not found in metrics") diff --git a/numalogic/udfs/factory.py b/numalogic/udfs/factory.py index 096a181f..c6244ea7 100644 --- a/numalogic/udfs/factory.py +++ b/numalogic/udfs/factory.py @@ -28,7 +28,6 @@ class UDFFactory: from numalogic.udfs.staticthresh import StaticThresholdUDF nl_udf_t = TypeVar("nl_udf_t", bound=NumalogicUDF, covariant=True) - _UDF_MAP: ClassVar[dict[str, type[NumalogicUDF]]] = { "mlpipeline": PayloadTransformer, "preprocess": PreprocessUDF, diff --git a/poetry.lock b/poetry.lock index ce8e4a05..2271be82 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1128,18 +1128,18 @@ devel = ["colorama", "json-spec", "jsonschema", "pylint", "pytest", "pytest-benc [[package]] name = "filelock" -version = "3.14.0" +version = "3.15.1" description = "A platform independent file lock." optional = false python-versions = ">=3.8" files = [ - {file = "filelock-3.14.0-py3-none-any.whl", hash = "sha256:43339835842f110ca7ae60f1e1c160714c5a6afd15a2873419ab185334975c0f"}, - {file = "filelock-3.14.0.tar.gz", hash = "sha256:6ea72da3be9b8c82afd3edcf99f2fffbb5076335a5ae4d03248bb5b6c3eae78a"}, + {file = "filelock-3.15.1-py3-none-any.whl", hash = "sha256:71b3102950e91dfc1bb4209b64be4dc8854f40e5f534428d8684f953ac847fac"}, + {file = "filelock-3.15.1.tar.gz", hash = "sha256:58a2549afdf9e02e10720eaa4d4470f56386d7a6f72edd7d0596337af8ed7ad8"}, ] [package.extras] docs = ["furo (>=2023.9.10)", "sphinx (>=7.2.6)", "sphinx-autodoc-typehints (>=1.25.2)"] -testing = ["covdefaults (>=2.3)", "coverage (>=7.3.2)", "diff-cover (>=8.0.1)", "pytest (>=7.4.3)", "pytest-cov (>=4.1)", "pytest-mock (>=3.12)", "pytest-timeout (>=2.2)"] +testing = ["covdefaults (>=2.3)", "coverage (>=7.3.2)", "diff-cover (>=8.0.1)", "pytest (>=7.4.3)", "pytest-asyncio (>=0.21)", "pytest-cov (>=4.1)", "pytest-mock (>=3.12)", "pytest-timeout (>=2.2)"] typing = ["typing-extensions (>=4.8)"] [[package]] @@ -3116,7 +3116,7 @@ setuptools = "^70.0.0" type = "git" url = "git@github.com:numaproj/numalogic-prometheus.git" reference = "metrics" -resolved_reference = "e02f957916757fd785d217abbd72399b465898f2" +resolved_reference = "7cd87ca8f3ed9b460b1ebde99608885d178b2377" [[package]] name = "numba" diff --git a/tests/udfs/test_inference.py b/tests/udfs/test_inference.py index 670fe4af..da49ea4e 100644 --- a/tests/udfs/test_inference.py +++ b/tests/udfs/test_inference.py @@ -19,10 +19,10 @@ from numalogic.models.autoencoder.variants import VanillaAE from numalogic.registry import RedisRegistry, ArtifactData from numalogic.tools.exceptions import RedisRegistryError -from numalogic.udfs import StreamConf, InferenceUDF, MLPipelineConf, MetricsSingleton +from numalogic.udfs import StreamConf, InferenceUDF, MLPipelineConf, MetricsLoader from numalogic.udfs.entities import StreamPayload, Header, Status, TrainerPayload -MetricsSingleton().load_metrics( +MetricsLoader().load_metrics( config_file_path=f"{TESTS_DIR}/udfs/resources/numalogic_udf_metrics.yaml" ) REDIS_CLIENT = FakeStrictRedis(server=FakeServer()) @@ -175,6 +175,7 @@ def test_inference_cache(udf, udf_args, mocker): ), ) msgs = udf(*udf_args) + print(MetricsLoader().get_metrics()) assert len(msgs) == 1 payload = StreamPayload(**orjson.loads(msgs[0].value)) assert Header.MODEL_INFERENCE == payload.header diff --git a/tests/udfs/test_main.py b/tests/udfs/test_main.py index 19339050..137c9e2f 100644 --- a/tests/udfs/test_main.py +++ b/tests/udfs/test_main.py @@ -6,12 +6,12 @@ from numalogic._constants import TESTS_DIR from numalogic.tools.exceptions import ConfigNotFoundError -from numalogic.udfs import MetricsSingleton +from numalogic.udfs import MetricsLoader BASE_CONFIG_PATH = f"{TESTS_DIR}/udfs/resources/_config3.yaml" APP_CONFIG_PATH = f"{TESTS_DIR}/udfs/resources/_config4.yaml" REDIS_AUTH = "123" -MetricsSingleton().load_metrics( +MetricsLoader().load_metrics( config_file_path=f"{TESTS_DIR}/udfs/resources/numalogic_udf_metrics.yaml" ) diff --git a/tests/udfs/test_pipeline.py b/tests/udfs/test_pipeline.py index 8533b24e..46db32e9 100644 --- a/tests/udfs/test_pipeline.py +++ b/tests/udfs/test_pipeline.py @@ -6,11 +6,11 @@ import pytest from numalogic._constants import TESTS_DIR -from numalogic.udfs import PipelineConf, MetricsSingleton +from numalogic.udfs import PipelineConf, MetricsLoader from numalogic.udfs.payloadtx import PayloadTransformer from tests.udfs.utility import input_json_from_file -MetricsSingleton().load_metrics( +MetricsLoader().load_metrics( config_file_path=f"{TESTS_DIR}/udfs/resources/numalogic_udf_metrics.yaml" ) logging.basicConfig(level=logging.DEBUG) diff --git a/tests/udfs/test_postprocess.py b/tests/udfs/test_postprocess.py index d206ea06..ad78f753 100644 --- a/tests/udfs/test_postprocess.py +++ b/tests/udfs/test_postprocess.py @@ -15,11 +15,11 @@ from numalogic.models.threshold import StdDevThreshold from numalogic.registry import RedisRegistry, ArtifactData from numalogic.transforms import TanhNorm -from numalogic.udfs import PipelineConf, MetricsSingleton +from numalogic.udfs import PipelineConf, MetricsLoader from numalogic.udfs.entities import Header, TrainerPayload, Status, OutputPayload from numalogic.udfs.postprocess import PostprocessUDF -MetricsSingleton().load_metrics( +MetricsLoader().load_metrics( config_file_path=f"{TESTS_DIR}/udfs/resources/numalogic_udf_metrics.yaml" ) logging.basicConfig(level=logging.DEBUG) diff --git a/tests/udfs/test_preprocess.py b/tests/udfs/test_preprocess.py index 5ce49fdf..1aa6944a 100644 --- a/tests/udfs/test_preprocess.py +++ b/tests/udfs/test_preprocess.py @@ -13,7 +13,7 @@ from numalogic._constants import TESTS_DIR from numalogic.registry import RedisRegistry from numalogic.tools.exceptions import ModelKeyNotFound -from numalogic.udfs import MetricsSingleton +from numalogic.udfs import MetricsLoader from numalogic.udfs._config import PipelineConf from numalogic.udfs.entities import Status, Header, StreamPayload, TrainerPayload from numalogic.udfs.preprocess import PreprocessUDF @@ -28,7 +28,7 @@ "event_time": datetime.now(), "watermark": datetime.now(), } -MetricsSingleton().load_metrics( +MetricsLoader().load_metrics( config_file_path=f"{TESTS_DIR}/udfs/resources/numalogic_udf_metrics.yaml" ) diff --git a/tests/udfs/test_rds_trainer.py b/tests/udfs/test_rds_trainer.py index 99ecfda6..0768cc61 100644 --- a/tests/udfs/test_rds_trainer.py +++ b/tests/udfs/test_rds_trainer.py @@ -21,14 +21,14 @@ from pynumaflow.mapper import Datum from orjson import orjson from omegaconf import OmegaConf -from numalogic.udfs import StreamConf, PipelineConf, MLPipelineConf, MetricsSingleton +from numalogic.udfs import StreamConf, PipelineConf, MLPipelineConf, MetricsLoader from numalogic.config import NumalogicConf, ModelInfo from numalogic.config import TrainerConf, LightningTrainerConf import time from freezegun import freeze_time REDIS_CLIENT = FakeStrictRedis(server=FakeServer()) -MetricsSingleton().load_metrics( +MetricsLoader().load_metrics( config_file_path=f"{TESTS_DIR}/udfs/resources/numalogic_udf_metrics.yaml" ) diff --git a/tests/udfs/test_staticthresh.py b/tests/udfs/test_staticthresh.py index 9b0e8f11..7b3c0b25 100644 --- a/tests/udfs/test_staticthresh.py +++ b/tests/udfs/test_staticthresh.py @@ -9,11 +9,11 @@ from pynumaflow.mapper import Datum from numalogic._constants import TESTS_DIR -from numalogic.udfs import PipelineConf, MetricsSingleton +from numalogic.udfs import PipelineConf, MetricsLoader from numalogic.udfs.entities import Status, Header, OutputPayload from numalogic.udfs.staticthresh import StaticThresholdUDF -MetricsSingleton().load_metrics( +MetricsLoader().load_metrics( config_file_path=f"{TESTS_DIR}/udfs/resources/numalogic_udf_metrics.yaml" ) logging.basicConfig(level=logging.DEBUG)