Skip to content

Commit

Permalink
Feature/generic udfs (#230)
Browse files Browse the repository at this point in the history
Signed-off-by: Avik Basu <ab93@users.noreply.github.com>
  • Loading branch information
ab93 authored Jul 13, 2023
1 parent b4524c8 commit b054c4e
Show file tree
Hide file tree
Showing 13 changed files with 732 additions and 0 deletions.
Binary file added docs/assets/anomalydetection.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
45 changes: 45 additions & 0 deletions numalogic/config/_conn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from dataclasses import dataclass, field


@dataclass
class PrometheusConf:
server: str
pushgateway: str


@dataclass
class RedisConf:
host: str
port: int
expiry: int = 300
master_name: str = "mymaster"


@dataclass
class DruidConf:
url: str
endpoint: str


@dataclass
class Pivot:
index: str = "timestamp"
columns: list[str] = field(default_factory=list)
value: list[str] = field(default_factory=lambda: ["count"])


@dataclass
class DruidFetcherConf:
from pydruid.utils.aggregators import doublesum

datasource: str
dimensions: list[str] = field(default_factory=list)
aggregations: dict = field(default_factory=dict)
group_by: list[str] = field(default_factory=list)
pivot: Pivot = field(default_factory=lambda: Pivot())
granularity: str = "minute"
hours: float = 36

def __post_init__(self):
if not self.aggregations:
self.aggregations = {"count": self.doublesum("count")}
65 changes: 65 additions & 0 deletions numalogic/config/_metric.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from dataclasses import dataclass, field
from enum import Enum

from omegaconf import MISSING

from numalogic.config._conn import DruidFetcherConf, RedisConf, DruidConf, PrometheusConf
from numalogic.config._numa import NumalogicConf


@dataclass
class UnifiedConf:
strategy: str = "max"
weights: list[float] = field(default_factory=list)


@dataclass
class ReTrainConf:
train_hours: int = 36
min_train_size: int = 2000
retrain_freq_hr: int = 8
resume_training: bool = False


@dataclass
class StaticThresholdConf:
upper_limit: int = 3
weight: float = 0.0


@dataclass
class MetricConf:
metric: str
retrain_conf: ReTrainConf = field(default_factory=lambda: ReTrainConf())
static_threshold: StaticThresholdConf = field(default_factory=lambda: StaticThresholdConf())
numalogic_conf: NumalogicConf = MISSING


class DataSource(str, Enum):
PROMETHEUS = "prometheus"
DRUID = "druid"


@dataclass
class DataStreamConf:
name: str = "default"
source: str = DataSource.PROMETHEUS.value
window_size: int = 12
composite_keys: list[str] = field(default_factory=list)
metrics: list[str] = field(default_factory=list)
metric_configs: list[MetricConf] = field(default_factory=list)
unified_config: UnifiedConf = field(default_factory=lambda: UnifiedConf())
druid_fetcher: DruidFetcherConf = MISSING


@dataclass
class Configs:
configs: list[DataStreamConf]


@dataclass
class PipelineConf:
redis_conf: RedisConf
# registry_conf: RegistryConf
prometheus_conf: PrometheusConf = MISSING
druid_conf: DruidConf = MISSING
79 changes: 79 additions & 0 deletions numalogic/config/_numa.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright 2022 The Numaproj Authors.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from dataclasses import dataclass, field
from typing import Optional, Any

from omegaconf import MISSING


@dataclass
class ModelInfo:
"""Schema for defining the model/estimator.
Args:
----
name: name of the model; this should map to a supported list of models
mentioned in the factory file
conf: kwargs for instantiating the model class
stateful: flag indicating if the model is stateful or not
"""

name: str = MISSING
conf: dict[str, Any] = field(default_factory=dict)
stateful: bool = True


@dataclass
class RegistryInfo:
"""Registry config base class.
Args:
----
name: name of the registry
conf: kwargs for instantiating the model class
"""

name: str = MISSING
conf: dict[str, Any] = field(default_factory=dict)


@dataclass
class LightningTrainerConf:
"""Schema for defining the Pytorch Lightning trainer behavior.
More details on the arguments are provided here:
https://pytorch-lightning.readthedocs.io/en/stable/common/trainer.html#trainer-class-api
"""

accelerator: str = "auto"
max_epochs: int = 100
logger: bool = False
check_val_every_n_epoch: int = 5
log_every_n_steps: int = 20
enable_checkpointing: bool = False
enable_progress_bar: bool = True
enable_model_summary: bool = True
limit_val_batches: bool = 0
callbacks: Optional[Any] = None


@dataclass
class NumalogicConf:
"""Top level config schema for numalogic."""

model: ModelInfo = field(default_factory=ModelInfo)
trainer: LightningTrainerConf = field(default_factory=LightningTrainerConf)
registry: RegistryInfo = field(default_factory=RegistryInfo)
preprocess: list[ModelInfo] = field(default_factory=list)
threshold: ModelInfo = field(default_factory=ModelInfo)
postprocess: ModelInfo = field(default_factory=ModelInfo)
4 changes: 4 additions & 0 deletions numalogic/connectors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from numalogic.connectors.druid import DruidFetcher
from numalogic.connectors.prometheus import PrometheusDataFetcher

__all__ = ["DruidFetcher", "PrometheusDataFetcher"]
149 changes: 149 additions & 0 deletions numalogic/numaflow/entities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
from copy import copy
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Union, TypeVar

import numpy as np
import numpy.typing as npt
import orjson
import pandas as pd

Vector = list[float]
Matrix = Union[Vector, list[Vector], npt.NDArray[float]]


class Status(str, Enum):
"""Status is the enum that is used to identify the status of the payload."""

RAW = "raw"
EXTRACTED = "extracted"
PRE_PROCESSED = "pre_processed"
INFERRED = "inferred"
THRESHOLD = "threshold_complete"
POST_PROCESSED = "post_processed"
ARTIFACT_NOT_FOUND = "artifact_not_found"
ARTIFACT_STALE = "artifact_is_stale"
RUNTIME_ERROR = "runtime_error"


class Header(str, Enum):
"""Header is the enum that is used to identify the type of payload."""

STATIC_INFERENCE = "static_threshold"
MODEL_INFERENCE = "model_inference"
TRAIN_REQUEST = "request_training"
MODEL_STALE = "model_stale"


@dataclass
class _BasePayload:
"""_BasePayload is the base data structure that is passed."""

uuid: str
composite_keys: list[str]


PayloadType = TypeVar("PayloadType", bound=_BasePayload)


@dataclass
class TrainerPayload(_BasePayload):
"""
TrainerPayload is the data structure that is passed
around in the system when a training request is made.
"""

metrics: list[str]
header: Header = Header.TRAIN_REQUEST

def to_json(self):
return orjson.dumps(self)


@dataclass(repr=False)
class StreamPayload(_BasePayload):
"""StreamPayload is the main data structure that is passed around in the system."""

data: Matrix
raw_data: Matrix
metrics: list[str]
timestamps: list[int]
status: dict[str, Status] = field(default_factory=dict)
header: dict[str, Header] = field(default_factory=dict)
metadata: dict[str, dict[str, Any]] = field(default_factory=dict)

def get_df(self, original=False) -> pd.DataFrame:
return pd.DataFrame(self.get_data(original), columns=self.metrics)

def set_data(self, arr: Matrix) -> None:
self.data = arr

def set_metric_data(self, metric: str, arr: Matrix) -> None:
_df = self.get_df().copy()
_df[metric] = arr
self.set_data(np.asarray(_df.values.tolist()))

def get_metric_arr(self, metric: str) -> npt.NDArray[float]:
return self.get_df()[metric].values

def get_data(self, original=False) -> npt.NDArray[float]:
if original:
return np.asarray(self.raw_data)
return np.asarray(self.data)

def set_status(self, metric: str, status: Status) -> None:
self.status[metric] = status

def set_header(self, metric: str, header: Header) -> None:
self.header[metric] = header

def set_metric_metadata(self, metric: str, key: str, value) -> None:
if metric in self.metadata.keys():
self.metadata[metric][key] = value
else:
self.metadata[metric] = {key: value}

def set_metadata(self, key: str, value) -> None:
self.metadata[key] = value

def get_metadata(self, key: str) -> dict[str, Any]:
return copy(self.metadata[key])

def __repr__(self) -> str:
"""Return a string representation of the object."""
return "header: {}, status: {}, composite_keys: {}, data: {}, metadata: {}}}".format(
self.header,
self.status,
self.composite_keys,
list(self.data),
self.metadata,
)

def to_json(self):
return orjson.dumps(self, option=orjson.OPT_SERIALIZE_NUMPY)


@dataclass
class InputPayload:
"""Input payload."""

start_time: int
end_time: int
data: list[dict[str, Any]]
metadata: dict[str, Any]

def to_json(self):
return orjson.dumps(self, option=orjson.OPT_SERIALIZE_NUMPY)


@dataclass
class OutputPayload:
"""Output payload."""

timestamp: int
unified_anomaly: float
data: dict[str, Any]
metadata: dict[str, Any]

def to_json(self):
return orjson.dumps(self, option=orjson.OPT_SERIALIZE_NUMPY)
Empty file added numalogic/numaflow/server.py
Empty file.
4 changes: 4 additions & 0 deletions numalogic/numaflow/udf/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from numalogic.numaflow.udf.blockinfer import InferenceBlockUDF
from numalogic.numaflow.udf.block_train import TrainBlockUDF

__all__ = ["InferenceBlockUDF", "TrainBlockUDF"]
Loading

0 comments on commit b054c4e

Please sign in to comment.