import logging
import os
from datetime import datetime
from typing import Optional

import pandas as pd
from wavefront_api_client import ApiClient, Configuration, QueryApi

from numalogic.connectors._base import DataFetcher
from numalogic.tools.exceptions import WavefrontFetcherError

LOGGER = logging.getLogger(__name__)


class WavefrontFetcher(DataFetcher):
    """
    Fetches time series data from Wavefront.

    Args:
        url (str): Wavefront URL.
        api_token (str): Wavefront API token. Falls back to the
            WAVEFRONT_API_TOKEN environment variable when not provided.

    Raises
    ------
    ValueError: If API token is not provided.
    WavefrontFetcherError: If there is an error fetching data from Wavefront.
    """

    def __init__(self, url: str, api_token: Optional[str] = None):
        super().__init__(url)
        api_token = api_token or os.getenv("WAVEFRONT_API_TOKEN")
        if not api_token:
            raise ValueError("WAVEFRONT API token is not provided")
        configuration = Configuration()
        configuration.host = url
        configuration.api_key["X-AUTH-TOKEN"] = api_token
        # The token is supplied both as the client api_key and as a Bearer
        # header, matching the Wavefront client's auth conventions.
        self.api_client = QueryApi(
            ApiClient(
                configuration,
                header_name="Authorization",
                header_value=f"Bearer {api_token}",
            )
        )

    def _call_api(
        self, query: str, start: int, end: Optional[int], granularity: str
    ) -> pd.DataFrame:
        """Calls the Wavefront query API and returns the client's QueryResult."""
        # Wavefront's query_api positional order is (q, s, g); the end time is
        # the keyword argument `e`.
        return self.api_client.query_api(
            query, start, granularity, e=end, include_obsolete_metrics=True, use_raw_qk=True
        )

    @staticmethod
    def _format_results(res: dict) -> pd.DataFrame:
        """Validates the API response and formats it as a timestamp-indexed DataFrame.

        Raises WavefrontFetcherError if the response reports an error or
        contains no timeseries.
        """
        if res.get("error_type") is not None:
            raise WavefrontFetcherError(
                f"Error fetching data from Wavefront: "
                f"{res.get('error_type')}: {res.get('error_message')}"
            )
        if res.get("timeseries") is None:
            raise WavefrontFetcherError("No timeseries data found for the query")
        dfs = []
        for ts in res["timeseries"]:
            dfs.append(pd.DataFrame(ts["data"], columns=["timestamp", "value"]))
        df = pd.concat(dfs)
        # Wavefront returns epoch seconds; convert to a proper DatetimeIndex.
        df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s")
        return df.set_index("timestamp").sort_index()

    def fetch(
        self,
        metric: str,
        start: datetime,
        filters: Optional[dict] = None,
        end: Optional[datetime] = None,
        granularity: str = "m",
    ) -> pd.DataFrame:
        """
        Fetches data from Wavefront as a single metric.

        Args:
            metric (str): Metric to fetch. Example: 'system.cpu.usage'.
                Do not include the 'ts()' function.
            start (datetime): Start time.
            filters (dict): Filters to apply to the query.
            end (datetime): End time. Set to None to fetch data until now.
            granularity (str): Granularity of the data. Default is 'm' (minute).

        Returns
        -------
        Dataframe with the fetched data in the format: timestamp (index), value (column).

        Raises
        ------
        WavefrontFetcherError: If there is an error fetching data from Wavefront
        """
        start = int(start.timestamp())
        if end:
            end = int(end.timestamp())
        if filters:
            _filters = " and ".join([f'{key}="{value}"' for key, value in filters.items()])
            query = f"ts({metric}, {_filters})"
        else:
            # BUG FIX: this branch previously emitted f"ts({metric}" with no
            # closing parenthesis, producing a malformed Wavefront query.
            query = f"ts({metric})"
        LOGGER.info("Fetching data from Wavefront for query: %s", query)
        res = self._call_api(query, start, end, granularity)
        return self._format_results(res.to_dict())

    def raw_fetch(
        self,
        query: str,
        start: datetime,
        filters: Optional[dict] = None,
        end: Optional[datetime] = None,
        granularity: str = "m",
    ) -> pd.DataFrame:
        """
        Fetches data from Wavefront using a raw query, allowing for more complex queries.

        Args:
            query (str): Raw query to fetch data.
            start (datetime): Start time.
            filters (dict): Filters to apply to the query.
            end (datetime): End time. Set to None to fetch data until now.
            granularity (str): Granularity of the data. Default is 'm' (minute).

        Returns
        -------
        Dataframe with the fetched data in the format: timestamp (index), value (column).

        Raises
        ------
        WavefrontFetcherError:
            - If there is an error fetching data from Wavefront
            - If there is a key error in the query.

        >>> from datetime import datetime, timedelta
        ...
        >>> fetcher = WavefrontFetcher(url="https://miata.wavefront.com", api_token="6spd-manual")
        >>> df = fetcher.raw_fetch(
        ...     query="rawsum(ts(engine.rpm, gear='{gear}' and track='{track}'))",
        ...     start=datetime.now() - timedelta(minutes=5),
        ...     filters={"gear": "1", "track": "laguna_seca"},
        ...     end=datetime.now(),
        ... )
        """
        # Cast to int epoch seconds for consistency with fetch() and with the
        # _call_api signature (previously raw floats were sent).
        start = int(start.timestamp())
        if end:
            end = int(end.timestamp())

        # Only substitute placeholders when filters are given; calling
        # query.format(**None) previously raised a TypeError.
        if filters:
            try:
                query = query.format(**filters)
            except KeyError as key_err:
                raise WavefrontFetcherError(f"Key error in query: {key_err}") from key_err

        LOGGER.info("Fetching data from Wavefront for query: %s", query)
        # BUG FIX: arguments were previously passed as
        # (query, start, granularity, end), sending the granularity string as
        # the end time and the end timestamp as the granularity.
        qres = self._call_api(query, start, end, granularity)
        return self._format_results(qres.to_dict())
