From bb785deb30ebd938a2c23620e99eb75c101fb2b6 Mon Sep 17 00:00:00 2001 From: Tom Hosking Date: Tue, 9 Apr 2024 16:16:39 +0100 Subject: [PATCH] Add model - Hierarchical Residual Quantization (HRQVAE) (#140) * Add HRQVAE model - seems to work OK * Bug fixes * Add example notebook and update readme * Update expected output in test * Fix typos, update docstrings --- README.md | 1 + .../models_training/hrqvae_training.ipynb | 4125 +++++++++++++++++ src/pythae/models/__init__.py | 3 + src/pythae/models/auto_model/auto_config.py | 5 + src/pythae/models/auto_model/auto_model.py | 5 + src/pythae/models/hrq_vae/__init__.py | 19 + src/pythae/models/hrq_vae/hrq_vae_config.py | 37 + src/pythae/models/hrq_vae/hrq_vae_model.py | 128 + src/pythae/models/hrq_vae/hrq_vae_utils.py | 153 + .../models/nn/benchmarks/mnist/resnets.py | 230 + tests/test_HRQVAE.py | 760 +++ 11 files changed, 5466 insertions(+) create mode 100644 examples/notebooks/models_training/hrqvae_training.ipynb create mode 100755 src/pythae/models/hrq_vae/__init__.py create mode 100755 src/pythae/models/hrq_vae/hrq_vae_config.py create mode 100755 src/pythae/models/hrq_vae/hrq_vae_model.py create mode 100755 src/pythae/models/hrq_vae/hrq_vae_utils.py create mode 100755 tests/test_HRQVAE.py diff --git a/README.md b/README.md index b601f2e8..ecaacdc2 100644 --- a/README.md +++ b/README.md @@ -107,6 +107,7 @@ VAE with Inverse Autoregressive Flows (VAE_IAF) | [![Open In Colab](https://col | Regularized AE with L2 decoder param (RAE_L2) | [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/clementchadebec/benchmark_VAE/blob/main/examples/notebooks/models_training/rae_l2_training.ipynb) | [link](https://arxiv.org/abs/1903.12436) | [link](https://github.com/ParthaEth/Regularized_autoencoders-RAE-/tree/master/) | | Regularized AE with gradient penalty (RAE_GP) | [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/clementchadebec/benchmark_VAE/blob/main/examples/notebooks/models_training/rae_gp_training.ipynb) | [link](https://arxiv.org/abs/1903.12436) | [link](https://github.com/ParthaEth/Regularized_autoencoders-RAE-/tree/master/) | | Riemannian Hamiltonian VAE (RHVAE) | [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/clementchadebec/benchmark_VAE/blob/main/examples/notebooks/models_training/rhvae_training.ipynb) | [link](https://arxiv.org/abs/2105.00026) | [link](https://github.com/clementchadebec/pyraug)| +| Hierarchical Residual Quantization (HRQVAE) | [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/clementchadebec/benchmark_VAE/blob/main/examples/notebooks/models_training/hrqvae_training.ipynb) | [link](https://aclanthology.org/2022.acl-long.178/) | [link](https://github.com/tomhosking/hrq-vae)| **See [reconstruction](#Reconstruction) and [generation](#Generation) results for all aforementionned models** diff --git a/examples/notebooks/models_training/hrqvae_training.ipynb b/examples/notebooks/models_training/hrqvae_training.ipynb new file mode 100644 index 00000000..dc71a51b --- /dev/null +++ b/examples/notebooks/models_training/hrqvae_training.ipynb @@ -0,0 +1,4125 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "# Install the library\n", + "%pip install pythae" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import torch\n", + "import torchvision.datasets as datasets\n", + "\n", + "device = \"cuda\" if torch.cuda.is_available() else \"cpu\"\n", + "\n", + "%load_ext autoreload\n", + "%autoreload 2" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "mnist_trainset = datasets.MNIST(root='../../data', train=True, download=True, transform=None)\n", + "\n", + "train_dataset = mnist_trainset.data[:-50000].reshape(-1, 1, 28, 28) / 255.\n", + "eval_dataset = mnist_trainset.data[-10000:].reshape(-1, 1, 28, 28) / 255." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "from pythae.models import HRQVAE, HRQVAEConfig\n", + "from pythae.trainers import BaseTrainerConfig\n", + "from pythae.pipelines.training import TrainingPipeline\n", + "from pythae.models.nn.benchmarks.mnist.resnets import Encoder_ResNet_HRQVAE_MNIST, Decoder_ResNet_HRQVAE_MNIST" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "config = BaseTrainerConfig(\n", + " output_dir='my_model',\n", + " learning_rate=1e-4,\n", + " per_device_train_batch_size=64,\n", + " per_device_eval_batch_size=64,\n", + " num_epochs=100, # Change this to train the model a bit more\n", + ")\n", + "\n", + "\n", + "model_config = HRQVAEConfig(\n", + " latent_dim=128,\n", + " input_dim=(1, 28, 28),\n", + " num_embeddings=10,\n", + " num_levels = 3,\n", + " kl_weight = 0.01,\n", + " init_scale = 2.0,\n", + " init_decay_weight = 0.5,\n", + " norm_loss_weight = 1.0, # 0.5\n", + " norm_loss_scale = 1.5,\n", + " temp_schedule_gamma=10,\n", + " depth_drop_rate = 0.1,\n", + ")\n", + "\n", + "model = HRQVAE(\n", + " model_config=model_config,\n", + " encoder=Encoder_ResNet_HRQVAE_MNIST(model_config), \n", + " decoder=Decoder_ResNet_HRQVAE_MNIST(model_config) \n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline = TrainingPipeline(\n", + " training_config=config,\n", + " model=model\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Preprocessing train data...\n", + "Checking train dataset...\n", + "Preprocessing eval data...\n", + "\n", + "Checking eval dataset...\n", + "Using Base Trainer\n", + "\n", + "Model passed sanity check !\n", + "Ready for training.\n", + "\n", + "Created my_model folder since did not exist.\n", + "\n", + "Created my_model/HRQVAE_training_2024-04-05_09-25-49. \n", + "Training config, checkpoints and final model will be saved here.\n", + "\n", + "Training params:\n", + " - max_epochs: 100\n", + " - per_device_train_batch_size: 64\n", + " - per_device_eval_batch_size: 64\n", + " - checkpoint saving every: None\n", + "Optimizer: Adam (\n", + "Parameter Group 0\n", + " amsgrad: False\n", + " betas: (0.9, 0.999)\n", + " capturable: False\n", + " differentiable: False\n", + " eps: 1e-08\n", + " foreach: None\n", + " fused: None\n", + " lr: 0.0001\n", + " maximize: False\n", + " weight_decay: 0\n", + ")\n", + "Scheduler: None\n", + "\n", + "Successfully launched training !\n", + "\n" + ] + }, + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "d501d8fe00884236b5bbef59c0a552bb", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "Training of epoch 1/100: 0%| | 0/157 [00:00" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "import matplotlib.pyplot as plt\n", + "\n", + "# show reconstructions\n", + "fig, axes = plt.subplots(nrows=5, ncols=5, figsize=(10, 10))\n", + "\n", + "for i in range(5):\n", + " for j in range(5):\n", + " axes[i][j].imshow(reconstructions[i*5 + j].cpu().squeeze(0), cmap='gray')\n", + " axes[i][j].axis('off')\n", + "plt.tight_layout(pad=0.)" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "# show the true data\n", + "fig, axes = plt.subplots(nrows=5, ncols=5, figsize=(10, 10))\n", + "\n", + "for i in range(5):\n", + " for j in range(5):\n", + " axes[i][j].imshow(eval_dataset[i*5 +j].cpu().squeeze(0), cmap='gray')\n", + " axes[i][j].axis('off')\n", + "plt.tight_layout(pad=0.)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Visualizing interpolations" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [], + "source": [ + "interpolations = trained_model.interpolate(eval_dataset[:5].to(device), eval_dataset[5:10].to(device), granularity=10).detach().cpu()" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "# show interpolations\n", + "fig, axes = plt.subplots(nrows=5, ncols=10, figsize=(10, 5))\n", + "\n", + "for i in range(5):\n", + " for j in range(10):\n", + " axes[i][j].imshow(interpolations[i, j].cpu().squeeze(0), cmap='gray')\n", + " axes[i][j].axis('off')\n", + "plt.tight_layout(pad=0.)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "interpreter": { + "hash": "95022f601a219c6b6d093149c9a9b9a061a4446d3680d89cef8a1f82970031f2" + }, + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.14" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/src/pythae/models/__init__.py b/src/pythae/models/__init__.py index ba45191e..40284ac6 100755 --- a/src/pythae/models/__init__.py +++ b/src/pythae/models/__init__.py @@ -24,6 +24,7 @@ from .disentangled_beta_vae import DisentangledBetaVAE, DisentangledBetaVAEConfig from .factor_vae import FactorVAE, FactorVAEConfig from .hvae import HVAE, HVAEConfig +from .hrq_vae import HRQVAE, HRQVAEConfig from .info_vae import INFOVAE_MMD, INFOVAE_MMD_Config from .iwae import IWAE, IWAEConfig from .miwae import MIWAE, MIWAEConfig @@ -96,4 +97,6 @@ "MIWAEConfig", "PIWAE", "PIWAEConfig", + "HRQVAE", + "HRQVAEConfig", ] diff --git a/src/pythae/models/auto_model/auto_config.py b/src/pythae/models/auto_model/auto_config.py index 3a2d55a2..fae89d18 100644 --- a/src/pythae/models/auto_model/auto_config.py +++ b/src/pythae/models/auto_model/auto_config.py @@ -60,6 +60,11 @@ def from_json_file(cls, json_path): model_config = HVAEConfig.from_json_file(json_path) + elif config_name == "HRQVAEConfig": + from ..hrq_vae import HRQVAEConfig + + model_config = HRQVAEConfig.from_json_file(json_path) + elif config_name == "INFOVAE_MMD_Config": from ..info_vae import INFOVAE_MMD_Config diff --git a/src/pythae/models/auto_model/auto_model.py b/src/pythae/models/auto_model/auto_model.py index 7d875c0d..d103ac29 100644 --- a/src/pythae/models/auto_model/auto_model.py +++ b/src/pythae/models/auto_model/auto_model.py @@ -74,6 +74,11 @@ def load_from_folder(cls, dir_path: str): model = HVAE.load_from_folder(dir_path=dir_path) + elif model_name == "HRQVAEConfig": + from ..hrq_vae import HRQVAE + + model = HRQVAE.load_from_folder(dir_path=dir_path) + elif model_name == "INFOVAE_MMD_Config": from ..info_vae import INFOVAE_MMD diff --git a/src/pythae/models/hrq_vae/__init__.py b/src/pythae/models/hrq_vae/__init__.py new file mode 100755 index 00000000..770b4a93 --- /dev/null +++ b/src/pythae/models/hrq_vae/__init__.py @@ -0,0 +1,19 @@ +"""This module is the implementation of the Vector Quantized VAE proposed in +(https://arxiv.org/abs/1711.00937). + +Available samplers +------------------- + +Normalizing flows sampler to come. + +.. autosummary:: + ~pythae.samplers.GaussianMixtureSampler + ~pythae.samplers.MAFSampler + ~pythae.samplers.IAFSampler + :nosignatures: +""" + +from .hrq_vae_config import HRQVAEConfig +from .hrq_vae_model import HRQVAE + +__all__ = ["HRQVAE", "HRQVAEConfig"] diff --git a/src/pythae/models/hrq_vae/hrq_vae_config.py b/src/pythae/models/hrq_vae/hrq_vae_config.py new file mode 100755 index 00000000..51aa0131 --- /dev/null +++ b/src/pythae/models/hrq_vae/hrq_vae_config.py @@ -0,0 +1,37 @@ +from pydantic.dataclasses import dataclass +from typing import Optional + +from ..ae import AEConfig + + +@dataclass +class HRQVAEConfig(AEConfig): + r""" + Hierarchical Residual Quantization VAE model config config class + + Parameters: + input_dim (tuple): The input_data dimension. + latent_dim (int): The latent space dimension. Default: 10. + num_embedding (int): The number of embedding points. Default: 64 + num_levels (int): Depth of hierarchy. Default: 4 + kl_weight (float): Weighting of the KL term. Default: 0.1 + init_scale (float): Magnitude of the embedding initialisation, should roughly match the encoder. Default: 1.0 + init_decay_weight (float): Factor by which the magnitude of each successive levels is multiplied. Default: 1.5 + norm_loss_weight (float): Weighting of the norm loss term. Default: 0.5 + norm_loss_scale (float): Scale for the norm loss. Default: 1.5 + temp_schedule_gamma (float): Decay constant for the Gumbel temperature - will be (epoch/gamma). Default: 33.333 + depth_drop_rate (float): Probability of dropping each level during training. Default: 0.1 + """ + + num_embeddings: int = 64 + num_levels: int = 4 + kl_weight: float = 0.1 + init_scale: float = 1.0 + init_decay_weight: float = 0.5 + norm_loss_weight: Optional[float] = 0.5 + norm_loss_scale: float = 1.5 + temp_schedule_gamma: float = 33.333 + depth_drop_rate: float = 0.1 + + def __post_init__(self): + super().__post_init__() diff --git a/src/pythae/models/hrq_vae/hrq_vae_model.py b/src/pythae/models/hrq_vae/hrq_vae_model.py new file mode 100755 index 00000000..59459335 --- /dev/null +++ b/src/pythae/models/hrq_vae/hrq_vae_model.py @@ -0,0 +1,128 @@ +from typing import Optional, Any, Dict + +import torch +import torch.nn.functional as F + +from ...data.datasets import BaseDataset +from ..ae import AE +from ..base.base_utils import ModelOutput +from ..nn import BaseDecoder, BaseEncoder +from .hrq_vae_config import HRQVAEConfig +from .hrq_vae_utils import HierarchicalResidualQuantizer + + +class HRQVAE(AE): + r""" + Hierarchical Residual Quantization-VAE model. Introduced in https://aclanthology.org/2022.acl-long.178/ (Hosking et al., ACL 2022) + + Args: + model_config (HRQVAEConfig): The Variational Autoencoder configuration setting the main + parameters of the model. + + encoder (BaseEncoder): An instance of BaseEncoder (inheriting from `torch.nn.Module` which + plays the role of encoder. This argument allows you to use your own neural networks + architectures if desired. If None is provided, a simple Multi Layer Preception + (https://en.wikipedia.org/wiki/Multilayer_perceptron) is used. Default: None. + + decoder (BaseDecoder): An instance of BaseDecoder (inheriting from `torch.nn.Module` which + plays the role of encoder. This argument allows you to use your own neural networks + architectures if desired. If None is provided, a simple Multi Layer Preception + (https://en.wikipedia.org/wiki/Multilayer_perceptron) is used. Default: None. + + """ + + def __init__( + self, + model_config: HRQVAEConfig, + encoder: Optional[BaseEncoder] = None, + decoder: Optional[BaseDecoder] = None, + ): + AE.__init__(self, model_config=model_config, encoder=encoder, decoder=decoder) + + self._set_quantizer(model_config) + + self.model_name = "HRQVAE" + + def _set_quantizer(self, model_config): + if model_config.input_dim is None: + raise AttributeError( + "No input dimension provided !" + "'input_dim' parameter of HRQVAEConfig instance must be set to 'data_shape' where " + "the shape of the data is (C, H, W ..). Unable to set quantizer." + ) + + x = torch.randn((2,) + self.model_config.input_dim) + z = self.encoder(x).embedding + if len(z.shape) == 2: + z = z.reshape(z.shape[0], 1, 1, -1) + + z = z.permute(0, 2, 3, 1) + + self.model_config.embedding_dim = z.shape[-1] + + self.quantizer = HierarchicalResidualQuantizer(model_config=model_config) + + def forward(self, inputs: Dict[str, Any], **kwargs) -> ModelOutput: + """ + The VAE model + + Args: + inputs (dict): A dict of samples + + Returns: + ModelOutput: An instance of ModelOutput containing all the relevant parameters + + """ + + x = inputs["data"] + uses_ddp = kwargs.pop("uses_ddp", False) + epoch = kwargs.pop("epoch", 0) + + encoder_output = self.encoder(x) + + embeddings = encoder_output.embedding + + reshape_for_decoding = False + + if len(embeddings.shape) == 2: + embeddings = embeddings.reshape(embeddings.shape[0], 1, 1, -1) + reshape_for_decoding = True + + embeddings = embeddings.permute(0, 2, 3, 1) + + quantizer_output = self.quantizer(embeddings, epoch=epoch, uses_ddp=uses_ddp) + + quantized_embed = quantizer_output.quantized_vector + + if reshape_for_decoding: + quantized_embed = quantized_embed.reshape(embeddings.shape[0], -1) + + recon_x = self.decoder(quantized_embed).reconstruction + + loss, recon_loss, hrq_loss = self.loss_function(recon_x, x, quantizer_output) + + output = ModelOutput( + loss=loss, + recon_loss=recon_loss, + hrq_loss=hrq_loss, + recon_x=recon_x, + z=quantized_embed, + z_orig=quantizer_output.z_orig, + quantized_indices=quantizer_output.quantized_indices, + probs=quantizer_output.probs, + ) + + return output + + def loss_function(self, recon_x, x, quantizer_output): + recon_loss = F.mse_loss( + recon_x.reshape(x.shape[0], -1), x.reshape(x.shape[0], -1), reduction="none" + ).sum(dim=-1) + + hrq_loss = quantizer_output.loss + + return ( + (recon_loss + hrq_loss).mean(dim=0), + recon_loss.mean(dim=0), + hrq_loss.mean(dim=0), + ) diff --git a/src/pythae/models/hrq_vae/hrq_vae_utils.py b/src/pythae/models/hrq_vae/hrq_vae_utils.py new file mode 100755 index 00000000..346c68ea --- /dev/null +++ b/src/pythae/models/hrq_vae/hrq_vae_utils.py @@ -0,0 +1,153 @@ +import torch +import torch.distributed as dist +import torch.nn as nn +import torch.nn.functional as F +from math import sqrt + +from ..base.base_utils import ModelOutput +from .hrq_vae_config import HRQVAEConfig + + +class HierarchicalResidualQuantizer(nn.Module): + def __init__(self, model_config: HRQVAEConfig): + nn.Module.__init__(self) + + self.model_config = model_config + + self.embedding_dim = model_config.embedding_dim + self.num_embeddings = model_config.num_embeddings + self.num_levels = model_config.num_levels + + self.embeddings = nn.ModuleList( + [ + nn.Embedding(self.num_embeddings, self.embedding_dim) + for hix in range(self.num_levels) + ] + ) + + init_scale = model_config.init_scale + init_decay_weight = model_config.init_decay_weight + for hix, embedding in enumerate(self.embeddings): + scale = init_scale * init_decay_weight**hix / sqrt(self.embedding_dim) + embedding.weight.data.uniform_(-1.0 * scale, scale) + + # Normalise onto sphere + embedding.weight.data = ( + embedding.weight.data + / torch.linalg.vector_norm(embedding.weight, dim=1, keepdim=True) + * init_scale + * init_decay_weight**hix + ) + + def forward(self, z: torch.Tensor, epoch: int, uses_ddp: bool = False): + if uses_ddp: + raise Exception("HRQVAE doesn't currently support DDP :(") + + input_shape = z.shape + + z = z.reshape(-1, self.embedding_dim) + + loss = torch.zeros(z.shape[0]).to(z.device) + + resid_error = z + + quantized = [] + codes = [] + all_probs = [] + + for head_ix, embedding in enumerate(self.embeddings): + if head_ix > 0: + resid_error = z - torch.sum(torch.cat(quantized, dim=1), dim=1) + + distances = -1.0 * ( + torch.sum(resid_error**2, dim=-1, keepdim=True) + + torch.sum(embedding.weight**2, dim=-1) + - 2 * torch.matmul(resid_error, embedding.weight.T) + ) + + gumbel_sched_weight = torch.exp( + -torch.tensor(float(epoch)) + / float(self.model_config.temp_schedule_gamma * 1.5**head_ix) + ) + gumbel_temp = max(gumbel_sched_weight, 0.5) + + if self.training: + + sample_onehot = F.gumbel_softmax( + distances, tau=gumbel_temp, hard=True, dim=-1 + ) + else: + indices = torch.argmax(distances, dim=-1) + sample_onehot = F.one_hot( + indices, num_classes=self.num_embeddings + ).float() + + probs = F.softmax(distances / gumbel_temp, dim=-1) + + # KL loss + prior = ( + torch.ones_like(distances).detach() + / torch.ones_like(distances).sum(-1, keepdim=True).detach() + ) + kl_loss = torch.nn.KLDivLoss(reduction="none") + kl = kl_loss(nn.functional.log_softmax(distances, dim=-1), prior).sum( + dim=-1 + ) + loss += kl * self.model_config.kl_weight + + # quantization + this_quantized = sample_onehot @ embedding.weight + this_quantized = this_quantized.reshape_as(z) + + quantized.append(this_quantized.unsqueeze(-2)) + codes.append(torch.argmax(sample_onehot, dim=-1).unsqueeze(-1)) + all_probs.append(probs.unsqueeze(-2)) + + quantized = torch.cat(quantized, dim=-2) + quantized_indices = torch.cat(codes, dim=-1) + all_probs = torch.cat(all_probs, dim=-2) + + # Calculate the norm loss + if self.model_config.norm_loss_weight is not None: + upper_norms = torch.linalg.vector_norm(quantized[:, :-1, :], dim=-1) + lower_norms = torch.linalg.vector_norm(quantized[:, 1:, :], dim=-1) + norm_loss = ( + torch.max( + lower_norms / upper_norms * self.model_config.norm_loss_scale, + torch.ones_like(lower_norms), + ) + - 1.0 + ) ** 2 + + loss += norm_loss.mean(dim=1) * self.model_config.norm_loss_weight + + # Depth drop out + if self.training: + + drop_dist = torch.distributions.Bernoulli( + 1 - self.model_config.depth_drop_rate + ) + + mask = drop_dist.sample(sample_shape=(*quantized.shape[:-1], 1)) + + mask = torch.cumprod(mask, dim=1).to(quantized.device) + quantized = quantized * mask + + quantized = quantized.sum(dim=-2).reshape(*input_shape) + quantized = quantized.permute(0, 3, 1, 2) + + loss = loss.reshape(input_shape[0], -1).mean(dim=1) + + quantized_indices = quantized_indices.reshape( + *input_shape[:-1], self.num_levels + ) + + output = ModelOutput( + z_orig=z, + quantized_vector=quantized, + quantized_indices=quantized_indices, + loss=loss, + probs=all_probs, + ) + + return output diff --git a/src/pythae/models/nn/benchmarks/mnist/resnets.py b/src/pythae/models/nn/benchmarks/mnist/resnets.py index 4176f5a9..d7afc0ea 100644 --- a/src/pythae/models/nn/benchmarks/mnist/resnets.py +++ b/src/pythae/models/nn/benchmarks/mnist/resnets.py @@ -578,6 +578,117 @@ def forward(self, x: torch.Tensor, output_layer_levels: List[int] = None): return output +class Encoder_ResNet_HRQVAE_MNIST(BaseEncoder): + """ + A ResNet encoder suited for MNIST and VQ- or HRQ- VAE models. It differs from the VQVAE ResNet in that it outputs only a single embedding vector. + + It can be built as follows: + + .. code-block:: + + >>> from pythae.models.nn.benchmarks.mnist import Encoder_ResNet_HRQVAE_MNIST + >>> from pythae.models import HRQVAEConfig + >>> model_config = HRQVAEConfig(input_dim=(1, 28, 28), latent_dim=16) + >>> encoder = Encoder_ResNet_HRQVAE_MNIST(model_config) + + + and then passed to a :class:`pythae.models` instance + + >>> from pythae.models import HRQVAE + >>> model = HRQVAE(model_config=model_config, encoder=encoder) + >>> model.encoder == encoder + ... True + + .. note:: + + Please note that this encoder is only suitable for Autoencoder based models since it only + outputs the embeddings of the input data under the key `embedding`. + + .. code-block:: + + >>> import torch + >>> input = torch.rand(2, 1, 28, 28) + >>> out = encoder(input) + >>> out.embedding.shape + ... torch.Size([2, 16, 1, 1]) + + """ + + def __init__(self, args: BaseAEConfig): + BaseEncoder.__init__(self) + + self.input_dim = (1, 28, 28) + self.latent_dim = args.latent_dim + self.n_channels = 1 + + layers = nn.ModuleList() + + layers.append(nn.Sequential(nn.Conv2d(self.n_channels, 64, 4, 2, padding=1))) + + layers.append(nn.Sequential(nn.Conv2d(64, 128, 4, 2, padding=1))) + + layers.append(nn.Sequential(nn.Conv2d(128, 128, 3, 2, padding=1))) + + layers.append( + nn.Sequential( + ResBlock(in_channels=128, out_channels=32), + ResBlock(in_channels=128, out_channels=32), + ) + ) + + # Additional Conv layer to squeeze down to a single embedding + # layers.append(nn.Sequential(nn.Conv2d(128, 128, 4, 1, padding=0))) + + self.layers = layers + self.depth = len(layers) + + self.pre_qantized = nn.Conv2d(128, self.latent_dim, 4, 1) + + def forward(self, x: torch.Tensor, output_layer_levels: List[int] = None): + """Forward method + + Args: + x (torch.Tensor): A batch of inputs. + output_layer_levels (List[int]): The levels of the layers where the outputs are + extracted. If None, the last layer's output is returned. Default: None. + + Returns: + ModelOutput: An instance of ModelOutput containing the embeddings of the input data + under the key `embedding`. Optional: The outputs of the layers specified in + `output_layer_levels` arguments are available under the keys `embedding_layer_i` where + i is the layer's level.""" + output = ModelOutput() + + max_depth = self.depth + + if output_layer_levels is not None: + assert all( + self.depth >= levels > 0 or levels == -1 + for levels in output_layer_levels + ), ( + f"Cannot output layer deeper than depth ({self.depth})." + f"Got ({output_layer_levels})." + ) + + if -1 in output_layer_levels: + max_depth = self.depth + else: + max_depth = max(output_layer_levels) + + out = x + + for i in range(max_depth): + out = self.layers[i](out) + + if output_layer_levels is not None: + if i + 1 in output_layer_levels: + output[f"embedding_layer_{i+1}"] = out + if i + 1 == self.depth: + output["embedding"] = self.pre_qantized(out) + + return output + + class Decoder_ResNet_AE_MNIST(BaseDecoder): """ A ResNet decoder suited for MNIST and Autoencoder-based @@ -884,3 +995,122 @@ def forward(self, z: torch.Tensor, output_layer_levels: List[int] = None): output["reconstruction"] = out return output + + +class Decoder_ResNet_HRQVAE_MNIST(BaseDecoder): + """ + A ResNet decoder suited for MNIST and VQ- or HRQ- VAE models. It differs from the VQVAE ResNet in that it expects only a single embedding vector as input. + + .. code-block:: + + >>> from pythae.models.nn.benchmarks.mnist import Decoder_ResNet_HRQVAE_MNIST + >>> from pythae.models import HRQVAEConfig + >>> model_config = HRQVAEConfig(input_dim=(1, 28, 28), latent_dim=16) + >>> decoder = Decoder_ResNet_HRQVAE_MNIST(model_config) + + + and then passed to a :class:`pythae.models` instance + + >>> from pythae.models import HRQVAE + >>> model = HRQVAE(model_config=model_config, decoder=decoder) + >>> model.decoder == decoder + ... True + + .. note:: + + Please note that this decoder is suitable for **all** models. + + .. code-block:: + + >>> import torch + >>> input = torch.randn(2, 16, 1, 1) + >>> out = decoder(input) + >>> out.reconstruction.shape + ... torch.Size([2, 1, 28, 28]) + """ + + def __init__(self, args: BaseAEConfig): + BaseDecoder.__init__(self) + + self.input_dim = (1, 28, 28) + self.latent_dim = args.latent_dim + self.n_channels = 1 + + layers = nn.ModuleList() + + layers.append(nn.ConvTranspose2d(self.latent_dim, 128, 4, 1)) + + layers.append(nn.ConvTranspose2d(128, 128, 3, 2, padding=1)) + + layers.append( + nn.Sequential( + ResBlock(in_channels=128, out_channels=32), + ResBlock(in_channels=128, out_channels=32), + nn.ReLU(), + ) + ) + + layers.append( + nn.Sequential( + nn.ConvTranspose2d(128, 64, 3, 2, padding=1, output_padding=1), + nn.ReLU(), + ) + ) + + layers.append( + nn.Sequential( + nn.ConvTranspose2d( + 64, self.n_channels, 3, 2, padding=1, output_padding=1 + ), + nn.Sigmoid(), + ) + ) + + self.layers = layers + self.depth = len(layers) + + def forward(self, z: torch.Tensor, output_layer_levels: List[int] = None): + """Forward method + + Args: + z (torch.Tensor): A batch of embeddings. + output_layer_levels (List[int]): The levels of the layers where the outputs are + extracted. If None, the last layer's output is returned. Default: None. + + Returns: + ModelOutput: An instance of ModelOutput containing the reconstruction of the latent code + under the key `reconstruction`. Optional: The outputs of the layers specified in + `output_layer_levels` arguments are available under the keys `reconstruction_layer_i` + where i is the layer's level. + """ + output = ModelOutput() + + max_depth = self.depth + + if output_layer_levels is not None: + assert all( + self.depth >= levels > 0 or levels == -1 + for levels in output_layer_levels + ), ( + f"Cannot output layer deeper than depth ({self.depth})." + f"Got ({output_layer_levels})" + ) + + if -1 in output_layer_levels: + max_depth = self.depth + else: + max_depth = max(output_layer_levels) + + out = z + + for i in range(max_depth): + out = self.layers[i](out) + + if output_layer_levels is not None: + if i + 1 in output_layer_levels: + output[f"reconstruction_layer_{i+1}"] = out + + if i + 1 == self.depth: + output["reconstruction"] = out + + return output diff --git a/tests/test_HRQVAE.py b/tests/test_HRQVAE.py new file mode 100755 index 00000000..262dc715 --- /dev/null +++ b/tests/test_HRQVAE.py @@ -0,0 +1,760 @@ +import os +from copy import deepcopy + +import pytest +import torch +from pydantic import ValidationError + +from pythae.customexception import BadInheritanceError +from pythae.models import HRQVAE, AutoModel, HRQVAEConfig +from pythae.models.base.base_utils import ModelOutput +from pythae.models.vq_vae.vq_vae_utils import Quantizer, QuantizerEMA +from pythae.pipelines import GenerationPipeline, TrainingPipeline +from pythae.samplers import PixelCNNSamplerConfig +from pythae.trainers import BaseTrainer, BaseTrainerConfig +from tests.data.custom_architectures import ( + Decoder_AE_Conv, + Encoder_AE_Conv, + NetBadInheritance, +) + +PATH = os.path.dirname(os.path.abspath(__file__)) + + +@pytest.fixture(params=[HRQVAEConfig(), HRQVAEConfig(latent_dim=4)]) +def model_configs_no_input_dim(request): + return request.param + + +@pytest.fixture( + params=[ + HRQVAEConfig( + input_dim=(1, 28, 28), latent_dim=16, num_embeddings=10 + ), # ! Needs squared latent_dim ! + HRQVAEConfig( + input_dim=(1, 28, 28), + latent_dim=16, + num_embeddings=10, + num_levels = 8, + kl_weight = 0.01, + init_scale = 1.0, + init_decay_weight = 0.5, + norm_loss_weight = 0.5, + norm_loss_scale = 1.5, + temp_schedule_gamma=10, + ), + ] +) +def model_configs(request): + return request.param + + +@pytest.fixture +def custom_encoder(model_configs): + return Encoder_AE_Conv(model_configs) + + +@pytest.fixture +def custom_decoder(model_configs): + return Decoder_AE_Conv(model_configs) + + +class Test_Model_Building: + @pytest.fixture() + def bad_net(self): + return NetBadInheritance() + + def test_build_model(self, model_configs): + model = HRQVAE(model_configs) + assert all( + [ + model.input_dim == model_configs.input_dim, + model.latent_dim == model_configs.latent_dim, + ] + ) + + + def build_quantizer(self, model_configs): + model = HRQVAE(model_configs) + + if model.use_ema: + assert isinstance(model.quantizer, QuantizerEMA) + + else: + assert isinstance(model.quantizer, Quantizer) + + def test_raises_bad_inheritance(self, model_configs, bad_net): + with pytest.raises(BadInheritanceError): + model = HRQVAE(model_configs, encoder=bad_net) + + with pytest.raises(BadInheritanceError): + model = HRQVAE(model_configs, decoder=bad_net) + + def test_raises_no_input_dim( + self, model_configs_no_input_dim, custom_encoder, custom_decoder + ): + with pytest.raises(AttributeError): + model = HRQVAE(model_configs_no_input_dim) + + with pytest.raises(AttributeError): + model = HRQVAE(model_configs_no_input_dim, encoder=custom_encoder) + + with pytest.raises(AttributeError): + model = HRQVAE(model_configs_no_input_dim, decoder=custom_decoder) + + def test_build_custom_arch(self, model_configs, custom_encoder, custom_decoder): + model = HRQVAE(model_configs, encoder=custom_encoder, decoder=custom_decoder) + + assert model.encoder == custom_encoder + assert not model.model_config.uses_default_encoder + assert model.decoder == custom_decoder + assert not model.model_config.uses_default_decoder + + model = HRQVAE(model_configs, encoder=custom_encoder) + + assert model.encoder == custom_encoder + assert not model.model_config.uses_default_encoder + assert model.model_config.uses_default_decoder + + model = HRQVAE(model_configs, decoder=custom_decoder) + + assert model.model_config.uses_default_encoder + assert model.decoder == custom_decoder + assert not model.model_config.uses_default_decoder + + +class Test_Model_Saving: + def test_default_model_saving(self, tmpdir, model_configs): + tmpdir.mkdir("dummy_folder") + dir_path = dir_path = os.path.join(tmpdir, "dummy_folder") + + model = HRQVAE(model_configs) + + model.state_dict()["encoder.layers.0.0.weight"][0] = 0 + + model.save(dir_path=dir_path) + + assert set(os.listdir(dir_path)) == set( + ["model_config.json", "model.pt", "environment.json"] + ) + + # reload model + model_rec = AutoModel.load_from_folder(dir_path) + + # check configs are the same + assert model_rec.model_config.__dict__ == model.model_config.__dict__ + + assert all( + [ + torch.equal(model_rec.state_dict()[key], model.state_dict()[key]) + for key in model.state_dict().keys() + ] + ) + + def test_custom_encoder_model_saving(self, tmpdir, model_configs, custom_encoder): + tmpdir.mkdir("dummy_folder") + dir_path = dir_path = os.path.join(tmpdir, "dummy_folder") + + model = HRQVAE(model_configs, encoder=custom_encoder) + + model.state_dict()["encoder.layers.0.0.weight"][0] = 0 + + model.save(dir_path=dir_path) + + assert set(os.listdir(dir_path)) == set( + ["model_config.json", "model.pt", "encoder.pkl", "environment.json"] + ) + + # reload model + model_rec = AutoModel.load_from_folder(dir_path) + + # check configs are the same + assert model_rec.model_config.__dict__ == model.model_config.__dict__ + + assert all( + [ + torch.equal(model_rec.state_dict()[key], model.state_dict()[key]) + for key in model.state_dict().keys() + ] + ) + + def test_custom_decoder_model_saving(self, tmpdir, model_configs, custom_decoder): + tmpdir.mkdir("dummy_folder") + dir_path = dir_path = os.path.join(tmpdir, "dummy_folder") + + model = HRQVAE(model_configs, decoder=custom_decoder) + + model.state_dict()["encoder.layers.0.0.weight"][0] = 0 + + model.save(dir_path=dir_path) + + assert set(os.listdir(dir_path)) == set( + ["model_config.json", "model.pt", "decoder.pkl", "environment.json"] + ) + + # reload model + model_rec = AutoModel.load_from_folder(dir_path) + + # check configs are the same + assert model_rec.model_config.__dict__ == model.model_config.__dict__ + + assert all( + [ + torch.equal(model_rec.state_dict()[key], model.state_dict()[key]) + for key in model.state_dict().keys() + ] + ) + + def test_full_custom_model_saving( + self, tmpdir, model_configs, custom_encoder, custom_decoder + ): + tmpdir.mkdir("dummy_folder") + dir_path = dir_path = os.path.join(tmpdir, "dummy_folder") + + model = HRQVAE(model_configs, encoder=custom_encoder, decoder=custom_decoder) + + model.state_dict()["encoder.layers.0.0.weight"][0] = 0 + + model.save(dir_path=dir_path) + + assert set(os.listdir(dir_path)) == set( + [ + "model_config.json", + "model.pt", + "encoder.pkl", + "decoder.pkl", + "environment.json", + ] + ) + + # reload model + model_rec = AutoModel.load_from_folder(dir_path) + + # check configs are the same + assert model_rec.model_config.__dict__ == model.model_config.__dict__ + + assert all( + [ + torch.equal(model_rec.state_dict()[key], model.state_dict()[key]) + for key in model.state_dict().keys() + ] + ) + + def test_raises_missing_files( + self, tmpdir, model_configs, custom_encoder, custom_decoder + ): + tmpdir.mkdir("dummy_folder") + dir_path = dir_path = os.path.join(tmpdir, "dummy_folder") + + model = HRQVAE(model_configs, encoder=custom_encoder, decoder=custom_decoder) + + model.state_dict()["encoder.layers.0.0.weight"][0] = 0 + + model.save(dir_path=dir_path) + + os.remove(os.path.join(dir_path, "decoder.pkl")) + + # check raises decoder.pkl is missing + with pytest.raises(FileNotFoundError): + model_rec = AutoModel.load_from_folder(dir_path) + + os.remove(os.path.join(dir_path, "encoder.pkl")) + + # check raises encoder.pkl is missing + with pytest.raises(FileNotFoundError): + model_rec = AutoModel.load_from_folder(dir_path) + + os.remove(os.path.join(dir_path, "model.pt")) + + # check raises encoder.pkl is missing + with pytest.raises(FileNotFoundError): + model_rec = AutoModel.load_from_folder(dir_path) + + os.remove(os.path.join(dir_path, "model_config.json")) + + # check raises encoder.pkl is missing + with pytest.raises(FileNotFoundError): + model_rec = AutoModel.load_from_folder(dir_path) + + +class Test_Model_forward: + @pytest.fixture + def demo_data(self): + data = torch.load(os.path.join(PATH, "data/mnist_clean_train_dataset_sample"))[ + : + ] + return data # This is an extract of 3 data from MNIST (unnormalized) used to test custom architecture + + @pytest.fixture + def vae(self, model_configs, demo_data): + model_configs.input_dim = tuple(demo_data["data"][0].shape) + return HRQVAE(model_configs) + + def test_model_train_output(self, vae, demo_data): + vae.train() + + out = vae(demo_data) + + assert isinstance(out, ModelOutput) + + assert set( + ["loss", "recon_loss", "hrq_loss", "recon_x", "z", "z_orig", "quantized_indices", "probs"] + ) == set(out.keys()) + + print(out.z.shape, demo_data['data'].shape) + + assert out.z.shape[0] == demo_data["data"].shape[0] + assert out.recon_x.shape == demo_data["data"].shape + + +class Test_Model_interpolate: + @pytest.fixture( + params=[ + torch.rand(3, 2, 3, 1), + torch.rand(3, 2, 2), + torch.load(os.path.join(PATH, "data/mnist_clean_train_dataset_sample"))[:][ + "data" + ], + ] + ) + def demo_data(self, request): + return request.param + + @pytest.fixture() + def granularity(self): + return int(torch.randint(1, 10, (1,))) + + @pytest.fixture + def ae(self, model_configs, demo_data): + model_configs.input_dim = tuple(demo_data[0].shape) + return HRQVAE(model_configs) + + def test_interpolate(self, ae, demo_data, granularity): + with pytest.raises(AssertionError): + ae.interpolate(demo_data, demo_data[1:], granularity) + + interp = ae.interpolate(demo_data, demo_data, granularity) + + assert tuple(interp.shape) == ( + demo_data.shape[0], + granularity, + ) + (demo_data.shape[1:]) + + +class Test_Model_reconstruct: + @pytest.fixture( + params=[ + torch.rand(3, 2, 3, 1), + torch.rand(3, 2, 2), + torch.load(os.path.join(PATH, "data/mnist_clean_train_dataset_sample"))[:][ + "data" + ], + ] + ) + def demo_data(self, request): + return request.param + + @pytest.fixture + def ae(self, model_configs, demo_data): + model_configs.input_dim = tuple(demo_data[0].shape) + return HRQVAE(model_configs) + + def test_reconstruct(self, ae, demo_data): + recon = ae.reconstruct(demo_data) + assert tuple(recon.shape) == demo_data.shape + + +@pytest.mark.slow +class Test_HRQVAETraining: + @pytest.fixture + def train_dataset(self): + return torch.load(os.path.join(PATH, "data/mnist_clean_train_dataset_sample")) + + @pytest.fixture( + params=[BaseTrainerConfig(num_epochs=3, steps_saving=2, learning_rate=1e-5)] + ) + def training_configs(self, tmpdir, request): + tmpdir.mkdir("dummy_folder") + dir_path = os.path.join(tmpdir, "dummy_folder") + request.param.output_dir = dir_path + return request.param + + @pytest.fixture( + params=[ + torch.rand(1), + torch.rand(1), + torch.rand(1), + torch.rand(1), + torch.rand(1), + ] + ) + def vae(self, model_configs, custom_encoder, custom_decoder, request): + # randomized + + alpha = request.param + + if alpha < 0.25: + model = HRQVAE(model_configs) + + elif 0.25 <= alpha < 0.5: + model = HRQVAE(model_configs, encoder=custom_encoder) + + elif 0.5 <= alpha < 0.75: + model = HRQVAE(model_configs, decoder=custom_decoder) + + else: + model = HRQVAE(model_configs, encoder=custom_encoder, decoder=custom_decoder) + + return model + + @pytest.fixture + def trainer(self, vae, train_dataset, training_configs): + trainer = BaseTrainer( + model=vae, + train_dataset=train_dataset, + eval_dataset=train_dataset, + training_config=training_configs, + ) + + trainer.prepare_training() + + return trainer + + def test_vae_train_step(self, trainer): + start_model_state_dict = deepcopy(trainer.model.state_dict()) + + step_1_loss = trainer.train_step(epoch=1) + + step_1_model_state_dict = deepcopy(trainer.model.state_dict()) + + # check that weights were updated + assert not all( + [ + torch.equal(start_model_state_dict[key], step_1_model_state_dict[key]) + for key in start_model_state_dict.keys() + ] + ) + + def test_vae_eval_step(self, trainer): + start_model_state_dict = deepcopy(trainer.model.state_dict()) + + step_1_loss = trainer.eval_step(epoch=1) + + step_1_model_state_dict = deepcopy(trainer.model.state_dict()) + + # check that weights were not updated + assert all( + [ + torch.equal(start_model_state_dict[key], step_1_model_state_dict[key]) + for key in start_model_state_dict.keys() + ] + ) + + def test_vae_predict_step(self, trainer, train_dataset): + start_model_state_dict = deepcopy(trainer.model.state_dict()) + + inputs, recon, generated = trainer.predict(trainer.model) + + step_1_model_state_dict = deepcopy(trainer.model.state_dict()) + + # check that weights were not updated + assert all( + [ + torch.equal(start_model_state_dict[key], step_1_model_state_dict[key]) + for key in start_model_state_dict.keys() + ] + ) + + assert inputs.cpu() in train_dataset.data + assert recon.shape == inputs.shape + assert generated.shape == inputs.shape + + def test_vae_main_train_loop(self, trainer): + start_model_state_dict = deepcopy(trainer.model.state_dict()) + + trainer.train() + + step_1_model_state_dict = deepcopy(trainer.model.state_dict()) + + # check that weights were updated + assert not all( + [ + torch.equal(start_model_state_dict[key], step_1_model_state_dict[key]) + for key in start_model_state_dict.keys() + ] + ) + + def test_checkpoint_saving(self, vae, trainer, training_configs): + dir_path = training_configs.output_dir + + # Make a training step + step_1_loss = trainer.train_step(epoch=1) + + model = deepcopy(trainer.model) + optimizer = deepcopy(trainer.optimizer) + + trainer.save_checkpoint(dir_path=dir_path, epoch=0, model=model) + + checkpoint_dir = os.path.join(dir_path, "checkpoint_epoch_0") + + assert os.path.isdir(checkpoint_dir) + + files_list = os.listdir(checkpoint_dir) + + assert set(["model.pt", "optimizer.pt", "training_config.json"]).issubset( + set(files_list) + ) + + # check pickled custom decoder + if not vae.model_config.uses_default_decoder: + assert "decoder.pkl" in files_list + + else: + assert not "decoder.pkl" in files_list + + # check pickled custom encoder + if not vae.model_config.uses_default_encoder: + assert "encoder.pkl" in files_list + + else: + assert not "encoder.pkl" in files_list + + model_rec_state_dict = torch.load(os.path.join(checkpoint_dir, "model.pt"))[ + "model_state_dict" + ] + + assert all( + [ + torch.equal( + model_rec_state_dict[key].cpu(), model.state_dict()[key].cpu() + ) + for key in model.state_dict().keys() + ] + ) + + # check reload full model + model_rec = AutoModel.load_from_folder(os.path.join(checkpoint_dir)) + + assert all( + [ + torch.equal( + model_rec.state_dict()[key].cpu(), model.state_dict()[key].cpu() + ) + for key in model.state_dict().keys() + ] + ) + + assert type(model_rec.encoder.cpu()) == type(model.encoder.cpu()) + assert type(model_rec.decoder.cpu()) == type(model.decoder.cpu()) + + optim_rec_state_dict = torch.load(os.path.join(checkpoint_dir, "optimizer.pt")) + + assert all( + [ + dict_rec == dict_optimizer + for (dict_rec, dict_optimizer) in zip( + optim_rec_state_dict["param_groups"], + optimizer.state_dict()["param_groups"], + ) + ] + ) + + assert all( + [ + dict_rec == dict_optimizer + for (dict_rec, dict_optimizer) in zip( + optim_rec_state_dict["state"], optimizer.state_dict()["state"] + ) + ] + ) + + def test_checkpoint_saving_during_training(self, vae, trainer, training_configs): + # + target_saving_epoch = training_configs.steps_saving + + dir_path = training_configs.output_dir + + model = deepcopy(trainer.model) + + trainer.train() + + training_dir = os.path.join( + dir_path, f"HRQVAE_training_{trainer._training_signature}" + ) + assert os.path.isdir(training_dir) + + checkpoint_dir = os.path.join( + training_dir, f"checkpoint_epoch_{target_saving_epoch}" + ) + + assert os.path.isdir(checkpoint_dir) + + files_list = os.listdir(checkpoint_dir) + + # check files + assert set(["model.pt", "optimizer.pt", "training_config.json"]).issubset( + set(files_list) + ) + + # check pickled custom decoder + if not vae.model_config.uses_default_decoder: + assert "decoder.pkl" in files_list + + else: + assert not "decoder.pkl" in files_list + + # check pickled custom encoder + if not vae.model_config.uses_default_encoder: + assert "encoder.pkl" in files_list + + else: + assert not "encoder.pkl" in files_list + + model_rec_state_dict = torch.load(os.path.join(checkpoint_dir, "model.pt"))[ + "model_state_dict" + ] + + assert not all( + [ + torch.equal(model_rec_state_dict[key], model.state_dict()[key]) + for key in model.state_dict().keys() + ] + ) + + def test_final_model_saving(self, vae, trainer, training_configs): + dir_path = training_configs.output_dir + + trainer.train() + + model = deepcopy(trainer._best_model) + + training_dir = os.path.join( + dir_path, f"HRQVAE_training_{trainer._training_signature}" + ) + assert os.path.isdir(training_dir) + + final_dir = os.path.join(training_dir, f"final_model") + assert os.path.isdir(final_dir) + + files_list = os.listdir(final_dir) + + assert set(["model.pt", "model_config.json", "training_config.json"]).issubset( + set(files_list) + ) + + # check pickled custom decoder + if not vae.model_config.uses_default_decoder: + assert "decoder.pkl" in files_list + + else: + assert not "decoder.pkl" in files_list + + # check pickled custom encoder + if not vae.model_config.uses_default_encoder: + assert "encoder.pkl" in files_list + + else: + assert not "encoder.pkl" in files_list + + # check reload full model + model_rec = AutoModel.load_from_folder(os.path.join(final_dir)) + + assert all( + [ + torch.equal( + model_rec.state_dict()[key].cpu(), model.state_dict()[key].cpu() + ) + for key in model.state_dict().keys() + ] + ) + + assert type(model_rec.encoder.cpu()) == type(model.encoder.cpu()) + assert type(model_rec.decoder.cpu()) == type(model.decoder.cpu()) + + def test_vae_training_pipeline(self, vae, train_dataset, training_configs): + dir_path = training_configs.output_dir + + # build pipeline + pipeline = TrainingPipeline(model=vae, training_config=training_configs) + + # Launch Pipeline + pipeline( + train_data=train_dataset.data, # gives tensor to pipeline + eval_data=train_dataset.data, # gives tensor to pipeline + ) + + model = deepcopy(pipeline.trainer._best_model) + + training_dir = os.path.join( + dir_path, f"HRQVAE_training_{pipeline.trainer._training_signature}" + ) + assert os.path.isdir(training_dir) + + final_dir = os.path.join(training_dir, f"final_model") + assert os.path.isdir(final_dir) + + files_list = os.listdir(final_dir) + + assert set(["model.pt", "model_config.json", "training_config.json"]).issubset( + set(files_list) + ) + + # check pickled custom decoder + if not vae.model_config.uses_default_decoder: + assert "decoder.pkl" in files_list + + else: + assert not "decoder.pkl" in files_list + + # check pickled custom encoder + if not vae.model_config.uses_default_encoder: + assert "encoder.pkl" in files_list + + else: + assert not "encoder.pkl" in files_list + + # check reload full model + model_rec = AutoModel.load_from_folder(os.path.join(final_dir)) + + assert all( + [ + torch.equal( + model_rec.state_dict()[key].cpu(), model.state_dict()[key].cpu() + ) + for key in model.state_dict().keys() + ] + ) + + assert type(model_rec.encoder.cpu()) == type(model.encoder.cpu()) + assert type(model_rec.decoder.cpu()) == type(model.decoder.cpu()) + + +class Test_HRQVAE_Generation: + @pytest.fixture + def train_data(self): + return torch.load( + os.path.join(PATH, "data/mnist_clean_train_dataset_sample") + ).data + + @pytest.fixture() + def ae_model(self): + return HRQVAE(HRQVAEConfig(input_dim=(1, 28, 28), latent_dim=4)) + + @pytest.fixture(params=[PixelCNNSamplerConfig()]) + def sampler_configs(self, request): + return request.param + + @pytest.mark.skip(reason="Sampling not currently supported for HRQVAE") + def test_fits_in_generation_pipeline(self, ae_model, sampler_configs, train_data): + pipeline = GenerationPipeline(model=ae_model, sampler_config=sampler_configs) + gen_data = pipeline( + num_samples=11, + batch_size=7, + output_dir=None, + return_gen=True, + train_data=train_data, + eval_data=train_data, + training_config=BaseTrainerConfig(num_epochs=1), + ) + + assert gen_data.shape[0] == 11