From a6f1799505b5fee3945489560e3c71d7c179e29c Mon Sep 17 00:00:00 2001
From: Chan Lee
Date: Fri, 19 Apr 2024 05:23:09 +0900
Subject: [PATCH] tmp-release-0.6.7

---
 luma/__import__.py            |  15 ++--
 luma/core/main.py             |   4 +-
 luma/core/super.py            |   4 +-
 luma/interface/util.py        |  27 +++++++
 luma/model_selection/split.py |  77 ++++++++++++++++--
 luma/neural/activation.py     |  29 ++-----
 luma/neural/layer.py          | 145 ++++++++++++++++++++++++++++------
 luma/neural/loss.py           |  24 ++++++
 luma/neural/network.py        |   4 +-
 9 files changed, 267 insertions(+), 62 deletions(-)
 create mode 100644 luma/neural/loss.py

diff --git a/luma/__import__.py b/luma/__import__.py
index bd658c4..4790538 100644
--- a/luma/__import__.py
+++ b/luma/__import__.py
@@ -18,7 +18,7 @@
     NearestNeighbors,
 )
 from luma.interface.util import SilhouetteUtil, DBUtil, KernelUtil, ActivationUtil
-from luma.interface.util import Clone, ParamRange, Layer
+from luma.interface.util import Clone, ParamRange, Layer, Loss
 
 from luma.classifier.discriminant import (
     LDAClassifier,
@@ -75,7 +75,8 @@
 )
 from luma.neural.single import PerceptronClassifier, PerceptronRegressor
 from luma.neural.network import MLPClassifier, MLPRegressor
-from luma.neural.layer import Convolution, Pooling, Dense, Dropout, Flatten
+from luma.neural.layer import Convolution, Pooling, Dense, Dropout, Flatten, Sequential
+from luma.neural.loss import CategoricalCrossEntropy
 
 from luma.metric.classification import Accuracy, Precision, Recall, F1Score, Specificity
 from luma.metric.regression import (
@@ -90,7 +91,7 @@
 from luma.metric.distance import Euclidean, Manhattan, Chebyshev, Minkowski
 from luma.metric.distance import CosineSimilarity, Correlation, Mahalanobis
 
-from luma.model_selection.split import TrainTestSplit
+from luma.model_selection.split import TrainTestSplit, BatchGenerator
 from luma.model_selection.search import GridSearchCV, RandomizedSearchCV
 from luma.model_selection.cv import CrossValidator
 from luma.model_selection.fold import KFold, StratifiedKFold
@@ -168,7 +169,7 @@
     Matrix, Vector, Tensor, Scalar, DecisionTreeNode, NearestNeighbors,
     SilhouetteUtil, DBUtil, KernelUtil, ActivationUtil,
-    Clone, ParamRange, Layer
+    Clone, ParamRange, Layer, Loss
 
     # ----------------- [ luma.classifier ] --------------------
     LDAClassifier, QDAClassifier, RDAClassifier, KDAClassifier
@@ -223,7 +224,9 @@
     AdamOptimizer, AdaGradOptimizer, AdaDeltaOptimizer,
     AdaMaxOptimizer, AdamWOptimizer, NAdamOptimizer
 
-    Convolution, Pooling, Dense, Dropout, Flatten
+    Convolution, Pooling, Dense, Dropout, Flatten, Sequential
+
+    CategoricalCrossEntropy
 
     # ------------------- [ luma.metric ] ----------------------
     Accuracy, Precision, Recall, F1Score, Specificity
@@ -238,7 +241,7 @@
     CosineSimilarity, Correlation, Mahalanobis
 
     # --------------- [ luma.module_selection ] ----------------
-    TrainTestSplit
+    TrainTestSplit, BatchGenerator
 
     GridSearchCV, RandomizedSearchCV
 
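For quick reference, the names that 0.6.7 newly exposes at package level (as wired up in `luma/__import__.py` above) can be imported as follows; this snippet only restates the imports added in these hunks.

```py
# Names newly exposed at package level in 0.6.7
from luma.neural.layer import Sequential
from luma.neural.loss import CategoricalCrossEntropy
from luma.model_selection.split import BatchGenerator
from luma.interface.util import Loss
```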
diff --git a/luma/core/main.py b/luma/core/main.py
index ded97e3..573f0e4 100644
--- a/luma/core/main.py
+++ b/luma/core/main.py
@@ -18,5 +18,5 @@ def __dealloc__(self) -> None:
     def __doc__(self) -> str:
         return luma.__doc__
 
-    if sys.version_info < (3, 10):
-        print("Luma requires Python 3.10 or more", file=sys.stderr)
+    if sys.version_info < (3, 12):
+        print("Luma requires Python 3.12 or more", file=sys.stderr)

diff --git a/luma/core/super.py b/luma/core/super.py
index 388145a..315355d 100644
--- a/luma/core/super.py
+++ b/luma/core/super.py
@@ -1,4 +1,4 @@
-from typing import Any, Dict, Self
+from typing import Any, Dict, NoReturn, Self
 from abc import ABCMeta, abstractmethod
 
 from luma.core.base import *
@@ -266,7 +266,7 @@ def __init__(self) -> None:
         self.updated_weights = None
         self.updated_biases = None
 
-    def update(self, weights, biases, grad_weights, grad_biases) -> None:
+    def update(self, weights, biases, grad_weights, grad_biases) -> NoReturn:
         if weights is not None:
             self.updated_weights = self._update_weights(weights, grad_weights)
         if biases is not None:

diff --git a/luma/interface/util.py b/luma/interface/util.py
index 967fdbb..dd9cfe9 100644
--- a/luma/interface/util.py
+++ b/luma/interface/util.py
@@ -19,9 +19,13 @@
     "Clone",
     "ParamRange",
     "Layer",
+    "Loss",
 )
 
 
+type TensorLike = Matrix | Tensor | Vector
+
+
 class Matrix(np.ndarray):
     """
     Internal class for matrices(2D-array) that extends `numpy.ndarray`.
@@ -556,6 +560,7 @@ class Layer:
     - `dW` : Gradient w.r.t. the weights
     - `dB` : Gradient w.r.t. the biases
     - `optimizer` : Optimizer for certain layer
+    - `out_shape` : Shape of the output when forwarding
 
     """
 
@@ -569,6 +574,7 @@ def __init__(self) -> None:
         self.dB: Tensor = None
         self.optimizer: object = None
+        self.out_shape: tuple = None
 
     def forward(self) -> Tensor: ...
 
@@ -582,3 +588,24 @@ def update(self) -> None:
         )
         self.weights_ = Tensor(weights_)
         self.biases_ = Tensor(biases_)
+
+    def __str__(self) -> str:
+        return type(self).__name__
+
+
+class Loss:
+    """
+    An internal class for loss functions used in neural networks.
+
+    Loss functions, integral to the training process of machine learning
+    models, serve as crucial metrics assessing the disparity between
+    predicted outcomes and ground truth labels. They play a pivotal role
+    in optimization algorithms, guiding parameter updates towards
+    minimizing the discrepancy between predictions and true values.
+    """
+
+    def __init__(self) -> None: ...
+
+    def loss(self) -> float: ...
+
+    def grad(self) -> Matrix: ...
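The new `Loss` base class above only fixes the interface (`loss` returning a float, `grad` returning a `Matrix`); the concrete two-argument signatures are set by the implementations, such as `CategoricalCrossEntropy` in the new `luma/neural/loss.py` further down. As a minimal sketch of a custom loss against this interface, the `MeanSquaredError` below is a hypothetical example and is not part of this patch:

```py
import numpy as np

from luma.interface.util import Matrix, Loss


class MeanSquaredError(Loss):  # hypothetical example, not included in this patch
    def loss(self, y_true: Matrix, y_pred: Matrix) -> float:
        m = y_true.shape[0]
        # Halved sum of squared residuals, averaged over the batch
        return np.sum((y_pred - y_true) ** 2) / (2 * m)

    def grad(self, y_true: Matrix, y_pred: Matrix) -> Matrix:
        m = y_true.shape[0]
        # Exact gradient of the loss above w.r.t. y_pred
        return (y_pred - y_true) / m
```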
diff --git a/luma/model_selection/split.py b/luma/model_selection/split.py
index 27b6b1b..0078f4d 100644
--- a/luma/model_selection/split.py
+++ b/luma/model_selection/split.py
@@ -1,10 +1,10 @@
-from typing import Tuple
+from typing import Iterator, Tuple
 
 import numpy as np
 
-from luma.interface.util import Matrix, Vector
+from luma.interface.util import Matrix, Vector, TensorLike
 
-__all__ = "TrainTestSplit"
+__all__ = ("TrainTestSplit", "BatchGenerator")
 
 
 class TrainTestSplit:
@@ -58,11 +58,11 @@ def get(self) -> Tuple[Matrix, Matrix, Vector, Vector]:
         return self._split()
 
     def _split(self) -> Tuple[Matrix, Matrix, Vector, Vector]:
-        num_samples = self.X.shape[0]
-        indices = np.arange(num_samples)
+        n_samples = self.X.shape[0]
+        indices = np.arange(n_samples)
 
         if isinstance(self.test_size, float):
-            num_test_samples = int(self.test_size * num_samples)
+            num_test_samples = int(self.test_size * n_samples)
         else:
             num_test_samples = self.test_size
 
@@ -113,3 +113,68 @@ def _stratified_split(self) -> Tuple[Matrix, Matrix, Vector, Vector]:
         y_test = self.y[test_indices]
 
         return X_train, X_test, y_train, y_test
+
+
+class BatchGenerator:
+    """
+    A class for generating mini-batches of data for training machine
+    learning models including neural networks.
+
+    Parameters
+    ----------
+    `X` : Input features
+    `y` : Targets or labels
+    `batch_size` : Size of a mini-batch
+    `shuffle` : Whether to shuffle the data for every batch generation
+
+    Examples
+    --------
+    An instance of `BatchGenerator` can be used as an iterator.
+
+    - With instantiation:
+
+    ```py
+    batch_gen = BatchGenerator(X, y, batch_size=100)
+    for X_batch, y_batch in batch_gen:
+        pass
+    ```
+    - Without instantiation:
+
+    ```py
+    for X_batch, y_batch in BatchGenerator(X, y, batch_size=100):
+        pass
+    ```
+    """
+
+    def __init__(
+        self,
+        X: TensorLike,
+        y: TensorLike,
+        batch_size: int = 100,
+        shuffle: bool = True,
+    ) -> None:
+        self.X = X
+        self.y = y
+        self.batch_size = batch_size
+        self.shuffle = shuffle
+
+        self.n_samples = X.shape[0]
+        self.n_batches = self.n_samples // batch_size
+
+        if self.n_samples % batch_size != 0:
+            self.n_batches += 1
+
+        self.indices = np.arange(self.n_samples)
+        if self.shuffle:
+            np.random.shuffle(self.indices)
+
+    def __iter__(self) -> Iterator[Tuple[TensorLike, TensorLike]]:
+        for i in range(self.n_batches):
+            start_idx = i * self.batch_size
+            end_idx = min((i + 1) * self.batch_size, self.n_samples)
+
+            batch_indices = self.indices[start_idx:end_idx]
+            X_batch = self.X[batch_indices]
+            y_batch = self.y[batch_indices]
+
+            yield X_batch, y_batch
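The docstring above covers single-pass iteration. For multi-epoch training, rebuilding the generator each epoch gives a fresh shuffle, since the permutation is drawn once in `__init__`. A small sketch with made-up data:

```py
import numpy as np

from luma.model_selection.split import BatchGenerator

X = np.random.randn(1000, 20)
y = np.random.randint(0, 3, 1000)

for epoch in range(10):
    # Re-instantiating each epoch reshuffles the indices (shuffle=True by default)
    for X_batch, y_batch in BatchGenerator(X, y, batch_size=100):
        ...  # forward/backward pass on the mini-batch
```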
diff --git a/luma/neural/activation.py b/luma/neural/activation.py
index 52d6b7e..d952cc2 100644
--- a/luma/neural/activation.py
+++ b/luma/neural/activation.py
@@ -4,14 +4,14 @@
 __all__ = ("ReLU", "LeakyReLU", "ELU", "Tanh", "Sigmoid", "Softmax")
 
-type Matrix = np.ndarray
+type Matrix = Matrix
 
 
 class ReLU:
     def func(self, X: Matrix) -> Matrix:
         return np.maximum(0, X)
 
-    def derivative(self, X: Matrix) -> Matrix:
+    def grad(self, X: Matrix) -> Matrix:
         return (X > 0).astype(float)
 
 
@@ -22,7 +22,7 @@ def __init__(self, alpha=0.01):
     def func(self, X: Matrix) -> Matrix:
         return np.where(X > 0, X, X * self.alpha)
 
-    def derivative(self, X: Matrix) -> Matrix:
+    def grad(self, X: Matrix) -> Matrix:
         return np.where(X > 0, 1, self.alpha)
 
 
@@ -33,7 +33,7 @@ def __init__(self, alpha=1.0):
     def func(self, X: Matrix) -> Matrix:
         return np.where(X > 0, X, self.alpha * (np.exp(X) - 1))
 
-    def derivative(self, X: Matrix) -> Matrix:
+    def grad(self, X: Matrix) -> Matrix:
         return np.where(X > 0, 1, self.func(X) + self.alpha)
 
 
@@ -41,7 +41,7 @@ class Tanh:
     def func(self, X: Matrix) -> Matrix:
         return np.tanh(X)
 
-    def derivative(self, X: Matrix) -> Matrix:
+    def grad(self, X: Matrix) -> Matrix:
         return 1 - np.tanh(X) ** 2
 
 
@@ -49,7 +49,7 @@ class Sigmoid:
     def func(self, X: Matrix) -> Matrix:
         return 1 / (1 + np.exp(-X))
 
-    def derivative(self, X: Matrix) -> Matrix:
+    def grad(self, X: Matrix) -> Matrix:
         return X * (1 - X)
 
 
@@ -58,18 +58,5 @@ def func(self, X: Matrix) -> Matrix:
         exps = np.exp(X - np.max(X, axis=1, keepdims=True))
         return exps / np.sum(exps, axis=1, keepdims=True)
 
-    def derivative(self, X: Matrix) -> Matrix:
-        m, n = X.shape
-        soft_out = self.func(X)
-        jacobian = np.zeros((m, n, n))
-
-        for i in range(len(soft_out)):
-            for j in range(len(soft_out[i])):
-                for k in range(len(soft_out[i])):
-                    if j == k:
-                        val = soft_out[i, j] * (1 - soft_out[i, j])
-                    else:
-                        val = -soft_out[i, j] * soft_out[i, k]
-                    jacobian[i, j, k] = val
-
-        return jacobian
+    def grad(self, _: Matrix) -> Matrix:
+        NotImplemented
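The visible API change here is the rename of `derivative` to `grad` across all activations, with `Softmax` no longer exposing a usable gradient (its Jacobian path is handled jointly with the cross-entropy loss; see `Dense.backward` and `CategoricalCrossEntropy` below). A small usage sketch of the renamed API:

```py
import numpy as np

from luma.neural.activation import ReLU, Sigmoid

X = np.array([[-1.0, 0.5, 2.0]])

relu = ReLU()
print(relu.func(X))  # [[0.  0.5 2. ]]
print(relu.grad(X))  # [[0. 1. 1.]] -- renamed from derivative()

sig = Sigmoid()
out = sig.func(X)
# Sigmoid.grad is written in terms of the activated output: out * (1 - out)
print(sig.grad(out))
```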
diff --git a/luma/neural/layer.py b/luma/neural/layer.py
index dafb848..013daaf 100644
--- a/luma/neural/layer.py
+++ b/luma/neural/layer.py
@@ -1,13 +1,13 @@
-from typing import Literal, Tuple
+from typing import Any, List, Literal, Self, Tuple
 
 import numpy as np
 
 from luma.core.super import Optimizer
-from luma.interface.util import Layer, Matrix, Tensor, ActivationUtil
+from luma.interface.util import Layer, Matrix, Tensor, ActivationUtil, Loss, Clone
 from luma.interface.exception import UnsupportedParameterError
-from luma.neural.optimizer import SGDOptimizer
+from luma.neural.activation import Softmax
 
-__all__ = ("Convolution", "Pooling", "Dense", "Dropout", "Flatten")
+__all__ = ("Convolution", "Pooling", "Dense", "Dropout", "Flatten", "Sequential")
 
 
 class Convolution(Layer):
@@ -46,8 +46,8 @@ def __init__(
         stride: int = 1,
         padding: Literal["valid", "same"] = "same",
         activation: ActivationUtil.FuncType = "relu",
-        optimizer: Optimizer = SGDOptimizer(),
-        lambda_: float = 0.1,
+        optimizer: Optimizer = None,
+        lambda_: float = 0.0,
         random_state: int = None,
     ) -> None:
         super().__init__()
@@ -78,7 +78,9 @@ def forward(self, X: Tensor) -> Tensor:
         out_height = ((padded_height - self.size) // self.stride) + 1
         out_width = ((padded_width - self.size) // self.stride) + 1
+
         out: Tensor = np.zeros((batch_size, self.n_filters, out_height, out_width))
+        self.out_shape = out.shape
 
         X_padded = np.pad(
             X, ((0, 0), (0, 0), (pad_h, pad_h), (pad_w, pad_w)), mode="constant"
@@ -103,7 +105,7 @@ def forward(self, X: Tensor) -> Tensor:
         out = self.act_.func(out)
         return out
 
-    def backward(self, d_out: Tensor) -> Tuple[Tensor, Tensor, Tensor]:
+    def backward(self, d_out: Tensor) -> Tensor:
         X = self.input_
         batch_size, channels, height, width = X.shape
         pad_h, pad_w, padded_height, padded_width = self._get_padding_dim(height, width)
@@ -147,7 +149,7 @@ def backward(self, d_out: Tensor) -> Tuple[Tensor, Tensor, Tensor]:
             if pad_h > 0 or pad_w > 0
             else dX_padded
         )
-        self.dX = self.act_.derivative(self.dX)
+        self.dX = self.act_.grad(self.dX)
         return self.dX
 
     def _get_padding_dim(self, height: int, width: int) -> Tuple[int, int, int, int]:
@@ -206,6 +208,8 @@ def forward(self, X: Tensor) -> Tensor:
         out_width = 1 + (width - self.size) // self.stride
         out: Tensor = np.zeros((batch_size, channels, out_height, out_width))
 
+        self.out_shape = out.shape
+
         for i in range(out_height):
             for j in range(out_width):
                 h_start, h_end, w_start, w_end = self._get_height_width(i, j)
@@ -279,9 +283,6 @@ class Dense(Layer):
     ```py
     X.shape = (batch_size, n_features)
     ```
-    - Enabling `reshape` in `backward` forces the returning gradient
-    to have the shape of the original input. (default = True)
-
     """
 
     def __init__(
@@ -289,8 +290,8 @@ def __init__(
         self,
         input_size: int,
         output_size: int,
         activation: ActivationUtil.FuncType = "relu",
-        optimizer: Optimizer = SGDOptimizer(),
-        lambda_: float = 0.1,
+        optimizer: Optimizer = None,
+        lambda_: float = 0.0,
         random_state: int = None,
     ) -> None:
         super().__init__()
@@ -308,27 +309,27 @@ def __init__(
         self.weights_: Matrix = 0.01 * self.rs_.randn(self.input_size, self.output_size)
         self.biases_: Matrix = np.zeros((1, self.output_size))
 
-    def forward(self, X: Tensor | Matrix) -> Tensor:
+    def forward(self, X: Matrix) -> Matrix:
         self.input_ = X
 
         out = np.dot(X, self.weights_) + self.biases_
         out = self.act_.func(out)
+        self.out_shape = out.shape
         return out
 
-    def backward(self, d_out: Tensor, reshape: bool = True) -> Tensor:
+    def backward(self, d_out: Matrix) -> Matrix:
         X = self.input_
-        d_out = self.act_.derivative(d_out)
+        if isinstance(self.act_, Softmax):
+            pass
+        else:
+            d_out = self.act_.grad(d_out)
 
         self.dX = np.dot(d_out, self.weights_.T)
         self.dW = np.dot(X.T, d_out)
-        self.dB = np.sum(d_out, axis=0, keepdims=True)
-
         self.dW += 2 * self.lambda_ * self.weights_
+        self.dB = np.sum(d_out, axis=0, keepdims=True)
 
-        if reshape:
-            return self.dX.reshape(*self.input_.shape)
-        else:
-            return self.dX
+        return self.dX
@@ -360,6 +361,8 @@ def __init__(self, dropout_rate: float = 0.1, random_state: int = None) -> None:
 
     def forward(self, X: Tensor, is_train: bool = False) -> Tensor:
         self.input_ = X
+        self.out_shape = self.input_.shape
+
         if is_train:
             self.mask_ = (
                 self.rs_.rand(*X.shape) < self.dropout_rate
@@ -389,8 +392,104 @@ def __init__(self) -> None:
 
     def forward(self, X: Tensor) -> Matrix:
         self.input_ = X
-        return X.reshape(X.shape[0], -1)
+        out = X.reshape(X.shape[0], -1)
+        self.out_shape = out.shape
+        return out
 
     def backward(self, d_out: Matrix) -> Tensor:
         dX = d_out.reshape(self.input_.shape)
         return dX
+
+
+class Sequential(Layer):
+    trainable: List[Layer] = [Convolution, Dense]
+    only_for_train: List[Layer] = [Dropout]
+
+    def __init__(
+        self,
+        *layers: Layer | Tuple[str, Layer],
+        verbose: bool = False,
+    ) -> None:
+        self.layers: List[Tuple[str, Layer]] = list()
+        for layer in layers:
+            self.add(layer)
+
+        self.optimizer = None
+        self.loss_func_ = None
+        self.verbose = verbose
+
+    def forward(self, X: Tensor, is_train: bool = False) -> Tensor:
+        self.input_ = X
+        out = X
+
+        for name, layer in self.layers:
+            if Sequential._check_only_for_train(layer):
+                out = layer.forward(out, is_train=is_train)
+            else:
+                out = layer.forward(out)
+            if self.verbose:
+                print(f"[Sequential] Feed-forwarded '{name}'")
+
+        self.out_shape = out.shape
+        return out
+
+    def backward(self, d_out: Matrix) -> None:
+        for name, layer in reversed(self.layers):
+            d_out = layer.backward(d_out)
+            if self.verbose:
+                print(f"[Sequential] Backpropagated '{name}'")
+
+    def update(self) -> None:
+        self._check_no_optimizer_loss()
+        for name, layer in reversed(self.layers):
+            layer.update()
+            if self.verbose and Sequential._check_trainable_layer(layer):
+                print(f"[Sequential] Updated '{name}'")
+
+    def set_optimizer(self, optimizer: Optimizer, **params: Any) -> None:
+        self.optimizer = optimizer
+        self.optimizer.set_params(**params)
+
+        for _, layer in self.layers:
+            layer.optimizer = Clone(self.optimizer).get
+
+    def set_loss(self, loss_func: Loss) -> None:
+        self.loss_func_ = loss_func
+
+    @classmethod
+    def _check_only_for_train(cls, layer: Layer) -> bool:
+        return layer in cls.only_for_train
+
+    @classmethod
+    def _check_trainable_layer(cls, layer: Layer) -> bool:
+        return layer in cls.trainable
+
+    def _check_no_optimizer_loss(self) -> None:
+        if self.optimizer is None:
+            raise RuntimeError(
+                f"'{self}' has no optimizer! "
+                + f"Call '{self}().set_optimizer' to assign an optimizer."
+            )
+        if self.loss_func_ is None:
+            raise RuntimeError(
+                f"'{self}' has no loss function! "
+                + f"Call '{self}().set_loss' to assign a loss function."
+            )
+
+    def add(self, layer: Layer | Tuple[str, Layer]) -> None:
+        if not isinstance(layer, tuple):
+            layer = (str(layer), layer)
+        self.layers.append(layer)
+
+    def __add__(self, seq: Self) -> Self: ...
+
+    def __call__(self, X: Tensor, y: Matrix, is_train: bool = False) -> float:
+        self._check_no_optimizer_loss()
+
+        out = self.forward(X, is_train=is_train)
+        d_out = self.loss_func_.grad(y, out)
+        loss = self.loss_func_.loss(y, out)
+
+        self.backward(d_out)
+        self.update()
+        return loss
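A sketch of the intended training flow for the new `Sequential` container, driven by the methods added above (`set_optimizer`, `set_loss`, and `__call__`, which runs forward, backward, and update and returns the batch loss). The layer sizes, the bare `AdamOptimizer()` call, and the `"softmax"` activation string are illustrative assumptions, not guaranteed by this patch:

```py
import numpy as np

from luma.neural.layer import Dense, Dropout, Flatten, Sequential
from luma.neural.loss import CategoricalCrossEntropy
from luma.neural.optimizer import AdamOptimizer
from luma.model_selection.split import BatchGenerator

model = Sequential(
    Flatten(),
    ("fc1", Dense(784, 128, activation="relu")),    # layers may be named or bare
    Dropout(dropout_rate=0.1),
    ("fc2", Dense(128, 10, activation="softmax")),  # assumes "softmax" is a valid FuncType
)
model.set_optimizer(AdamOptimizer())                # optimizer settings are an assumption
model.set_loss(CategoricalCrossEntropy())

X = np.random.randn(600, 784)
y = np.eye(10)[np.random.randint(0, 10, 600)]       # one-hot labels

for epoch in range(5):
    for X_batch, y_batch in BatchGenerator(X, y, batch_size=100):
        loss = model(X_batch, y_batch, is_train=True)  # forward + backward + update
```

Of the stacked layers, only `Convolution` and `Dense` carry trainable parameters (the `trainable` list above); the others simply pass gradients through.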
diff --git a/luma/neural/loss.py b/luma/neural/loss.py
new file mode 100644
index 0000000..521f674
--- /dev/null
+++ b/luma/neural/loss.py
@@ -0,0 +1,24 @@
+import numpy as np
+
+from luma.interface.util import Matrix, Loss
+
+
+__all__ = "CategoricalCrossEntropy"
+
+
+class CategoricalCrossEntropy(Loss):
+    def __init__(self) -> None:
+        super().__init__()
+        self.epsilon = 1e-8
+
+    def loss(self, y_true: Matrix, y_pred: Matrix) -> float:
+        m = y_true.shape[0]
+        y_pred = np.clip(y_pred, self.epsilon, 1 - self.epsilon)
+        loss = -np.sum(y_true * np.log(y_pred)) / m
+        return loss
+
+    def grad(self, y_true: Matrix, y_pred: Matrix) -> Matrix:
+        m = y_true.shape[0]
+        y_pred = np.clip(y_pred, self.epsilon, 1 - self.epsilon)
+        grad = (y_pred - y_true) / m
+        return grad

diff --git a/luma/neural/network.py b/luma/neural/network.py
index f324e5a..6a7c858 100644
--- a/luma/neural/network.py
+++ b/luma/neural/network.py
@@ -223,7 +223,7 @@ def _backpropagation(self, X: Matrix, y: Matrix) -> None:
 
         for i in range(self.n_layers - 2, -1, -1):
             delta = np.dot(delta, self.weights[i + 1].T)
-            delta *= self.act_.derivative(as_[i + 1])
+            delta *= self.act_.grad(as_[i + 1])
 
             dW = np.dot(as_[i].T, delta)
             dW += (self.lambda_ / m) * self.weights[i]
@@ -447,7 +447,7 @@ def _backpropagation(self, X: Matrix, y: Matrix) -> None:
 
         for i in range(self.n_layers - 2, -1, -1):
             delta = np.dot(delta, self.weights[i + 1].T)
-            delta *= self.act_.derivative(as_[i + 1])
+            delta *= self.act_.grad(as_[i + 1])
 
             dW = np.dot(as_[i].T, delta)
             dW += (self.lambda_ / m) * self.weights[i]
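`CategoricalCrossEntropy` expects one-hot targets and probability outputs. Its `grad`, `(y_pred - y_true) / m`, is the combined gradient of softmax followed by cross-entropy with respect to the logits, which is why `Dense.backward` above leaves `d_out` untouched when its activation is `Softmax`. A quick numeric check with made-up values:

```py
import numpy as np

from luma.neural.loss import CategoricalCrossEntropy

y_true = np.array([[1.0, 0.0, 0.0],
                   [0.0, 1.0, 0.0]])  # one-hot targets
y_pred = np.array([[0.7, 0.2, 0.1],
                   [0.1, 0.8, 0.1]])  # softmax probabilities

cce = CategoricalCrossEntropy()
print(cce.loss(y_true, y_pred))  # -(ln 0.7 + ln 0.8) / 2 ~= 0.290
print(cce.grad(y_true, y_pred))  # (y_pred - y_true) / 2
```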