-
Notifications
You must be signed in to change notification settings - Fork 28
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Avik Basu <ab93@users.noreply.github.com>
- Loading branch information
Showing
9 changed files
with
530 additions
and
0 deletions.
There are no files selected for viewing
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
from dataclasses import dataclass, field | ||
|
||
|
||
@dataclass
class PrometheusConf:
    """Connection settings for Prometheus.

    Args:
    ----
        server: Prometheus server setting (required, no default)
        pushgateway: Prometheus pushgateway setting (required, no default)
    """

    server: str
    pushgateway: str
|
||
|
||
@dataclass
class RedisConf:
    """Connection settings for Redis.

    Args:
    ----
        host: Redis host (required)
        port: Redis port (required)
        expiry: expiry value, 300 by default
            (units are not visible here; presumably seconds -- confirm)
        master_name: master name, "mymaster" by default
            (presumably used for sentinel discovery -- confirm against the caller)
    """

    host: str
    port: int
    expiry: int = 300
    master_name: str = "mymaster"
|
||
|
||
@dataclass
class DruidConf:
    """Connection settings for Druid.

    Args:
    ----
        url: Druid url (required, no default)
        endpoint: Druid endpoint (required, no default)
    """

    url: str
    endpoint: str
|
||
|
||
@dataclass
class Pivot:
    """Pivot settings: an index name, column names and value names.

    Every instance gets its own fresh lists for ``columns`` and ``value``.

    Args:
    ----
        index: index name, "timestamp" by default
        columns: list of column names, empty by default
        value: list of value names, ``["count"]`` by default
    """

    index: str = "timestamp"
    columns: list[str] = field(default_factory=list)
    # A factory is required so that instances do not share one mutable list.
    value: list[str] = field(default_factory=lambda: ["count"])
|
||
|
||
@dataclass
class DruidFetcherConf:
    """Settings describing what to fetch from a Druid datasource.

    Args:
    ----
        datasource: name of the datasource (required)
        dimensions: list of dimension names, empty by default
        aggregations: aggregation mapping; when left empty it is replaced in
            ``__post_init__`` by ``{"count": doublesum("count")}``
        group_by: list of names to group by, empty by default
        pivot: pivot settings, a default ``Pivot()`` when not given
        granularity: granularity name, "minute" by default
        hours: number of hours, 36 by default
    """

    datasource: str
    dimensions: list[str] = field(default_factory=list)
    aggregations: dict = field(default_factory=dict)
    group_by: list[str] = field(default_factory=list)
    pivot: Pivot = field(default_factory=lambda: Pivot())
    granularity: str = "minute"
    hours: float = 36

    def __post_init__(self):
        """Install the default aggregation when none was supplied."""
        if not self.aggregations:
            # Import locally instead of in the class body: a function stored as a
            # class attribute is bound as a method when accessed through ``self``,
            # so ``self.doublesum("count")`` would pass ``self`` as an unexpected
            # extra positional argument and raise TypeError.
            from pydruid.utils.aggregators import doublesum

            self.aggregations = {"count": doublesum("count")}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
from dataclasses import dataclass, field | ||
from enum import Enum | ||
|
||
from omegaconf import MISSING | ||
|
||
from numalogic.config._conn import DruidFetcherConf, RedisConf, DruidConf, PrometheusConf | ||
from numalogic.config._numa import NumalogicConf | ||
|
||
|
||
@dataclass
class UnifiedConf:
    """Settings for the unified configuration section.

    Args:
    ----
        strategy: strategy name, "max" by default
        weights: list of float weights, empty by default
    """

    strategy: str = "max"
    weights: list[float] = field(default_factory=list)
|
||
|
||
@dataclass
class ReTrainConf:
    """Settings controlling (re)training.

    Args:
    ----
        train_hours: number of training hours, 36 by default
        min_train_size: minimum training size, 2000 by default
        retrain_freq_hr: retrain frequency in hours, 8 by default
        resume_training: flag for resuming training, False by default
    """

    train_hours: int = 36
    min_train_size: int = 2000
    retrain_freq_hr: int = 8
    resume_training: bool = False
|
||
|
||
@dataclass
class StaticThresholdConf:
    """Settings for the static threshold.

    Args:
    ----
        upper_limit: upper limit, 3 by default
        weight: weight, 0.0 by default
    """

    upper_limit: int = 3
    weight: float = 0.0