"""Tests for the WavefrontFetcher connector.

All tests mock `WavefrontFetcher._call_api`, so no network access or real
Wavefront credentials are required.
"""

from copy import copy
from datetime import datetime, timedelta

import pytest
from wavefront_api_client import QueryResult

from numalogic.connectors import WavefrontFetcher
from numalogic.tools.exceptions import WavefrontFetcherError

# Dummy connection parameters; never contacted because _call_api is mocked.
DUMMY_URL = "https://dummy.wavefront.com"
DUMMY_TOKEN = "dummy_token"

# Canned successful API response: one timeseries with 18 one-minute data
# points, mirroring the payload shape returned by the Wavefront query API.
DUMMY_OUT = QueryResult(
    **{
        "dimensions": None,
        "error_message": None,
        "error_type": None,
        "events": None,
        "granularity": 60,
        "name": "ts(iks.namespace.kube.hpa.status.desired.replicas, "
        "cluster='fdp-prd-usw2-k8s' and "
        "namespace='fdp-documentservice-usw2-prd') - "
        "ts(iks.namespace.app.pod.count, cluster='fdp-prd-usw2-k8s' and "
        "namespace='fdp-documentservice-usw2-prd')",
        "query": "ts(iks.namespace.kube.hpa.status.desired.replicas, "
        "cluster='fdp-prd-usw2-k8s' and "
        "namespace='fdp-documentservice-usw2-prd') - "
        "ts(iks.namespace.app.pod.count, cluster='fdp-prd-usw2-k8s' and "
        "namespace='fdp-documentservice-usw2-prd')",
        "spans": None,
        "stats": {
            "buffer_keys": 72,
            "cached_compacted_keys": None,
            "compacted_keys": 3,
            "compacted_points": 357,
            "cpu_ns": 398618692,
            "distributions": 0,
            "edges": 0,
            "hosts_used": None,
            "keys": 73,
            "latency": 413,
            "metrics": 427,
            "metrics_used": None,
            "points": 427,
            "queries": 17,
            "query_tasks": 0,
            "s3_keys": 0,
            "skipped_compacted_keys": 4,
            "spans": 0,
            "summaries": 427,
        },
        "timeseries": [
            {
                # [epoch_seconds, value] pairs, one per minute.
                "data": [
                    [1726533000.0, 0.0],
                    [1726533060.0, 0.0],
                    [1726533120.0, 0.0],
                    [1726533180.0, 0.0],
                    [1726533240.0, 0.0],
                    [1726533300.0, 0.0],
                    [1726533360.0, 0.0],
                    [1726533420.0, 0.0],
                    [1726533480.0, 0.0],
                    [1726533540.0, 0.0],
                    [1726533600.0, 0.0],
                    [1726533660.0, 0.0],
                    [1726533720.0, 0.0],
                    [1726533780.0, 0.0],
                    [1726533840.0, 0.0],
                    [1726533900.0, 0.0],
                    [1726533960.0, 0.0],
                    [1726534020.0, 0.0],
                ],
                "host": "10.176.157.157:8080",
                "label": "iks.namespace.kube.hpa.status.desired.replicas",
                "tags": {
                    "assetId": "4615081310646958673",
                    "bu": "ip",
                    "cluster": "fdp-prd-usw2-k8s",
                    "container": "kube-state-metrics",
                    "endpoint": "http-metrics",
                    "env": "prod",
                    "horizontalpodautoscaler": "document-service-rollout-hpa",
                    "job": "kube-state-metrics-v2",
                    "namespace": "fdp-documentservice-usw2-prd",
                    "pod": "kube-state-metrics-v2-fc68fc5fb-kjzdc",
                    "prometheus": "addon-metricset-ns/k8s-prometheus",
                    "prometheus.replica": "prometheus-k8s-prometheus-0",
                    "service": "kube-state-metrics-v2",
                },
            }
        ],
        "trace_dimensions": [],
        "traces": None,
        "warnings": None,
    }
)

