optimizers reform & dense layer
ChanLumerico committed Apr 17, 2024
1 parent 458c0aa commit 688ed4e
Showing 6 changed files with 399 additions and 263 deletions.
8 changes: 4 additions & 4 deletions luma/__import__.py
@@ -18,7 +18,7 @@
NearestNeighbors,
)
from luma.interface.util import SilhouetteUtil, DBUtil, KernelUtil, ActivationUtil
from luma.interface.util import Clone, ParamRange
from luma.interface.util import Clone, ParamRange, Layer

from luma.classifier.discriminant import (
LDAClassifier,
@@ -75,7 +75,7 @@
)
from luma.neural.single import PerceptronClassifier, PerceptronRegressor
from luma.neural.network import MLPClassifier, MLPRegressor
from luma.neural.layer import Layer, Convolution, Pooling
from luma.neural.layer import Convolution, Pooling

from luma.metric.classification import Accuracy, Precision, Recall, F1Score, Specificity
from luma.metric.regression import (
@@ -168,7 +168,7 @@
Matrix, Vector, Tensor, Scalar,
DecisionTreeNode, NearestNeighbors,
SilhouetteUtil, DBUtil, KernelUtil, ActivationUtil,
Clone, ParamRange
Clone, ParamRange, Layer

# ----------------- [ luma.classifier ] --------------------
LDAClassifier, QDAClassifier, RDAClassifier, KDAClassifier
@@ -223,7 +223,7 @@
AdamOptimizer, AdaGradOptimizer, AdaDeltaOptimizer,
AdaMaxOptimizer, AdamWOptimizer, NAdamOptimizer

Layer, Convolution, Pooling
Convolution, Pooling

# ------------------- [ luma.metric ] ----------------------
Accuracy, Precision, Recall, F1Score, Specificity
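The net effect of this file's diff is an import reshuffle: `Layer` is now re-exported from `luma.interface.util`, while `luma.neural.layer` keeps only the concrete layers. An illustrative sketch of the resulting public import paths:

```py
# Import paths after this commit (illustrative).
from luma.interface.util import Layer                      # base class now lives here
from luma.neural.layer import Convolution, Pooling, Dense  # concrete layers
```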
15 changes: 13 additions & 2 deletions luma/core/super.py
@@ -262,8 +262,19 @@ def update(self, **kwargs) -> tuple
```
"""

@abstractmethod
def update(self, **kwargs) -> tuple: ...
def __init__(self) -> None:
self.updated_weights = None
self.updated_biases = None

def update(self, weights, biases, grad_weights, grad_biases) -> None:
if weights is not None:
self.updated_weights = self._update_weights(weights, grad_weights)
if biases is not None:
self.updated_biases = self._update_biases(biases, grad_biases)

def _update_weights(self, weights: Any, grad_weights: Any) -> Any: ...

def _update_biases(self, biases: Any, grad_biases: Any) -> Any: ...

@property
def best_model(self) -> Estimator | Transformer: ...
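With this reform, `Optimizer.update` becomes a template method: it stores results in `updated_weights` and `updated_biases` and delegates the arithmetic to `_update_weights` / `_update_biases`. Below is a minimal sketch of a subclass written against that contract; the class and its plain gradient step are illustrative (not the library's `SGDOptimizer`), and it assumes the base class leaves no other abstract members for subclasses to implement.

```py
from typing import Any

import numpy as np

from luma.core.super import Optimizer


class PlainGradientDescent(Optimizer):
    """Illustrative optimizer following the reworked base-class contract."""

    def __init__(self, learning_rate: float = 0.01) -> None:
        super().__init__()
        self.learning_rate = learning_rate

    def _update_weights(self, weights: Any, grad_weights: Any) -> Any:
        # Plain gradient step; real optimizers add momentum, decay, etc.
        return weights - self.learning_rate * grad_weights

    def _update_biases(self, biases: Any, grad_biases: Any) -> Any:
        return biases - self.learning_rate * grad_biases


opt = PlainGradientDescent(learning_rate=0.1)
opt.update(np.ones((3, 2)), np.zeros(2), np.full((3, 2), 0.5), np.full(2, 0.5))
print(opt.updated_weights)  # every entry: 1.0 - 0.1 * 0.5 = 0.95
print(opt.updated_biases)   # every entry: 0.0 - 0.1 * 0.5 = -0.05
```

Callers then read the refreshed parameters back from `updated_weights` / `updated_biases`, which is exactly what the reworked `Layer.update` does.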
56 changes: 51 additions & 5 deletions luma/interface/util.py
@@ -1,4 +1,4 @@
from typing import Any, AnyStr, Callable, Literal, Type, TypeGuard
from typing import Any, AnyStr, Callable, Literal, Self, Type, TypeGuard
import numpy as np

from luma.interface.exception import UnsupportedParameterError, InvalidRangeError
@@ -18,6 +18,7 @@
"ActivationUtil",
"Clone",
"ParamRange",
"Layer",
)


@@ -36,7 +37,7 @@ class Matrix(np.ndarray):
"""

def __new__(cls, array_like: Any) -> "Matrix":
def __new__(cls, array_like: Any) -> Self:
if isinstance(array_like, (list, np.matrix)):
obj = np.array(array_like)
else:
@@ -57,7 +58,7 @@ class Vector(Matrix):
"""

def __new__(cls, array_like: Any) -> "Vector":
def __new__(cls, array_like: Any) -> Self:
if isinstance(array_like, list):
obj = Matrix(array_like)
else:
@@ -74,7 +75,10 @@ class Tensor(Matrix):
additional functionalities and readability.
"""

def __new__(cls, array_like: Any) -> "Tensor":
type Tensor_3D = "Tensor"
type Tensor_4D = "Tensor"

def __new__(cls, array_like: Any) -> Self:
if isinstance(array_like, list):
obj = Matrix(array_like)
else:
@@ -89,7 +93,7 @@ class Scalar:
This class encompasses `int` and `float`.
"""

def __new__(cls, value: int | float) -> "Scalar":
def __new__(cls, value: int | float) -> Self:
return float(value)


@@ -533,3 +537,45 @@ def condition(self) -> Callable[[Scalar], bool]:
return lambda x: lower <= x <= upper
else:
NotImplemented


class Layer:
"""
An internal class for layers in neural networks.
Neural network layers are composed of interconnected nodes,
each performing computations on input data. Common types include
fully connected, convolutional, and recurrent layers, each
serving distinct roles in learning from data.
Attributes
----------
- `weights_` : Weight tensor
- `biases_` : Bias tensor
- `dX` : Gradient w.r.t. the input
- `dW` : Gradient w.r.t. the weights
- `dB` : Gradient w.r.t. the biases
- `optimizer` : Optimizer attached to the layer
"""

def __init__(self) -> None:
self.weights_: Tensor | Matrix = None
self.biases_: Vector = None

self.dX: Tensor | Matrix = None
self.dW: Tensor | Matrix = None
self.dB: Vector = None

self.optimizer: object = None

def forward(self) -> Tensor: ...

def backward(self) -> Tensor: ...

def update(self) -> None:
if self.optimizer is None:
return
self.optimizer.update(self.weights_, self.biases_, self.dW, self.dB)
self.weights_ = self.optimizer.updated_weights
self.biases_ = self.optimizer.updated_biases
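The `Layer` base above fixes a small contract for concrete layers: `forward` consumes an explicit input, `backward` caches `dX`, `dW`, and `dB`, and `update` hands the cached gradients to the attached optimizer. A toy, biases-only layer written against that contract might look like the following sketch; the class is purely illustrative, and it assumes the reworked `SGDOptimizer` honors the new base `update` interface.

```py
import numpy as np

from luma.interface.util import Layer, Matrix, Vector
from luma.neural.optimizer import SGDOptimizer


class BiasShift(Layer):
    """Illustrative layer that only learns an additive bias vector."""

    def __init__(self, size: int) -> None:
        super().__init__()
        self.biases_: Vector = np.zeros(size)
        self.optimizer = SGDOptimizer()

    def forward(self, X: Matrix) -> Matrix:
        return X + self.biases_

    def backward(self, X: Matrix, d_out: Matrix) -> Matrix:
        self.dX = d_out                  # gradient w.r.t. the input
        self.dB = np.sum(d_out, axis=0)  # gradient w.r.t. the biases
        return self.dX
```

Because `weights_` and `dW` stay `None`, the `None` checks in `Optimizer.update` skip the weight step when `update()` is called and only the biases are refreshed.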
118 changes: 82 additions & 36 deletions luma/neural/layer.py
@@ -1,26 +1,13 @@
from typing import Literal, Tuple
import numpy as np

from luma.interface.util import Tensor, ActivationUtil
from luma.core.super import Optimizer
from luma.interface.util import Layer, Vector, Matrix, Tensor, ActivationUtil
from luma.interface.exception import UnsupportedParameterError
from luma.neural.optimizer import SGDOptimizer


__all__ = ("Layer", "Convolution", "Pooling", "Dense")


class Layer:
"""
An internal class for layers in neural networks.
Neural network layers are composed of interconnected nodes,
each performing computations on input data. Common types include
fully connected, convolutional, and recurrent layers, each
serving distinct roles in learning from data.
"""

def forward(self) -> Tensor: ...

def backward(self) -> Tensor | Tuple[Tensor, ...]: ...
__all__ = ("Convolution", "Pooling", "Dense")


class Convolution(Layer):
@@ -39,6 +26,7 @@ class Convolution(Layer):
`padding` : Padding strategy
(`valid` for no padding, `same` for typical 0-padding)
`activation` : Type of activation function
`optimizer` : Optimizer for weight update (default `SGDOptimizer`)
Notes
-----
@@ -62,27 +50,29 @@ def __init__(
stride: int = 1,
padding: Literal["valid", "same"] = "same",
activation: ActivationUtil.FuncType = "relu",
optimizer: Optimizer = SGDOptimizer(),
random_state: int = None,
) -> None:
super().__init__()
self.n_filters = n_filters
self.size = size
self.stride = stride
self.padding = padding
self.activation = activation
self.optimizer = optimizer

act = ActivationUtil(self.activation)
self.act_ = act.activation_type()
self.rs_ = np.random.RandomState(random_state)

self.filters_ = None
self.biases_ = np.zeros(self.n_filters)
self.biases_: Vector = np.zeros(self.n_filters)

def forward(self, X: Tensor) -> Tensor:
assert len(X.shape) == 4, "X must have the form of 4D-array!"
batch_size, channels, height, width = X.shape

if self.filters_ is None:
self.filters_ = 0.01 * self.rs_.randn(
if self.weights_ is None:
self.weights_ = 0.01 * self.rs_.randn(
self.n_filters, channels, self.size, self.size
)

@@ -97,7 +87,7 @@ def forward(self, X: Tensor) -> Tensor:
)
X_fft = np.fft.rfftn(X_padded, s=(padded_height, padded_width), axes=[2, 3])
filter_fft = np.fft.rfftn(
self.filters_, s=(padded_height, padded_width), axes=[2, 3]
self.weights_, s=(padded_height, padded_width), axes=[2, 3]
)

for i in range(batch_size):
@@ -120,8 +110,8 @@ def backward(self, X: Tensor, d_out: Tensor) -> Tuple[Tensor, Tensor, Tensor]:
pad_h, pad_w, padded_height, padded_width = self._get_padding_dim(height, width)

dX_padded = np.zeros((batch_size, channels, padded_height, padded_width))
dW = np.zeros_like(self.filters_)
dB = np.zeros(self.n_filters)
self.dW = np.zeros_like(self.weights_)
self.dB = np.zeros_like(self.biases_)

X_padded = np.pad(
X, ((0, 0), (0, 0), (pad_h, pad_h), (pad_w, pad_w)), mode="constant"
@@ -130,12 +120,12 @@ def backward(self, X: Tensor, d_out: Tensor) -> Tuple[Tensor, Tensor, Tensor]:
d_out_fft = np.fft.rfftn(d_out, s=(padded_height, padded_width), axes=[2, 3])

for f in range(self.n_filters):
dB[f] = np.sum(d_out[:, f, :, :])
self.dB[f] = np.sum(d_out[:, f, :, :])

for f in range(self.n_filters):
for c in range(channels):
filter_d_out_fft = np.sum(X_fft[:, c] * d_out_fft[:, f].conj(), axis=0)
dW[f, c] = np.fft.irfftn(
self.dW[f, c] = np.fft.irfftn(
filter_d_out_fft, s=(padded_height, padded_width)
)[pad_h : pad_h + self.size, pad_w : pad_w + self.size]

@@ -146,19 +136,18 @@ def backward(self, X: Tensor, d_out: Tensor) -> Tuple[Tensor, Tensor, Tensor]:
)
for f in range(self.n_filters):
filter_fft = np.fft.rfftn(
self.filters_[f, c], s=(padded_height, padded_width)
self.weights_[f, c], s=(padded_height, padded_width)
)
temp += filter_fft * d_out_fft[i, f]
dX_padded[i, c] = np.fft.irfftn(temp, s=(padded_height, padded_width))

dX = (
self.dX = (
dX_padded[:, :, pad_h:-pad_h, pad_w:-pad_w]
if pad_h > 0 or pad_w > 0
else dX_padded
)

dX = self.act_.derivative(dX)
return dX, dW, dB
self.dX = self.act_.derivative(self.dX)
return self.dX

def _get_padding_dim(self, height: int, width: int) -> Tuple[int, int, int, int]:
if self.padding == "same":
@@ -191,11 +180,24 @@ class Pooling(Layer):
`stride` : Step size of filter during pooling
`mode` : Pooling strategy (`max` or `avg`)
Notes
-----
- The input `X` must be a 4D array (`Tensor`).
```py
X.shape = (batch_size, channels, height, width)
```
- `backward` returns gradients w.r.t. the input.
```py
def backward(self, ...) -> Tensor
```
"""

def __init__(
self, size: int = 2, stride: int = 2, mode: Literal["max", "avg"] = "max"
) -> None:
super().__init__()
self.size = size
self.stride = stride
self.mode = mode
@@ -222,7 +224,7 @@ def forward(self, X: Tensor) -> Tensor:

def backward(self, X: Tensor, d_out: Tensor) -> Tensor:
_, _, out_height, out_width = d_out.shape
dX = np.zeros_like(X)
self.dX = np.zeros_like(X)

for i in range(out_height):
for j in range(out_width):
@@ -232,16 +234,16 @@ def backward(self, X: Tensor, d_out: Tensor) -> Tensor:
if self.mode == "max":
max_vals = np.max(window, axis=(2, 3), keepdims=True)
mask = window == max_vals
dX[:, :, h_start:h_end, w_start:w_end] += (
self.dX[:, :, h_start:h_end, w_start:w_end] += (
mask * d_out[:, :, i : i + 1, j : j + 1]
)
elif self.mode == "avg":
avg_grad = d_out[:, :, i, j] / (self.size**2)
dX[:, :, h_start:h_end, w_start:w_end] += (
self.dX[:, :, h_start:h_end, w_start:w_end] += (
np.ones((1, 1, self.size, self.size)) * avg_grad
)

return dX
return self.dX

def _get_height_width(self, cur_h: int, cur_w: int) -> Tuple[int, int, int, int]:
h_start = cur_h * self.stride
Expand All @@ -253,4 +255,48 @@ def _get_height_width(self, cur_h: int, cur_w: int) -> Tuple[int, int, int, int]


class Dense(Layer):
NotImplemented
"""
A dense layer, also known as a fully connected layer, connects each
neuron in one layer to every neuron in the next. It applies a linear
transformation, typically followed by a nonlinear activation, enabling
complex relationships between input and output. Dense layers are
fundamental in deep learning models, learning representations and
capturing intricate patterns in the data during training.
Parameters
----------
- `input_size` : Number of input neurons
- `output_size`: Number of output neurons
- `optimizer` : Optimizer for weight update (default `SGDOptimizer`)
"""

def __init__(
self, input_size: int, output_size: int, optimizer: Optimizer = SGDOptimizer()
) -> None:
super().__init__()
self.input_size = input_size
self.output_size = output_size
self.optimizer = optimizer

self.weights_: Matrix = 0.01 * np.random.randn(
self.input_size, self.output_size
)
self.biases_: Vector = np.zeros(self.output_size)

def forward(self, X: Tensor | Matrix) -> Tensor:
X = self._flatten(X)
out = np.dot(X, self.weights_) + self.biases_
return out

def backward(self, X: Tensor, d_out: Tensor) -> Tensor:
X = self._flatten(X)
self.dX = np.dot(d_out, self.weights_.T)
self.dW = np.dot(X.T, d_out)
self.dB = np.sum(d_out, axis=0, keepdims=True)

return self.dX

def _flatten(self, X: Tensor) -> Matrix:
return X.reshape(X.shape[0], -1) if len(X.shape) > 2 else X
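Putting the new pieces together, a single training step through `Convolution`, `Pooling`, and `Dense` might look like the sketch below. It is a minimal illustration: the commented shapes assume that `"same"` padding with stride 1 preserves spatial size, and that the default `SGDOptimizer` attached to `Convolution` and `Dense` follows the reworked optimizer contract.

```py
import numpy as np

from luma.neural.layer import Convolution, Pooling, Dense

X = np.random.randn(2, 3, 16, 16)  # (batch_size, channels, height, width)

conv = Convolution(n_filters=4, size=3, padding="same", random_state=42)
pool = Pooling(size=2, stride=2, mode="max")
dense = Dense(input_size=4 * 8 * 8, output_size=10)

# Forward pass
out_c = conv.forward(X)       # assumed (2, 4, 16, 16)
out_p = pool.forward(out_c)   # (2, 4, 8, 8)
out_d = dense.forward(out_p)  # flattened internally -> (2, 10)

# Backward pass in reverse order, threading each layer's dX upstream
d_out = np.ones_like(out_d)   # stand-in for a loss gradient
d_p = dense.backward(out_p, d_out).reshape(out_p.shape)
d_c = pool.backward(out_c, d_p)
conv.backward(X, d_c)

# Each layer refreshes its parameters through its attached optimizer;
# Pooling has no parameters, so its update() is a no-op.
for layer in (conv, pool, dense):
    layer.update()
```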
(Diffs for the remaining 2 changed files not shown.)