|
||
|
||
@dataclass
class MetricConf:
    """Per-metric configuration.

    Args:
    ----
        metric: name of the metric (required)
        retrain_conf: retraining settings, a default ``ReTrainConf`` when not given
        static_threshold: static threshold settings, a default
            ``StaticThresholdConf`` when not given
        numalogic_conf: numalogic settings; defaults to the ``MISSING`` marker
    """

    metric: str
    # The classes themselves serve as zero-argument factories.
    retrain_conf: ReTrainConf = field(default_factory=ReTrainConf)
    static_threshold: StaticThresholdConf = field(default_factory=StaticThresholdConf)
    numalogic_conf: NumalogicConf = MISSING
|
||
|
||
class DataSource(str, Enum):
    """Names of the data sources; members are also plain strings."""

    PROMETHEUS = "prometheus"
    DRUID = "druid"
|
||
|
||
@dataclass
class DataStreamConf:
    """Configuration of a single data stream.

    Args:
    ----
        name: stream name, "default" by default
        source: data source name; defaults to the value of ``DataSource.PROMETHEUS``
        window_size: window size, 12 by default
        composite_keys: list of composite key names, empty by default
        metrics: list of metric names, empty by default
        metric_configs: list of per-metric configurations, empty by default
        unified_config: unified settings, a default ``UnifiedConf`` when not given
        druid_fetcher: Druid fetcher settings; defaults to the ``MISSING`` marker
    """

    name: str = "default"
    source: str = DataSource.PROMETHEUS.value
    window_size: int = 12
    composite_keys: list[str] = field(default_factory=list)
    metrics: list[str] = field(default_factory=list)
    metric_configs: list[MetricConf] = field(default_factory=list)
    # The class itself serves as the zero-argument factory.
    unified_config: UnifiedConf = field(default_factory=UnifiedConf)
    druid_fetcher: DruidFetcherConf = MISSING
|
||
|
||
@dataclass
class Configs:
    """Container holding a list of data stream configurations.

    Args:
    ----
        configs: list of ``DataStreamConf`` entries (required)
    """

    configs: list[DataStreamConf]
|
||
|
||
@dataclass
class PipelineConf:
    """Pipeline-level connection configuration.

    Args:
    ----
        redis_conf: Redis settings (required)
        prometheus_conf: Prometheus settings; defaults to the ``MISSING`` marker
        druid_conf: Druid settings; defaults to the ``MISSING`` marker
    """

    redis_conf: RedisConf
    # registry_conf: RegistryConf
    prometheus_conf: PrometheusConf = MISSING
    druid_conf: DruidConf = MISSING
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
# Copyright 2022 The Numaproj Authors. | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
|
||
from dataclasses import dataclass, field | ||
from typing import Optional, Any | ||
|
||
from omegaconf import MISSING | ||
|
||
|
||
@dataclass
class ModelInfo:
    """Schema describing a model/estimator.

    Args:
    ----
        name: model name; it should correspond to one of the supported models
              listed in the factory file. Defaults to the ``MISSING`` marker.
        conf: keyword arguments used to instantiate the model class
        stateful: whether the model is stateful (True by default)
    """

    name: str = MISSING
    # Each instance receives its own empty dict.
    conf: dict[str, Any] = field(default_factory=dict)
    stateful: bool = True
|
||
|
||
@dataclass
class RegistryInfo:
    """Base schema describing a registry.

    Args:
    ----
        name: registry name. Defaults to the ``MISSING`` marker.
        conf: keyword arguments for the registry (the previous wording said
              "model class", presumably a copy-paste slip -- confirm)
    """

    name: str = MISSING
    # Each instance receives its own empty dict.
    conf: dict[str, Any] = field(default_factory=dict)
|
||
|
||
@dataclass
class LightningTrainerConf:
    """Schema for defining the Pytorch Lightning trainer behavior.

    More details on the arguments are provided here:
    https://pytorch-lightning.readthedocs.io/en/stable/common/trainer.html#trainer-class-api

    Args:
    ----
        accelerator: accelerator name, "auto" by default
        max_epochs: maximum number of epochs, 100 by default
        logger: logger flag, False by default
        check_val_every_n_epoch: validation check interval in epochs, 5 by default
        log_every_n_steps: logging interval in steps, 20 by default
        enable_checkpointing: checkpointing flag, False by default
        enable_progress_bar: progress bar flag, True by default
        enable_model_summary: model summary flag, True by default
        limit_val_batches: limit on validation batches, 0 by default
        callbacks: optional callbacks, None by default
    """

    accelerator: str = "auto"
    max_epochs: int = 100
    logger: bool = False
    check_val_every_n_epoch: int = 5
    log_every_n_steps: int = 20
    enable_checkpointing: bool = False
    enable_progress_bar: bool = True
    enable_model_summary: bool = True
    # Annotated as int (it was bool): the value is a batch limit and the default
    # is the integer 0; a bool annotation lets type-validating config loaders
    # coerce any non-zero integer to True.
    limit_val_batches: int = 0
    callbacks: Optional[Any] = None
