Skip to content

Commit

Permalink
add tests for prom fetcher
Browse files Browse the repository at this point in the history
Signed-off-by: Avik Basu <ab93@users.noreply.github.com>
  • Loading branch information
ab93 committed Sep 11, 2023
1 parent 8bc4218 commit 9ab7b9e
Show file tree
Hide file tree
Showing 2 changed files with 200 additions and 16 deletions.
36 changes: 20 additions & 16 deletions numalogic/connectors/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,12 @@ def fetch(
LOGGER.warning("Query returned no results")
return df

df = df[["values", *return_labels]]
df = df.explode("values", ignore_index=True)
df[["timestamp", metric_name]] = df["values"].to_list()
df.drop(columns=["values"], inplace=True)
df = df.astype({metric_name: float})

df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s")
df.sort_values(by=["timestamp"], inplace=True)
df = self._consolidate_df(df, metric_name, return_labels)

if aggregate and return_labels:
df = df.groupby(by=["timestamp"]).apply(lambda x: x[[metric_name]].mean())
df = self._agg_df(df, metric_name)

df.sort_values(by=["timestamp"], inplace=True)

return df

Expand All @@ -122,19 +117,28 @@ def raw_fetch(
return_labels = [f"metric.{label}" for label in return_labels or []]
metric_name = self._extract_metric_name(df) or "metric"

df = self._consolidate_df(df, metric_name, return_labels)

if aggregate and return_labels:
df = self._agg_df(df, metric_name)

df.sort_values(by=["timestamp"], inplace=True)
return df

@staticmethod
def _agg_df(df, metric_name: str):
df = df.groupby(by=["timestamp"]).apply(lambda x: x[[metric_name]].mean())
df.reset_index(inplace=True)
return df

@staticmethod
def _consolidate_df(df: pd.DataFrame, metric_name: str, return_labels: list[str]):
df = df[["values", *return_labels]]
df = df.explode("values", ignore_index=True)

df[["timestamp", metric_name]] = df["values"].to_list()
df.drop(columns=["values"], inplace=True)
df = df.astype({metric_name: float})

df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s")
df.sort_values(by=["timestamp"], inplace=True)

if aggregate and return_labels:
df = df.groupby(by=["timestamp"]).apply(lambda x: x[[metric_name]].mean())

return df

@staticmethod
Expand Down
180 changes: 180 additions & 0 deletions tests/connectors/test_prometheus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
import logging
import unittest
from datetime import datetime, timedelta
from unittest.mock import patch, Mock, MagicMock

from orjson import orjson
from requests import Response

from numalogic.connectors import PrometheusFetcher
from numalogic.tools.exceptions import PrometheusFetcherError, PrometheusInvalidResponseError

logging.basicConfig(level=logging.DEBUG)


def _mock_response():
response = MagicMock()
response.status_code = 200
response.text = orjson.dumps(
{
"status": "success",
"data": {
"resultType": "vector",
"result": [
{
"metric": {
"__name__": "namespace_asset_pod_cpu_utilization",
"numalogic": "true",
"namespace": "sandbox-numalogic-demo",
},
"values": [
[1656334767.73, "14.744611739611193"],
[1656334797.73, "14.73040822323633"],
],
}
],
},
}
)
return response


def _mock_query_range():
return [
{
"metric": {
"__name__": "namespace_asset_pod_cpu_utilization",
"assetAlias": "sandbox.numalogic.demo",
"numalogic": "true",
"namespace": "sandbox-numalogic-demo",
},
"values": [[1656334767.73, "14.744611739611193"], [1656334797.73, "14.73040822323633"]],
}
]


def _mock_w_return_labels():
return [
{
"metric": {
"__name__": "namespace_app_rollouts_http_request_error_rate",
"assetAlias": "sandbox.numalogic.demo",
"numalogic": "true",
"namespace": "sandbox-numalogic-demo",
"rollouts_pod_template_hash": "7b4b4f9f9d",
},
"values": [[1656334767.73, "10.0"], [1656334797.73, "12.0"]],
},
{
"metric": {
"__name__": "namespace_app_rollouts_http_request_error_rate",
"assetAlias": "sandbox.numalogic.demo",
"numalogic": "true",
"namespace": "sandbox-numalogic-demo",
"rollouts_pod_template_hash": "5b4b4f9f9d",
},
"values": [[1656334767.73, "11.0"], [1656334797.73, "13.0"]],
},
]


class TestPrometheusFetcher(unittest.TestCase):
def setUp(self) -> None:
self.fetcher = PrometheusFetcher(prometheus_server="http://localhost:9090")

@patch("requests.get", Mock(return_value=_mock_response()))
def test_fetch(self):
df = self.fetcher.fetch(
metric_name="namespace_asset_pod_cpu_utilization",
start=datetime.now() - timedelta(hours=1),
filters={"namespace": "sandbox-numalogic-demo"},
)
self.assertEqual(df.shape, (2, 2))
self.assertListEqual(
df.columns.to_list(), ["timestamp", "namespace_asset_pod_cpu_utilization"]
)

@patch.object(PrometheusFetcher, "_api_query_range", Mock(return_value=_mock_w_return_labels()))
def test_fetch_return_labels(self):
metric = "namespace_app_rollouts_http_request_error_rate"
df = self.fetcher.fetch(
metric_name=metric,
start=datetime.now() - timedelta(hours=1),
filters={"namespace": "sandbox-numalogic-demo"},
return_labels=["rollouts_pod_template_hash"],
aggregate=True,
)
self.assertEqual(df.shape, (2, 2))
self.assertListEqual(df.columns.to_list(), ["timestamp", metric])
self.assertListEqual([10.5, 12.5], df[metric].to_list())

@patch.object(PrometheusFetcher, "_api_query_range", Mock(return_value=[]))
def test_fetch_no_data(self):
df = self.fetcher.fetch(
metric_name="namespace_asset_pod_cpu_utilization",
start=datetime.now() - timedelta(hours=1),
filters={"namespace": "sandbox-numalogic-demo"},
)
self.assertTrue(df.empty)

@patch("requests.get", Mock(side_effect=Exception("Test exception")))
def test_fetch_url_err(self):
with self.assertRaises(PrometheusFetcherError):
self.fetcher.fetch(
metric_name="namespace_asset_pod_cpu_utilization",
start=datetime.now() - timedelta(hours=1),
filters={"namespace": "sandbox-numalogic-demo"},
)

@patch("requests.get", Mock(return_value=Response()))
def test_fetch_response_err(self):
with self.assertRaises(PrometheusInvalidResponseError):
self.fetcher.fetch(
metric_name="namespace_asset_pod_cpu_utilization",
start=datetime.now() - timedelta(hours=1),
filters={"namespace": "sandbox-numalogic-demo"},
)

@patch.object(PrometheusFetcher, "_api_query_range", Mock(return_value=_mock_query_range()))
def test_fetch_raw(self):
df = self.fetcher.raw_fetch(
query='namespace_asset_pod_cpu_utilization{namespace="sandbox-numalogic-demo"}',
start=datetime.now() - timedelta(hours=1),
)
self.assertEqual(df.shape, (2, 2))
self.assertListEqual(
df.columns.to_list(), ["timestamp", "namespace_asset_pod_cpu_utilization"]
)

@patch.object(PrometheusFetcher, "_api_query_range", Mock(return_value=_mock_w_return_labels()))
def test_fetch_raw_return_labels(self):
metric = "namespace_app_rollouts_http_request_error_rate"
df = self.fetcher.raw_fetch(
query="namespace_app_rollouts_http_request_error_rate{namespace='sandbox-numalogic-demo'}",
start=datetime.now() - timedelta(hours=1),
return_labels=["rollouts_pod_template_hash"],
aggregate=True,
)
self.assertEqual(df.shape, (2, 2))
self.assertListEqual(df.columns.to_list(), ["timestamp", metric])
self.assertListEqual([10.5, 12.5], df[metric].to_list())

@patch.object(PrometheusFetcher, "_api_query_range", Mock(return_value=[]))
def test_fetch_raw_no_data(self):
df = self.fetcher.raw_fetch(
query='namespace_asset_pod_cpu_utilization{namespace="sandbox-numalogic-demo"}',
start=datetime.now() - timedelta(hours=1),
)
self.assertTrue(df.empty)

def test_start_end_err(self):
with self.assertRaises(ValueError):
self.fetcher.fetch(
metric_name="namespace_asset_pod_cpu_utilization",
start=datetime.now() - timedelta(hours=1),
end=datetime.now() - timedelta(hours=2),
)


if __name__ == "__main__":
unittest.main()

0 comments on commit 9ab7b9e

Please sign in to comment.