Skip to content

Commit

Permalink
fix: add instance of Singleton to load metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Kushal Batra <i.kushalbatra@gmail.com>
  • Loading branch information
s0nicboOm committed Jun 12, 2024
1 parent 0e773b1 commit bf77eab
Show file tree
Hide file tree
Showing 11 changed files with 58 additions and 38 deletions.
6 changes: 3 additions & 3 deletions numalogic/udfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from numalogic._constants import BASE_DIR
from numalogic.udfs._base import NumalogicUDF
from numalogic.udfs._config import StreamConf, PipelineConf, MLPipelineConf, load_pipeline_conf
from numalogic.udfs._metrics_utility import MetricsSingleton
from numalogic.udfs._metrics_utility import MetricsLoader
from numalogic.udfs.factory import UDFFactory, ServerFactory
from numalogic.udfs.payloadtx import PayloadTransformer
from numalogic.udfs.inference import InferenceUDF
Expand All @@ -26,7 +26,7 @@ def set_logger() -> None:

def set_metrics(conf_file: str) -> None:
"""Sets the metrics for the UDFs."""
MetricsSingleton().load_metrics(config_file_path=conf_file)
MetricsLoader().load_metrics(config_file_path=conf_file)


__all__ = [
Expand All @@ -47,5 +47,5 @@ def set_metrics(conf_file: str) -> None:
"ServerFactory",
"set_logger",
"set_metrics",
"MetricsSingleton",
"MetricsLoader",
]
50 changes: 35 additions & 15 deletions numalogic/udfs/_metrics_utility.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
from typing import Optional, Any
from typing import Optional, Any, TypeVar

from numalogic import LOGGER
from numalogic.tools.types import Singleton
from numaprom.monitoring.metrics import BaseMetric
from numaprom.monitoring.utility import get_metric
from omegaconf import OmegaConf

metrics_t = TypeVar("metrics_t", bound=BaseMetric, covariant=True)


def create_metrics_from_config_file(config_file_path: str) -> dict[str, Any]:
config = OmegaConf.load(config_file_path)
Expand All @@ -20,7 +25,7 @@ def create_metrics_from_config_file(config_file_path: str) -> dict[str, Any]:
return metrics


class MetricsSingleton:
class MetricsLoader(metaclass=Singleton):
_instance = None
_metrics = None

Expand All @@ -35,10 +40,13 @@ def load_metrics(self, config_file_path: str):
raise ValueError("file path is required to load metrics")
self._metrics = create_metrics_from_config_file(config_file_path)

def get_metrics(self) -> dict:
def get_metrics(self) -> dict[str, metrics_t]:
return self._metrics


_METRICS_LOADER = MetricsLoader()