|
||
|
||
@dataclass
class NumalogicConf:
    """Top-level numalogic configuration schema.

    Every field falls back to a freshly constructed default object.

    Args:
    ----
        model: model definition
        trainer: Lightning trainer settings
        registry: registry definition
        preprocess: list of model definitions for preprocessing, empty by default
        threshold: model definition for thresholding
        postprocess: model definition for postprocessing
    """

    model: ModelInfo = field(default_factory=lambda: ModelInfo())
    trainer: LightningTrainerConf = field(default_factory=lambda: LightningTrainerConf())
    registry: RegistryInfo = field(default_factory=lambda: RegistryInfo())
    preprocess: list[ModelInfo] = field(default_factory=list)
    threshold: ModelInfo = field(default_factory=lambda: ModelInfo())
    postprocess: ModelInfo = field(default_factory=lambda: ModelInfo())
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
from numalogic.connectors.druid import DruidFetcher | ||
from numalogic.connectors.prometheus import PrometheusDataFetcher | ||
|
||
__all__ = ["DruidFetcher", "PrometheusDataFetcher"] | ||
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
# Anomaly Detection | ||
|
||
![anomalydetection](../../docs/assets/anomalydetection.png) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,149 @@ | ||
from copy import copy | ||
from dataclasses import dataclass, field | ||
from enum import Enum | ||
from typing import Any, Union, TypeVar | ||
|
||
import numpy as np | ||
import numpy.typing as npt | ||
import orjson | ||
import pandas as pd | ||
|
||
# A flat list of floats.
Vector = list[float]
# Containers accepted as payload data: one vector, a list of vectors, or a numpy array.
Matrix = Union[
    Vector,
    list[Vector],
    npt.NDArray[float],
]
|
||
|
||
class Status(str, Enum):
    """Enumeration of payload statuses; members are also plain strings."""

    # Statuses named after processing stages.
    RAW = "raw"
    EXTRACTED = "extracted"
    PRE_PROCESSED = "pre_processed"
    INFERRED = "inferred"
    THRESHOLD = "threshold_complete"
    POST_PROCESSED = "post_processed"
    # Statuses describing artifact problems or runtime failures.
    ARTIFACT_NOT_FOUND = "artifact_not_found"
    ARTIFACT_STALE = "artifact_is_stale"
    RUNTIME_ERROR = "runtime_error"
|
||
|
||
class Header(str, Enum):
    """Enumeration of payload types; members are also plain strings."""

    STATIC_INFERENCE = "static_threshold"
    MODEL_INFERENCE = "model_inference"
    TRAIN_REQUEST = "request_training"
    MODEL_STALE = "model_stale"
|
||
|
||
@dataclass
class _BasePayload:
    """Common base for payloads: a uuid plus a list of composite keys.

    Args:
    ----
        uuid: identifier string (required)
        composite_keys: list of composite key strings (required)
    """

    uuid: str
    composite_keys: list[str]


# Type variable restricted to _BasePayload and its subclasses.
PayloadType = TypeVar("PayloadType", bound=_BasePayload)
|
||
|
||
@dataclass
class TrainerPayload(_BasePayload):
    """Payload that travels through the system when a training request is made.

    Args:
    ----
        metric: name of the metric (required)
        header: payload header, ``Header.TRAIN_REQUEST`` by default
    """

    metric: str
    header: Header = Header.TRAIN_REQUEST

    def to_json(self) -> bytes:
        """Serialize this payload with ``orjson.dumps`` and return the result."""
        serialized = orjson.dumps(self)
        return serialized
