Merge pull request #60 from TeamEpochGithub/v0.1.3
v0.1.3
hjdeheer authored Mar 8, 2024
2 parents eb4d38f + 7e6011c commit 40fbe08
Showing 13 changed files with 150 additions and 31 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -216,4 +216,4 @@ plots/images/train
coverage_re/

# Ignore vscode settings
.vscode/
.vscode/
6 changes: 5 additions & 1 deletion epochalyst/_core/_caching/_cacher.py
@@ -287,7 +287,11 @@ def _store_cache(
elif storage_type == ".pkl":
# Store the pickle file
self.log_to_debug(f"Storing pickle file to {storage_path + name + '.pkl'}")
pickle.dump(data, open(storage_path + name + ".pkl", "wb"))
pickle.dump(
data,
open(storage_path + name + ".pkl", "wb"),
protocol=pickle.HIGHEST_PROTOCOL,
)
else:
self.log_to_debug(f"Invalid storage type: {storage_type}")
raise ValueError(
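For reference, a cache written by the `.pkl` branch above can be read back with a matching `pickle.load`. The helper below is an illustrative sketch, not part of this diff: its name is hypothetical and it only assumes the same `storage_path + name + ".pkl"` convention used in `_store_cache`. Writing with `pickle.HIGHEST_PROTOCOL` does not change how the file is read; it only makes serialization faster and more compact.

```python
import pickle
from typing import Any


def _load_pickle_cache(storage_path: str, name: str) -> Any:
    """Hypothetical counterpart to the ".pkl" branch of _store_cache above."""
    # Mirrors the path convention storage_path + name + ".pkl" from the diff.
    with open(storage_path + name + ".pkl", "rb") as f:
        return pickle.load(f)
```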
90 changes: 69 additions & 21 deletions epochalyst/pipeline/model/training/torch_trainer.py
@@ -24,30 +24,49 @@
class TorchTrainer(TrainingBlock):
"""Abstract class for torch trainers, override necessary functions for custom implementation.
To use this block, you must inherit from it and implement the following methods:
- `log_to_terminal`
- `log_to_debug`
- `log_to_warning`
- `log_to_external`
- `external_define_metric`
:param model: The model to train.
:param optimizer: Optimizer to use for training.
:param criterion: Criterion to use for training.
:param scheduler: Scheduler to use for training.
:param epochs: Number of epochs
:param batch_size: Batch size
:param patience: Patience for early stopping
:param test_size: Relative size of the test set
### Parameters:
- `model` (nn.Module): The model to train.
- `optimizer` (functools.partial[Optimizer]): Optimizer to use for training.
- `criterion` (nn.Module): Criterion to use for training.
- `scheduler` (Callable[[Optimizer], LRScheduler] | None): Scheduler to use for training.
- `epochs` (int): Number of epochs
- `batch_size` (int): Batch size
- `patience` (int): Patience for early stopping
- `test_size` (float): Relative size of the test set
### Methods:
```python
------------------------------
@abstractmethod
def log_to_terminal(self, message: str) -> None:
# Logs to terminal if implemented
@abstractmethod
def log_to_debug(self, message: str) -> None:
# Logs to debugger if implemented
@abstractmethod
def log_to_warning(self, message: str) -> None:
# Logs to warning if implemented
@abstractmethod
def log_to_external(self, message: dict[str, Any], **kwargs: Any) -> None:
# Logs to external site
def train(x: npt.NDArray[np.float32], y: npt.NDArray[np.float32], train_indices: list[int], test_indices: list[int], cache_size: int = -1, save_model: bool = True) -> tuple[npt.NDArray[np.float32], npt.NDArray[np.float32]]:
# Train the model.
@abstractmethod
def external_define_metric(self, metric: str, metric_type: str) -> None:
# Defines an external metric
def predict(x: npt.NDArray[np.float32]) -> npt.NDArray[np.float32]:
# Predict on the test data.
def train(self, x: Any, y: Any, cache_args: dict[str, Any] = {}, **train_args: Any) -> tuple[Any, Any]:
# Applies caching and calls custom_train; overriding this method removes caching
def predict(self, x: Any, cache_args: dict[str, Any] = {}, **pred_args: Any) -> Any:
# Applies caching and calls custom_predict; overriding this method removes caching
def custom_train(self, x: Any, y: Any, **train_args: Any) -> tuple[Any, Any]:
# Implements torch training. If you only override this method and need no other functionality, inherit from TrainingBlock instead.
def custom_predict(self, x: Any, **pred_args: Any) -> Any:
# Implements torch prediction. If you only override this method and need no other functionality, inherit from TrainingBlock instead.
def predict_on_loader(loader: DataLoader[tuple[Tensor, ...]]) -> npt.NDArray[np.float32]:
# Predict using a dataloader.
@@ -64,6 +83,35 @@ def create_dataloaders(train_dataset: Dataset[tuple[Tensor, ...]], test_dataset:
def update_model_directory(model_directory: str) -> None:
# Update the model directory for caching (default: tm).
```
### Usage:
```python
import functools

from epochalyst.pipeline.model.training.torch_trainer import TorchTrainer
from torch import nn
from torch.optim import Adam
from torch.optim.lr_scheduler import StepLR
from torch.nn import MSELoss
class MyTorchTrainer(TorchTrainer):
def log_to_terminal(self, message: str) -> None:
....
model = nn.Sequential(nn.Linear(1, 1))
optimizer = functools.partial(Adam, lr=0.01)
criterion = MSELoss()
scheduler = functools.partial(StepLR, step_size=1, gamma=0.1)
epochs = 10
batch_size = 32
patience = 5
test_size = 0.2
trainer = MyTorchTrainer(model=model, optimizer=optimizer, criterion=criterion, scheduler=scheduler, epochs=epochs, batch_size=batch_size, patience=patience, test_size=test_size)
x, y = trainer.train(x, y)
x = trainer.predict(x)
```
"""

model: nn.Module
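As a companion to the docstring above, the sketch below shows what a minimal concrete subclass might look like. It is illustrative only and not part of this diff: the class name is made up, the hooks are wired to `print`/`logging` as an assumption, and `log_to_external`/`external_define_metric` are left as no-ops for projects without an external experiment tracker. Construction and training then follow the `Usage` example in the docstring.

```python
import logging
from typing import Any

from epochalyst.pipeline.model.training.torch_trainer import TorchTrainer

logger = logging.getLogger(__name__)


class PrintTrainer(TorchTrainer):
    """Hypothetical TorchTrainer subclass routing logs to the standard library."""

    def log_to_terminal(self, message: str) -> None:
        print(message)

    def log_to_debug(self, message: str) -> None:
        logger.debug(message)

    def log_to_warning(self, message: str) -> None:
        logger.warning(message)

    def log_to_external(self, message: dict[str, Any], **kwargs: Any) -> None:
        # No external tracker configured in this sketch; drop the message.
        pass

    def external_define_metric(self, metric: str, metric_type: str) -> None:
        # No external tracker configured in this sketch; nothing to define.
        pass
```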
3 changes: 3 additions & 0 deletions epochalyst/pipeline/model/training/training.py
@@ -25,6 +25,9 @@ def train(
and self._cache_exists(name=self.get_hash() + "x", cache_args=cache_args)
and self._cache_exists(name=self.get_hash() + "y", cache_args=cache_args)
):
self.log_to_terminal(
f"Cache exists for training pipeline with hash: {self.get_hash()}. Using the cache."
)
x = self._get_cache(name=self.get_hash() + "x", cache_args=cache_args)
y = self._get_cache(name=self.get_hash() + "y", cache_args=cache_args)
return x, y
3 changes: 3 additions & 0 deletions epochalyst/pipeline/model/training/training_block.py
@@ -70,6 +70,9 @@ def train(
and self._cache_exists(name=self.get_hash() + "x", cache_args=cache_args)
and self._cache_exists(name=self.get_hash() + "y", cache_args=cache_args)
):
self.log_to_terminal(
f"Cache exists for {self.__class__} with hash: {self.get_hash()}. Using the cache."
)
x = self._get_cache(name=self.get_hash() + "x", cache_args=cache_args)
y = self._get_cache(name=self.get_hash() + "y", cache_args=cache_args)
return x, y
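Both `train` implementations above consult the cache only when `cache_args` is supplied. The snippet below is a hedged sketch of such a call: the key names are inferred from the `_cacher` diff above and the `output_data_type` entry in the test fixtures further down, the storage path is made up, and `block` stands in for any concrete training block or pipeline instance.

```python
import numpy as np

# Hypothetical cache configuration; keys are inferred from the diffs in this
# commit and the storage path is purely illustrative.
cache_args = {
    "output_data_type": "numpy_array",
    "storage_type": ".pkl",
    "storage_path": "data/cache/",
}

x = np.random.rand(100, 8).astype(np.float32)
y = np.random.rand(100).astype(np.float32)

# `block` is a placeholder for a concrete TrainingBlock / TrainingPipeline.
# The first call trains and writes the cache; the second call hits the cache,
# logs the new "Cache exists ..." message, and returns the cached x, y.
x, y = block.train(x, y, cache_args=cache_args)
x, y = block.train(x, y, cache_args=cache_args)
```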
50 changes: 48 additions & 2 deletions epochalyst/pipeline/model/transformation/transformation.py
@@ -10,8 +10,51 @@
class TransformationPipeline(TransformingSystem, _Cacher, _Logger):
"""TransformationPipeline is the class used to create the pipeline for the transformation of the data.
:param steps: The steps to transform the data. Can be a list of Transformers, TransformationPipelines, or a combination of both.
:param title: The title of the pipeline. (Default: "Transformation Pipeline")
### Parameters:
- `steps` (List[Union[Transformer, TransformationPipeline]]): The steps to transform the data. Can be a list of Transformers, TransformationPipelines, or a combination of both.
- `title` (str): The title of the pipeline. (Default: "Transformation Pipeline")
### Methods:
```python
@abstractmethod
def log_to_terminal(self, message: str) -> None: # Log the message to the terminal.
@abstractmethod
def log_to_debug(self, message: str) -> None: # Log the message to the debug file.
@abstractmethod
def log_to_warning(self, message: str) -> None: # Log the message to the warning file.
@abstractmethod
def log_to_external(self, message: str) -> None: # Log the message to an external file.
@abstractmethod
def log_section_separator(self, title: str) -> None: # Log a section separator to the terminal.
@abstractmethod
def external_define_metric(self, metric: str, metric_type: str) -> None: # Define a metric to be logged to an external file.
def transform(self, data: Any, cache_args: dict[str, Any] = {}, **transform_args: Any) -> Any: # Transform the input data.
def get_hash(self) -> str: # Get the hash of the pipeline.
```
### Usage:
```python
from epochalyst.pipeline.model.transformation import TransformationPipeline
class MyTransformationPipeline(TransformationPipeline):
def log_to_terminal(self, message: str) -> None:
print(message)
....
step1 = MyTransformer1()
step2 = MyTransformer2()
pipeline = MyTransformationPipeline(steps=[step1, step2])
data = pipeline.transform(data)
```
"""

title: str = "Transformation Pipeline" # The title of the pipeline, since a transformation pipeline can serve multiple purposes (features, labels, etc.)
@@ -26,6 +69,9 @@ def transform(
"""

if cache_args and self._cache_exists(self.get_hash(), cache_args):
self.log_to_terminal(
f"Cache exists for {self.title} with hash: {self.get_hash()}. Using the cache."
)
return self._get_cache(self.get_hash(), cache_args)

if self.steps:
@@ -79,6 +79,9 @@ def transform(
if cache_args and self._cache_exists(
name=self.get_hash(), cache_args=cache_args
):
self.log_to_terminal(
f"Cache exists for {self.__class__} with hash: {self.get_hash()}. Using the cache."
)
return self._get_cache(name=self.get_hash(), cache_args=cache_args)

data = self.custom_transform(data, **transform_args)
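The `transform` hunk above delegates the actual work to `custom_transform`. A minimal concrete block might look like the sketch below, mirroring the test implementations later in this commit; the import path, class names, and the doubling behaviour are assumptions rather than part of the diff, and additional logging hooks beyond the ones shown may be required.

```python
from typing import Any

import numpy as np
import numpy.typing as npt

# Assumed import path; the file header for the hunk above is truncated in this
# view, so treat the module and class names as illustrative.
from epochalyst.pipeline.model.transformation.transformation_block import (
    TransformationBlock,
)


class DoubleBlock(TransformationBlock):
    """Hypothetical block whose custom_transform doubles the input array."""

    def custom_transform(self, data: npt.NDArray[np.int_], **transform_args: Any) -> npt.NDArray[np.int_]:
        return data * 2

    def log_to_debug(self, message: str) -> None:
        return None

    def log_to_terminal(self, message: str) -> None:
        return None
```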
10 changes: 5 additions & 5 deletions pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "epochalyst"
version = "0.1.2"
version = "0.1.3"
authors = [
{ name = "Jasper van Selm", email = "jmvanselm@gmail.com" },
{ name = "Ariel Ebersberger", email = "arielebersberger@gmail.com" },
@@ -17,20 +17,20 @@ classifiers = [
]
dependencies= [
# Machine Learning Libraries
"numpy>=1.24.4",
"numpy>=1.22.4",

# Parallel Processing Libraries
"dask>=2023.12.0",

# Data Processing Libraries
"pandas>=1.3.3",
"pandas>=1.3.0",

# Parquet
"pyarrow>=6.0.0",

# PyTorch
"torch==2.2.1+cu118",
"torch>=2.1.0",

# Agogos
"agogos>=0.2.0",
"agogos>=0.2.1",
]
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,4 +1,4 @@
agogos>=0.2.0
agogos==0.2.0
annotated-types==0.6.0
certifi==2024.2.2
charset-normalizer==3.3.2
3 changes: 3 additions & 0 deletions tests/pipeline/model/training/test_training.py
@@ -43,6 +43,9 @@ class CustomTrainingPipeline(TrainingPipeline):
def log_to_debug(self, message: str) -> None:
return None

def log_to_terminal(self, message: str) -> None:
return None

t1 = TestTrainingBlock()
t2 = TestTrainingBlock()

3 changes: 3 additions & 0 deletions tests/pipeline/model/training/test_training_block.py
@@ -43,6 +43,9 @@ def custom_predict(self, x: int) -> int:
def log_to_debug(self, message: str) -> None:
return None

def log_to_terminal(self, message: str) -> None:
return None

tb = TestTrainingBlockImpl()
cache_args = {
"output_data_type": "numpy_array",
3 changes: 3 additions & 0 deletions tests/pipeline/model/transformation/test_transformation.py
@@ -36,6 +36,9 @@ class CustomTransformationPipeline(TransformationPipeline):
def log_to_debug(self, message: str) -> None:
return None

def log_to_terminal(self, message: str) -> None:
return None

t1 = TestTransformationBlock()
t2 = TestTransformationBlock()

@@ -63,6 +63,9 @@ def custom_transform(self, data: np.ndarray[int], **transform_args) -> int:
def log_to_debug(self, message: str) -> None:
return None

def log_to_terminal(self, message: str) -> None:
return None

tb = TestTransformationBlockImpl()
cache_args = {
"output_data_type": "numpy_array",