# helper functions
def _increment_counter(
counter: str, labels: Optional[dict], amount: int = 1, is_enabled=True
Expand All @@ -51,9 +59,12 @@ def _increment_counter(
labels: dict of label keys, value pair
amount: Amount to increment the counter by
"""
_metrics = MetricsSingleton().get_metrics()
if is_enabled and counter in _metrics:
_metrics[counter].increment_counter(labels=labels, amount=amount)
_metrics = _METRICS_LOADER.get_metrics()
if is_enabled:
try:
_metrics[counter].increment_counter(labels=labels, amount=amount)
except KeyError:
LOGGER.error(f"Metric {counter} not found in metrics")


def _add_info(info: str, labels: Optional[dict], data: dict, is_enabled=True) -> None:
Expand All @@ -65,9 +76,12 @@ def _add_info(info: str, labels: Optional[dict], data: dict, is_enabled=True) ->
labels: dict of label keys, value pair
data: Dictionary of data
"""
_metrics = MetricsSingleton().get_metrics()
if is_enabled and info in _metrics:
_metrics[info].add_info(labels=labels, data=data)
_metrics = _METRICS_LOADER.get_metrics()
if is_enabled:
try:
_metrics[info].add_info(labels=labels, data=data)
except KeyError:
LOGGER.error(f"Metric {info} not found in metrics")


def _add_summary(summary: str, labels: Optional[dict], data: float, is_enabled=True) -> None:
Expand All @@ -79,9 +93,12 @@ def _add_summary(summary: str, labels: Optional[dict], data: float, is_enabled=T
labels: dict of labels key, value pair
data: Summary value
"""
_metrics = MetricsSingleton().get_metrics()
if is_enabled and summary in _metrics:
_metrics[summary].add_observation(labels=labels, value=data)
_metrics = _METRICS_LOADER.get_metrics()
if is_enabled:
try:
_metrics[summary].add_observation(labels=labels, value=data)
except KeyError:
LOGGER.error(f"Metric {summary} not found in metrics")


def _set_gauge(gauge: str, labels: Optional[dict], data: float, is_enabled=True) -> None:
Expand All @@ -92,6 +109,9 @@ def _set_gauge(gauge: str, labels: Optional[dict], data: float, is_enabled=True)
labels: dict of label keys, value pair
data: data.
"""
_metrics = MetricsSingleton().get_metrics()
if is_enabled and gauge in _metrics:
_metrics[gauge].set_gauge(labels=labels, data=data)
_metrics = _METRICS_LOADER.get_metrics()
if is_enabled:
try:
_metrics[gauge].set_gauge(labels=labels, data=data)
except KeyError:
LOGGER.error(f"Metric {gauge} not found in metrics")
1 change: 0 additions & 1 deletion numalogic/udfs/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ class UDFFactory:
from numalogic.udfs.staticthresh import StaticThresholdUDF

nl_udf_t = TypeVar("nl_udf_t", bound=NumalogicUDF, covariant=True)

_UDF_MAP: ClassVar[dict[str, type[NumalogicUDF]]] = {
"mlpipeline": PayloadTransformer,
"preprocess": PreprocessUDF,
Expand Down
10 changes: 5 additions & 5 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions tests/udfs/test_inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
from numalogic.models.autoencoder.variants import VanillaAE
from numalogic.registry import RedisRegistry, ArtifactData
from numalogic.tools.exceptions import RedisRegistryError
from numalogic.udfs import StreamConf, InferenceUDF, MLPipelineConf, MetricsSingleton
from numalogic.udfs import StreamConf, InferenceUDF, MLPipelineConf, MetricsLoader
from numalogic.udfs.entities import StreamPayload, Header, Status, TrainerPayload

MetricsSingleton().load_metrics(
MetricsLoader().load_metrics(
config_file_path=f"{TESTS_DIR}/udfs/resources/numalogic_udf_metrics.yaml"
)
REDIS_CLIENT = FakeStrictRedis(server=FakeServer())
Expand Down Expand Up @@ -175,6 +175,7 @@ def test_inference_cache(udf, udf_args, mocker):
),
)
msgs = udf(*udf_args)
print(MetricsLoader().get_metrics())
assert len(msgs) == 1
payload = StreamPayload(**orjson.loads(msgs[0].value))
assert Header.MODEL_INFERENCE == payload.header
Expand Down
4 changes: 2 additions & 2 deletions tests/udfs/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@

from numalogic._constants import TESTS_DIR
from numalogic.tools.exceptions import ConfigNotFoundError
from numalogic.udfs import MetricsSingleton
from numalogic.udfs import MetricsLoader

BASE_CONFIG_PATH = f"{TESTS_DIR}/udfs/resources/_config3.yaml"
APP_CONFIG_PATH = f"{TESTS_DIR}/udfs/resources/_config4.yaml"
REDIS_AUTH = "123"
MetricsSingleton().load_metrics(
MetricsLoader().load_metrics(
config_file_path=f"{TESTS_DIR}/udfs/resources/numalogic_udf_metrics.yaml"
)

Expand Down
4 changes: 2 additions & 2 deletions tests/udfs/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
import pytest

from numalogic._constants import TESTS_DIR
from numalogic.udfs import PipelineConf, MetricsSingleton
from numalogic.udfs import PipelineConf, MetricsLoader
from numalogic.udfs.payloadtx import PayloadTransformer
from tests.udfs.utility import input_json_from_file

MetricsSingleton().load_metrics(
MetricsLoader().load_metrics(
config_file_path=f"{TESTS_DIR}/udfs/resources/numalogic_udf_metrics.yaml"
)
logging.basicConfig(level=logging.DEBUG)
Expand Down
4 changes: 2 additions & 2 deletions tests/udfs/test_postprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
from numalogic.models.threshold import StdDevThreshold
from numalogic.registry import RedisRegistry, ArtifactData
from numalogic.transforms import TanhNorm
from numalogic.udfs import PipelineConf, MetricsSingleton
from numalogic.udfs import PipelineConf, MetricsLoader
from numalogic.udfs.entities import Header, TrainerPayload, Status, OutputPayload
from numalogic.udfs.postprocess import PostprocessUDF

MetricsSingleton().load_metrics(
MetricsLoader().load_metrics(
config_file_path=f"{TESTS_DIR}/udfs/resources/numalogic_udf_metrics.yaml"
)
logging.basicConfig(level=logging.DEBUG)
Expand Down
4 changes: 2 additions & 2 deletions tests/udfs/test_preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from numalogic._constants import TESTS_DIR
from numalogic.registry import RedisRegistry
from numalogic.tools.exceptions import ModelKeyNotFound
from numalogic.udfs import MetricsSingleton
from numalogic.udfs import MetricsLoader
from numalogic.udfs._config import PipelineConf
from numalogic.udfs.entities import Status, Header, StreamPayload, TrainerPayload
from numalogic.udfs.preprocess import PreprocessUDF
Expand All @@ -28,7 +28,7 @@
"event_time": datetime.now(),
"watermark": datetime.now(),
}
MetricsSingleton().load_metrics(
MetricsLoader().load_metrics(
config_file_path=f"{TESTS_DIR}/udfs/resources/numalogic_udf_metrics.yaml"
)

Expand Down
4 changes: 2 additions & 2 deletions tests/udfs/test_rds_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
from pynumaflow.mapper import Datum
from orjson import orjson
from omegaconf import OmegaConf
from numalogic.udfs import StreamConf, PipelineConf, MLPipelineConf, MetricsSingleton
from numalogic.udfs import StreamConf, PipelineConf, MLPipelineConf, MetricsLoader
from numalogic.config import NumalogicConf, ModelInfo
from numalogic.config import TrainerConf, LightningTrainerConf
import time
from freezegun import freeze_time

REDIS_CLIENT = FakeStrictRedis(server=FakeServer())
MetricsSingleton().load_metrics(
MetricsLoader().load_metrics(
config_file_path=f"{TESTS_DIR}/udfs/resources/numalogic_udf_metrics.yaml"
)

Expand Down
4 changes: 2 additions & 2 deletions tests/udfs/test_staticthresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
from pynumaflow.mapper import Datum

from numalogic._constants import TESTS_DIR
from numalogic.udfs import PipelineConf, MetricsSingleton
from numalogic.udfs import PipelineConf, MetricsLoader
from numalogic.udfs.entities import Status, Header, OutputPayload
from numalogic.udfs.staticthresh import StaticThresholdUDF

MetricsSingleton().load_metrics(
MetricsLoader().load_metrics(
config_file_path=f"{TESTS_DIR}/udfs/resources/numalogic_udf_metrics.yaml"
)
logging.basicConfig(level=logging.DEBUG)
Expand Down

0 comments on commit bf77eab

Please sign in to comment.