Merge pull request #105 from TeamEpochGithub/v0.2
V0.2
tolgakopar authored Apr 2, 2024
2 parents 896ee00 + 4c09b30 commit 6501a33
Showing 21 changed files with 1,022 additions and 303 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/main-branch-testing.yml
@@ -57,7 +57,8 @@ jobs:
venv/bin/python -m pip install --upgrade pip
venv/bin/python -m pip install pytest
venv/bin/python -m pip install -r requirements.txt
venv/bin/python -m pip install pytest-cov coverage
- name: Test with pytest
run: |
venv/bin/python -m pytest tests
venv/bin/python -m pytest --cov=epochalyst --cov-branch --cov-fail-under=95 tests
3 changes: 2 additions & 1 deletion .github/workflows/version-branch-testing.yml
@@ -42,10 +42,11 @@ jobs:
venv/bin/python -m pip install --upgrade pip
venv/bin/python -m pip install pytest
venv/bin/python -m pip install -r requirements.txt
venv/bin/python -m pip install pytest-cov coverage
- name: Test with pytest
run: |
venv/bin/python -m pytest tests
venv/bin/python -m pytest --cov=epochalyst --cov-branch --cov-fail-under=95 tests
build:
runs-on: ubuntu-latest
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -84,4 +84,5 @@ repos:
- einops
- pytest
- agogos
- polars
args: [ --disallow-any-generics, --disallow-untyped-defs, --disable-error-code=import-untyped]
13 changes: 12 additions & 1 deletion README.md
@@ -29,9 +29,20 @@ poetry add epochalyst
To generate a pytest coverage report, run

```python
python -m pytest --cov=epochalyst --cov-report=html:coverage_re
python -m pytest --cov=epochalyst --cov-branch --cov-report=html:coverage_re
```

## Imports

### Caching

Some imports are only required for caching; these have to be installed manually when needed (see the sketch after this list):
- dask >= 2023.12.0 & dask-expr
- pandas >= 1.3.0
- polars
- pyarrow >= 6.0.0 (to read Parquet files)
- annotated-types >= 0.6.0
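
If you use the caching features, a minimal sketch for surfacing missing optional dependencies is an import guard like the one below (not part of this commit); which packages you actually need depends on the `output_data_type` and `storage_type` values you cache:

```python
# Minimal sketch, not part of this commit: guard the optional caching imports.
# Only the packages matching the cache_args you actually use are required.
try:
    import polars as pl  # noqa: F401  # needed for "polars_dataframe" caching
    import pyarrow  # noqa: F401  # needed to read/write .parquet caches
except ImportError as err:
    raise ImportError(
        "Optional caching dependencies missing; install them manually, "
        "e.g. `pip install polars 'pyarrow>=6.0.0'`"
    ) from err
```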

## Documentation

Documentation is generated using [Sphinx](https://www.sphinx-doc.org/en/master/).
93 changes: 62 additions & 31 deletions epochalyst/_core/_caching/_cacher.py
@@ -1,46 +1,69 @@
import glob
import os
import pickle
from epochalyst._core._logging._logger import _Logger
from typing import Any
from typing import Any, TypedDict, Literal

import dask.array as da
import dask.dataframe as dd
import numpy as np
import os
import pandas as pd
import polars as pl

from epochalyst._core._logging._logger import _Logger


class _Cacher(_Logger):
"""The cacher is a flexible class that allows for caching of any data.
class _CacheArgs(TypedDict):
"""The cache arguments.
The cacher uses cache_args to determine if the data is already cached and if so, return the cached data.
cache_args is a dictionary that contains the arguments to determine if the data is already cached. Currently listed cache_args are
supported if more are required create a new issue on the github repository.
Currently listed cache_args are supported. If more are required, create a new GitHub issue.
cache_args supports the following keys:
The following keys are supported:
- output_data_type: The type of the output data.
- "dask_array": The output data is a dask array.
- "numpy_array": The output data is a numpy array.
- "pandas_dataframe": The output data is a pandas dataframe.
- "dask_dataframe": The output data is a dask dataframe.
- "dask_array": The output data is a Dask array.
- "numpy_array": The output data is a NumPy array.
- "pandas_dataframe": The output data is a Pandas dataframe.
- "dask_dataframe": The output data is a Dask dataframe.
- "polars_dataframe": The output data is a Polars dataframe.
- storage_type: The type of the storage.
- ".npy": The storage type is a numpy file.
- ".parquet": The storage type is a parquet file.
- ".csv": The storage type is a csv file.
- ".npy_stack": The storage type is a numpy stack.
- ".npy": The storage type is a NumPy file.
- ".parquet": The storage type is a Parquet file.
- ".csv": The storage type is a CSV file.
- ".npy_stack": The storage type is a NumPy stack.
- ".pkl": The storage type is a pickle file
- storage_path: The path to the storage.
### Methods:
```python
def _cache_exists(name: str, cache_args: dict[str, Any] = {}) -> bool: # Check if the cache exists
:param output_data_type: The type of the output data.
:param storage_type: The type of the storage.
:param storage_path: The path to the storage.
"""

def _get_cache(name: str, cache_args: dict[str, Any] = {}) -> Any: # Load the cache
output_data_type: Literal[
"dask_array",
"numpy_array",
"pandas_dataframe",
"dask_dataframe",
"polars_dataframe",
]
storage_type: Literal[".npy", ".parquet", ".csv", ".npy_stack", ".pkl"]
storage_path: str # TODO(Jeffrey) Allow str | bytes | os.PathLike[str] | os.PathLike[bytes] instead of just str

def _store_cache(name: str, data: Any, cache_args: dict[str, Any] = {}) -> None: # Store data
```

class _Cacher(_Logger):
"""The cacher is a flexible class that allows for caching of any data.
The cacher uses cache_args to determine if the data is already cached and if so, return the cached data.
cache_args is a dictionary that contains the arguments to determine if the data is already cached.
Methods:
.. code-block:: python
def _cache_exists(name: str, cache_args: _CacheArgs | None = None) -> bool: # Check if the cache exists
def _get_cache(name: str, cache_args: _CacheArgs | None = None) -> Any: # Load the cache
def _store_cache(name: str, data: Any, cache_args: _CacheArgs | None = None) -> None: # Store data
"""

def _cache_exists(self, name: str, cache_args: dict[str, Any] = {}) -> bool:
def _cache_exists(self, name: str, cache_args: _CacheArgs | None = None) -> bool:
"""Check if the cache exists.
:param cache_args: The cache arguments.
@@ -88,7 +111,7 @@ def _cache_exists(self, name: str, cache_args: dict[str, Any] = {}) -> bool:

return path_exists

def _get_cache(self, name: str, cache_args: dict[str, Any] = {}) -> Any:
def _get_cache(self, name: str, cache_args: _CacheArgs | None = None) -> Any:
"""Load the cache.
:param name: The name of the cache.
@@ -144,12 +167,14 @@ def _get_cache(self, name: str, cache_args: dict[str, Any] = {}) -> Any:
return pd.read_parquet(storage_path + name + ".parquet").to_numpy()
elif output_data_type == "dask_array":
return dd.read_parquet(storage_path + name + ".parquet").to_dask_array()
elif output_data_type == "polars_dataframe":
return pl.read_parquet(storage_path + name + ".parquet")
else:
self.log_to_debug(
f"Invalid output data type: {output_data_type}, for loading .parquet file."
)
raise ValueError(
"output_data_type must be pandas_dataframe, dask_dataframe, numpy_array or dask_array, other types not supported yet"
"output_data_type must be pandas_dataframe, dask_dataframe, numpy_array, dask_array, or polars_dataframe, other types not supported yet"
)
elif storage_type == ".csv":
# Check if output_data_type is supported and load cache to output_data_type
@@ -158,12 +183,14 @@ def _get_cache(self, name: str, cache_args: dict[str, Any] = {}) -> Any:
return pd.read_csv(storage_path + name + ".csv")
elif output_data_type == "dask_dataframe":
return dd.read_csv(storage_path + name + "/*.part")
elif output_data_type == "polars_dataframe":
return pl.read_csv(storage_path + name + ".csv")
else:
self.log_to_debug(
f"Invalid output data type: {output_data_type}, for loading .csv file."
)
raise ValueError(
"output_data_type must be pandas_dataframe or dask_dataframe, other types not supported yet"
"output_data_type must be pandas_dataframe, dask_dataframe, or polars_dataframe, other types not supported yet"
)
elif storage_type == ".npy_stack":
# Check if output_data_type is supported and load cache to output_data_type
@@ -186,11 +213,11 @@ def _get_cache(self, name: str, cache_args: dict[str, Any] = {}) -> Any:
else:
self.log_to_debug(f"Invalid storage type: {storage_type}")
raise ValueError(
"storage_type must be .npy, .parquet, .csv or .npy_stack, other types not supported yet"
"storage_type must be .npy, .parquet, .csv, or .npy_stack, other types not supported yet"
)

def _store_cache(
self, name: str, data: Any, cache_args: dict[str, Any] = {}
self, name: str, data: Any, cache_args: _CacheArgs | None = None
) -> None:
"""Store one set of data.
@@ -251,12 +278,14 @@ def _store_cache(
columns={col: str(col) for col in new_dd.columns}
)
new_dd.to_parquet(storage_path + name + ".parquet")
elif output_data_type == "polars_dataframe":
data.write_parquet(storage_path + name + ".parquet")
else:
self.log_to_debug(
f"Invalid output data type: {output_data_type}, for storing .parquet file."
)
raise ValueError(
"output_data_type must be pandas_dataframe, dask_dataframe, numpy_array or dask_array, other types not supported yet"
"output_data_type must be pandas_dataframe, dask_dataframe, numpy_array, dask_array, or polars_dataframe, other types not supported yet"
)
elif storage_type == ".csv":
# Check if output_data_type is supported and store cache to output_data_type
@@ -265,12 +294,14 @@ def _store_cache(
data.to_csv(storage_path + name + ".csv", index=False)
elif output_data_type == "dask_dataframe":
data.to_csv(storage_path + name, index=False)
elif output_data_type == "polars_dataframe":
data.write_csv(storage_path + name + ".csv")
else:
self.log_to_debug(
f"Invalid output data type: {output_data_type}, for storing .csv file."
)
raise ValueError(
"output_data_type must be pandas_dataframe or dask_dataframe, other types not supported yet"
"output_data_type must be pandas_dataframe, dask_dataframe, or polars_dataframe, other types not supported yet"
)
elif storage_type == ".npy_stack":
# Check if output_data_type is supported and store cache to output_data_type
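
For orientation (not part of the diff), a `cache_args` dictionary matching the new `_CacheArgs` keys might look like the sketch below. The path `tmp/cache/` is a made-up example; `_Cacher` joins `storage_path + name + extension` by plain string concatenation, so the path should end with a separator:

```python
from typing import Any

# Illustrative sketch only: a cache_args dict with the three _CacheArgs keys.
# These values are forwarded to _cache_exists, _get_cache and _store_cache;
# "tmp/cache/" is a hypothetical directory.
cache_args: dict[str, Any] = {
    "output_data_type": "polars_dataframe",  # Literal: dask_array, numpy_array, ...
    "storage_type": ".parquet",              # Literal: .npy, .parquet, .csv, .npy_stack, .pkl
    "storage_path": "tmp/cache/",            # plain str (see the TODO about PathLike)
}
```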
28 changes: 14 additions & 14 deletions epochalyst/_core/_logging/_logger.py
@@ -6,25 +6,25 @@
class _Logger:
"""Logger abstract class for logging methods.
### Methods:
```python
@abstractmethod
def log_to_terminal(self, message: str) -> None: # Logs to terminal if implemented
Methods:
.. code-block:: python
@abstractmethod
def log_to_terminal(self, message: str) -> None: # Logs to terminal if implemented
@abstractmethod
def log_to_debug(self, message: str) -> None: # Logs to debugger if implemented
@abstractmethod
def log_to_debug(self, message: str) -> None: # Logs to debugger if implemented
@abstractmethod
def log_to_warning(self, message: str) -> None: # Logs to warning if implemented
@abstractmethod
def log_to_warning(self, message: str) -> None: # Logs to warning if implemented
@abstractmethod
def log_to_external(self, message: dict[str, Any], **kwargs: Any) -> None: # Logs to external site
@abstractmethod
def log_to_external(self, message: dict[str, Any], **kwargs: Any) -> None: # Logs to external site
@abstractmethod
def external_define_metric(self, metric: str, metric_type: str) -> None: # Defines an external metric
@abstractmethod
def external_define_metric(self, metric: str, metric_type: str) -> None: # Defines an external metric
@abstractmethod
def log_section_separator(self, message: str) -> None: # Logs a section separator
@abstractmethod
def log_section_separator(self, message: str) -> None: # Logs a section separator
"""

@abstractmethod
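
As an illustration (not part of the commit), a concrete class mirroring the abstract logging hooks listed above could look like the sketch below; it only copies the documented signatures and does not subclass the internal `_Logger`:

```python
import logging
from typing import Any


class ConsoleLogger:
    """Sketch of a logger implementing the documented hooks with stdlib tools."""

    def log_to_terminal(self, message: str) -> None:
        print(message)

    def log_to_debug(self, message: str) -> None:
        logging.debug(message)

    def log_to_warning(self, message: str) -> None:
        logging.warning(message)

    def log_to_external(self, message: dict[str, Any], **kwargs: Any) -> None:
        # No external tracker in this sketch; a real implementation might
        # forward `message` to an experiment-tracking service.
        pass

    def external_define_metric(self, metric: str, metric_type: str) -> None:
        pass  # no external metric backend in this sketch

    def log_section_separator(self, message: str) -> None:
        print(f"{'=' * 20} {message} {'=' * 20}")
```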
88 changes: 88 additions & 0 deletions epochalyst/pipeline/ensemble.py
@@ -0,0 +1,88 @@
from agogos.training import ParallelTrainingSystem
from typing import Any
from epochalyst._core._caching._cacher import _CacheArgs


class EnsemblePipeline(ParallelTrainingSystem):
"""EnsemblePipeline is the class used to create the pipeline for the model. (Currently same implementation as agogos pipeline)
:param steps: Trainers to ensemble
"""

def get_x_cache_exists(self, cache_args: _CacheArgs) -> bool:
"""Get status of x
:param cache_args: Cache arguments
:return: Whether cache exists
"""
if len(self.steps) == 0:
return False

for step in self.steps:
if not step.get_x_cache_exists(cache_args):
return False

return True

def get_y_cache_exists(self, cache_args: _CacheArgs) -> bool:
"""Get status of y cache
:param cache_args: Cache arguments
:return: Whether cache exists
"""
if len(self.steps) == 0:
return False

for step in self.steps:
if not step.get_y_cache_exists(cache_args):
return False

return True

def concat(
self, original_data: Any, data_to_concat: Any, weight: float = 1.0
) -> Any:
"""Concatenate the trained data.
:param original_data: First input data
:param data_to_concat: Second input data
:param weight: Weight of data to concat
:return: Concatenated data
"""
if original_data is None:
if data_to_concat is None:
return None
return data_to_concat * weight

return original_data + data_to_concat * weight

# def __post_init__(self) -> None:
# """Post init method for the EnsemblePipeline class.
#
# Currently does nothing."""
# return super().__post_init__()
#
# def train(self, x: Any, y: Any, **train_args: Any) -> tuple[Any, Any]:
# """Train the system.
#
# :param x: The input to the system.
# :param y: The expected output of the system.
# :return: The input and output of the system.
# """
# return super().train(x, y, **train_args)
#
#
# def predict(self, x: Any, **pred_args: Any) -> Any:
# """Predict the output of the system.
#
# :param x: The input to the system.
# :return: The output of the system.
# """
# return super().predict(x, **pred_args)
#
# def concat(
# self, original_data: Any, data_to_concat: Any, weight: float = 1.0
# ) -> Any:
# if data_to_concat is None:
# return original_data
# return original_data + data_to_concat * weight
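
To illustrate the new `concat` behaviour (sketch only, assuming array-like predictions), the ensemble accumulates a weighted sum of each trainer's output:

```python
import numpy as np

# Sketch of how EnsemblePipeline.concat folds weighted predictions together.
preds = [np.array([0.2, 0.8]), np.array([0.6, 0.4])]  # hypothetical trainer outputs
weights = [0.7, 0.3]

combined = None
for pred, weight in zip(preds, weights):
    if combined is None:
        combined = pred * weight              # first trainer: scale by its weight
    else:
        combined = combined + pred * weight   # later trainers: add their weighted output

print(combined)  # [0.32 0.68]
```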
24 changes: 23 additions & 1 deletion epochalyst/pipeline/model/model.py
@@ -1,5 +1,6 @@
from typing import Any
from agogos.pipeline import Pipeline
from agogos.training import Pipeline
from epochalyst._core._caching._cacher import _CacheArgs


class ModelPipeline(Pipeline):
@@ -34,3 +35,24 @@ def predict(self, x: Any, **pred_args: Any) -> Any:
:return: The output of the system.
"""
return super().predict(x, **pred_args)

def get_x_cache_exists(self, cache_args: _CacheArgs) -> bool:
"""Get status of x
:param cache_args: Cache arguments
:return: Whether cache exists
"""
if self.x_sys is None:
return False
return self.x_sys._cache_exists(self.x_sys.get_hash(), cache_args)

def get_y_cache_exists(self, cache_args: _CacheArgs) -> bool:
"""Get status of y cache
:param cache_args: Cache arguments
:return: Whether cache exists
"""
if self.y_sys is None:
return False

return self.y_sys._cache_exists(self.y_sys.get_hash(), cache_args)
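
As a usage sketch (not part of the diff), the new cache-status helpers let callers check whether both the x and y caches exist before deciding to recompute; `pipeline` and the cache location below are hypothetical:

```python
# Hypothetical, assuming `pipeline` is a configured ModelPipeline instance.
cache_args = {
    "output_data_type": "numpy_array",
    "storage_type": ".npy",
    "storage_path": "tmp/cache/",
}

if pipeline.get_x_cache_exists(cache_args) and pipeline.get_y_cache_exists(cache_args):
    print("Both x and y caches exist; preprocessing can be skipped.")
else:
    print("At least one cache is missing; the pipeline will recompute it.")
```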