|
||
|
||
@dataclass(repr=False)
class StreamPayload(_BasePayload):
    """StreamPayload is the main data structure that is passed around in the system.

    Args:
    ----
        data: current data, returned by ``get_data()``
        raw_data: data returned by ``get_data(original=True)``
        metrics: metric names, used as dataframe column names
        timestamps: list of integer timestamps
        status: mapping of metric name to ``Status``
        header: mapping of metric name to ``Header``
        metadata: metadata mapping
    """

    data: Matrix
    raw_data: Matrix
    metrics: list[str]
    timestamps: list[int]
    status: dict[str, Status] = field(default_factory=dict)
    header: dict[str, Header] = field(default_factory=dict)
    metadata: dict[str, dict[str, Any]] = field(default_factory=dict)

    def get_df(self, original=False) -> pd.DataFrame:
        """Return the data (or the raw data if ``original``) as a dataframe.

        The columns are labelled with ``self.metrics``.
        """
        return pd.DataFrame(self.get_data(original), columns=self.metrics)

    def set_data(self, arr: Matrix) -> None:
        """Replace the current data with ``arr``."""
        self.data = arr

    def set_metric_data(self, metric: str, arr: Matrix) -> None:
        """Overwrite the column of ``metric`` with ``arr`` and store the result as data."""
        _df = self.get_df().copy()
        _df[metric] = arr
        self.set_data(np.asarray(_df.values.tolist()))

    def get_metric_arr(self, metric: str) -> npt.NDArray[float]:
        """Return the values of the ``metric`` column of the current data."""
        return self.get_df()[metric].values

    def get_data(self, original=False) -> npt.NDArray[float]:
        """Return the raw data if ``original`` is truthy, else the current data, as an array."""
        if original:
            return np.asarray(self.raw_data)
        return np.asarray(self.data)

    def set_status(self, metric: str, status: Status) -> None:
        """Record ``status`` for ``metric``."""
        self.status[metric] = status

    def set_header(self, metric: str, header: Header) -> None:
        """Record ``header`` for ``metric``."""
        self.header[metric] = header

    def set_metric_metadata(self, metric: str, key: str, value) -> None:
        """Store ``value`` under ``key`` in the metadata of ``metric``.

        The per-metric mapping is created on first use.
        """
        self.metadata.setdefault(metric, {})[key] = value

    def set_metadata(self, key: str, value) -> None:
        """Store ``value`` under the top-level metadata ``key``."""
        self.metadata[key] = value

    def get_metadata(self, key: str) -> dict[str, Any]:
        """Return a shallow copy of the metadata stored under ``key``.

        Raises
        ------
            KeyError: if ``key`` is not present in the metadata
        """
        return copy(self.metadata[key])

    def __repr__(self) -> str:
        """Return a string representation of the object."""
        # The previous format string ended with "}}", which rendered a stray,
        # unbalanced closing brace at the end of the representation.
        return "header: {}, status: {}, composite_keys: {}, data: {}, metadata: {}".format(
            self.header,
            self.status,
            self.composite_keys,
            list(self.data),
            self.metadata,
        )

    def to_json(self):
        """Serialize this payload with ``orjson.dumps`` using ``OPT_SERIALIZE_NUMPY``."""
        return orjson.dumps(self, option=orjson.OPT_SERIALIZE_NUMPY)
|
||
|
||
@dataclass
class InputPayload:
    """Input payload.

    Args:
    ----
        start_time: integer start time (required)
        end_time: integer end time (required)
        data: list of dictionaries (required)
        metadata: metadata dictionary (required)
    """

    start_time: int
    end_time: int
    data: list[dict[str, Any]]
    metadata: dict[str, Any]

    def to_json(self) -> bytes:
        """Serialize this payload with ``orjson.dumps`` using ``OPT_SERIALIZE_NUMPY``."""
        serialized = orjson.dumps(self, option=orjson.OPT_SERIALIZE_NUMPY)
        return serialized
|
||
|
||
@dataclass
class OutputPayload:
    """Output payload.

    Args:
    ----
        timestamp: integer timestamp (required)
        unified_anomaly: unified anomaly value (required)
        data: data dictionary (required)
        metadata: metadata dictionary (required)
    """

    timestamp: int
    unified_anomaly: float
    data: dict[str, Any]
    metadata: dict[str, Any]

    def to_json(self) -> bytes:
        """Serialize this payload with ``orjson.dumps`` using ``OPT_SERIALIZE_NUMPY``."""
        serialized = orjson.dumps(self, option=orjson.OPT_SERIALIZE_NUMPY)
        return serialized
Empty file.
Oops, something went wrong.