# Variant simulating a query error reported by the API.
DUMMY_OUT_ERR = copy(DUMMY_OUT)
DUMMY_OUT_ERR.error_type = "QuerySyntaxError"
DUMMY_OUT_ERR.error_message = "Invalid query"

# Variant simulating an empty result (no timeseries in the response).
DUMMY_OUT_NO_TS = copy(DUMMY_OUT)
DUMMY_OUT_NO_TS.timeseries = None


@pytest.fixture
def wavefront_fetcher():
    # Fetcher wired with dummy credentials; _call_api is mocked in each test.
    return WavefrontFetcher(
        url=DUMMY_URL,
        api_token=DUMMY_TOKEN,
    )


def test_init():
    # Missing API token (neither argument nor env var) must raise ValueError.
    with pytest.raises(ValueError):
        WavefrontFetcher(url=DUMMY_URL)


def test_fetch_01(wavefront_fetcher, mocker):
    # fetch() with filters: expect an 18-row, single-column frame indexed by timestamp.
    mocker.patch.object(wavefront_fetcher, "_call_api", return_value=DUMMY_OUT)

    df = wavefront_fetcher.fetch(
        metric="iks.namespace.kube.hpa.status.desired.replicas",
        start=datetime.now() - timedelta(days=1),
        filters={"cluster": "fdp-prd-usw2-k8s", "namespace": "fdp-documentservice-usw2-prd"},
        end=datetime.now(),
    )
    assert df.shape == (18, 1)
    assert df.columns == ["value"]
    assert df.index.name == "timestamp"


def test_fetch_02(wavefront_fetcher, mocker):
    # fetch() without filters takes the unfiltered query-building branch.
    mocker.patch.object(wavefront_fetcher, "_call_api", return_value=DUMMY_OUT)

    df = wavefront_fetcher.fetch(
        metric="iks.namespace.kube.hpa.status.desired.replicas",
        start=datetime.now() - timedelta(days=1),
        end=datetime.now(),
    )
    assert df.shape == (18, 1)
    assert df.columns == ["value"]
    assert df.index.name == "timestamp"


def test_raw_fetch(wavefront_fetcher, mocker):
    # raw_fetch() substitutes {placeholders} from filters into the raw query.
    mocker.patch.object(wavefront_fetcher, "_call_api", return_value=DUMMY_OUT)

    df = wavefront_fetcher.raw_fetch(
        query="ts(iks.namespace.kube.hpa.status.desired.replicas, cluster='{cluster}' and "
        "namespace='{namespace}') - ts(iks.namespace.app.pod.count, cluster='{cluster}' and "
        "namespace='{namespace}')",
        start=datetime.now() - timedelta(minutes=5),
        filters={"cluster": "fdp-prd-usw2-k8s", "namespace": "fdp-documentservice-usw2-prd"},
        end=datetime.now(),
    )
    assert df.shape == (18, 1)
    assert df.columns == ["value"]
    assert df.index.name == "timestamp"


def test_fetch_err_01(wavefront_fetcher, mocker):
    # An API response carrying error_type/error_message must raise.
    mocker.patch.object(wavefront_fetcher, "_call_api", return_value=DUMMY_OUT_ERR)

    with pytest.raises(WavefrontFetcherError):
        wavefront_fetcher.fetch(
            metric="some_metric",
            start=datetime.now() - timedelta(days=1),
            filters={"cluster": "some_cluster", "namespace": "some_ns"},
            end=datetime.now(),
        )


def test_fetch_err_02(wavefront_fetcher, mocker):
    # A response with no timeseries data must raise.
    mocker.patch.object(wavefront_fetcher, "_call_api", return_value=DUMMY_OUT_NO_TS)

    with pytest.raises(WavefrontFetcherError):
        wavefront_fetcher.fetch(
            metric="some_metric",
            start=datetime.now() - timedelta(days=1),
            filters={"cluster": "some_cluster", "namespace": "some_ns"},
            end=datetime.now(),
        )


def test_raw_fetch_err(wavefront_fetcher, mocker):
    # Filters missing a placeholder key ('cluster') must raise from the KeyError.
    mocker.patch.object(wavefront_fetcher, "_call_api", return_value=DUMMY_OUT)

    with pytest.raises(WavefrontFetcherError):
        wavefront_fetcher.raw_fetch(
            query="ts(iks.namespace.kube.hpa.status.desired.replicas, cluster='{cluster}' and "
            "namespace='{namespace}') - ts(iks.namespace.app.pod.count, cluster='{cluster}' and "
            "namespace='{namespace}')",
            start=datetime.now() - timedelta(minutes=5),
            filters={"randomkey": "fdp-prd-usw2-k8s", "namespace": "fdp-documentservice-usw2-prd"},
            end=datetime.now(),
        )