diff --git a/integrations/model-training/deepspeed/notebooks/comet_deepspeed.ipynb b/integrations/model-training/deepspeed/notebooks/comet_deepspeed.ipynb
new file mode 100644
index 0000000..d58ce6b
--- /dev/null
+++ b/integrations/model-training/deepspeed/notebooks/comet_deepspeed.ipynb
@@ -0,0 +1,547 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "provenance": [],
+      "gpuType": "T4"
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    },
+    "language_info": {
+      "name": "python"
+    },
+    "accelerator": "GPU"
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "source": [
+        ""
+      ],
+      "metadata": {
+        "id": "jZFkqP0gnmum"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "[Comet](https://www.comet.com/site/products/ml-experiment-tracking/?utm_campaign=deepspeed&utm_medium=colab) is an MLOps Platform that is designed to help Data Scientists and Teams build better models faster! Comet provides tooling to Track, Explain, Manage, and Monitor your models in a single place! It works with Jupyter Notebooks and Scripts, and most importantly it's 100% free to get started!\n",
+        "\n",
+        "[DeepSpeed](https://github.com/microsoft/DeepSpeed) is a deep learning optimization library that makes distributed training and inference easy, efficient, and effective.\n",
+        "\n",
+        "Instrument your runs with Comet to start managing experiments, creating dataset versions, and tracking hyperparameters for faster and easier reproducibility and collaboration.\n",
+        "\n",
+        "[Find more information about our integration with DeepSpeed](https://www.comet.ml/docs/v2/integrations/ml-frameworks/deepspeed/)\n",
+        "\n",
+        "Get a preview of what's to come. Check out a completed experiment created from this notebook [here](https://www.comet.com/examples/comet-example-deepspeed-cifar/d0d057163a3c41adb7dcfe3ba764b2d0).\n",
+        "\n",
+        "This example is based on the [following DeepSpeed example](https://github.com/microsoft/DeepSpeedExamples/tree/master/training/cifar).\n",
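+        "\n",
+        "The Comet integration is enabled through the `comet` section of the DeepSpeed configuration dictionary. As a minimal sketch of just that piece (the full configuration used by this notebook is defined in the training script below):\n",
+        "\n",
+        "```python\n",
+        "ds_config = {\n",
+        "    # DeepSpeed's Comet monitor creates a Comet experiment and logs\n",
+        "    # training metrics to the given project.\n",
+        "    \"comet\": {\n",
+        "        \"enabled\": True,\n",
+        "        \"project\": \"comet-example-deepspeed-cifar\",\n",
+        "    },\n",
+        "    # ... the rest of the DeepSpeed configuration ...\n",
+        "}\n",
+        "```"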
+ ], + "metadata": { + "id": "Hcux3L04mrRP" + } + }, + { + "cell_type": "markdown", + "source": [ + "# Install dependencies" + ], + "metadata": { + "id": "AHCp_xKVnVmk" + } + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true, + "id": "lldMccrmS1K4" + }, + "outputs": [], + "source": [ + "%pip install comet_ml deepspeed torch torchvision" + ] + }, + { + "cell_type": "markdown", + "source": [ + "# Initialize Comet" + ], + "metadata": { + "id": "ivwmX4GCnY1N" + } + }, + { + "cell_type": "code", + "source": [ + "import comet_ml\n", + "\n", + "comet_ml.init()" + ], + "metadata": { + "id": "2lkQGICnS2aO" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "# Write the training script to disk" + ], + "metadata": { + "id": "4cDxIs9RnZ9d" + } + }, + { + "cell_type": "code", + "source": [ + "%%writefile cifar10_deepspeed.py\n", + "\n", + "# coding: utf-8\n", + "import argparse\n", + "\n", + "import deepspeed\n", + "import torch\n", + "import torch.nn as nn\n", + "import torch.nn.functional as F\n", + "import torchvision\n", + "import torchvision.transforms as transforms\n", + "from deepspeed.accelerator import get_accelerator\n", + "from deepspeed.moe.utils import split_params_into_different_moe_groups_for_optimizer\n", + "\n", + "\n", + "def add_argument():\n", + " parser = argparse.ArgumentParser(description=\"CIFAR\")\n", + "\n", + " # For train.\n", + " parser.add_argument(\n", + " \"-e\",\n", + " \"--epochs\",\n", + " default=30,\n", + " type=int,\n", + " help=\"number of total epochs (default: 30)\",\n", + " )\n", + " parser.add_argument(\n", + " \"--local_rank\",\n", + " type=int,\n", + " default=-1,\n", + " help=\"local rank passed from distributed launcher\",\n", + " )\n", + " parser.add_argument(\n", + " \"--log-interval\",\n", + " type=int,\n", + " default=2000,\n", + " help=\"output logging information at a given interval\",\n", + " )\n", + "\n", + " # For mixed precision training.\n", + " parser.add_argument(\n", + " \"--dtype\",\n", + " default=\"fp16\",\n", + " type=str,\n", + " choices=[\"bf16\", \"fp16\", \"fp32\"],\n", + " help=\"Datatype used for training\",\n", + " )\n", + "\n", + " # For ZeRO Optimization.\n", + " parser.add_argument(\n", + " \"--stage\",\n", + " default=0,\n", + " type=int,\n", + " choices=[0, 1, 2, 3],\n", + " help=\"Datatype used for training\",\n", + " )\n", + "\n", + " # For MoE (Mixture of Experts).\n", + " parser.add_argument(\n", + " \"--moe\",\n", + " default=False,\n", + " action=\"store_true\",\n", + " help=\"use deepspeed mixture of experts (moe)\",\n", + " )\n", + " parser.add_argument(\n", + " \"--ep-world-size\", default=1, type=int, help=\"(moe) expert parallel world size\"\n", + " )\n", + " parser.add_argument(\n", + " \"--num-experts\",\n", + " type=int,\n", + " nargs=\"+\",\n", + " default=[\n", + " 1,\n", + " ],\n", + " help=\"number of experts list, MoE related.\",\n", + " )\n", + " parser.add_argument(\n", + " \"--mlp-type\",\n", + " type=str,\n", + " default=\"standard\",\n", + " help=\"Only applicable when num-experts > 1, accepts [standard, residual]\",\n", + " )\n", + " parser.add_argument(\n", + " \"--top-k\", default=1, type=int, help=\"(moe) gating top 1 and 2 supported\"\n", + " )\n", + " parser.add_argument(\n", + " \"--min-capacity\",\n", + " default=0,\n", + " type=int,\n", + " help=\"(moe) minimum capacity of an expert regardless of the capacity_factor\",\n", + " )\n", + " parser.add_argument(\n", + " \"--noisy-gate-policy\",\n", + " 
+        "    parser.add_argument(\n",
+        "        \"--stage\",\n",
+        "        default=0,\n",
+        "        type=int,\n",
+        "        choices=[0, 1, 2, 3],\n",
+        "        help=\"ZeRO optimization stage (default: 0)\",\n",
+        "    )\n",
+        "\n",
+        "    # For MoE (Mixture of Experts).\n",
+        "    parser.add_argument(\n",
+        "        \"--moe\",\n",
+        "        default=False,\n",
+        "        action=\"store_true\",\n",
+        "        help=\"use deepspeed mixture of experts (moe)\",\n",
+        "    )\n",
+        "    parser.add_argument(\n",
+        "        \"--ep-world-size\", default=1, type=int, help=\"(moe) expert parallel world size\"\n",
+        "    )\n",
+        "    parser.add_argument(\n",
+        "        \"--num-experts\",\n",
+        "        type=int,\n",
+        "        nargs=\"+\",\n",
+        "        default=[\n",
+        "            1,\n",
+        "        ],\n",
+        "        help=\"number of experts list, MoE related.\",\n",
+        "    )\n",
+        "    parser.add_argument(\n",
+        "        \"--mlp-type\",\n",
+        "        type=str,\n",
+        "        default=\"standard\",\n",
+        "        help=\"Only applicable when num-experts > 1, accepts [standard, residual]\",\n",
+        "    )\n",
+        "    parser.add_argument(\n",
+        "        \"--top-k\", default=1, type=int, help=\"(moe) gating top 1 and 2 supported\"\n",
+        "    )\n",
+        "    parser.add_argument(\n",
+        "        \"--min-capacity\",\n",
+        "        default=0,\n",
+        "        type=int,\n",
+        "        help=\"(moe) minimum capacity of an expert regardless of the capacity_factor\",\n",
+        "    )\n",
+        "    parser.add_argument(\n",
+        "        \"--noisy-gate-policy\",\n",
+        "        default=None,\n",
+        "        type=str,\n",
+        "        help=(\n",
+        "            \"(moe) noisy gating (only supported with top-1). Valid values are None,\"\n",
+        "            \" RSample, and Jitter\"\n",
+        "        ),\n",
+        "    )\n",
+        "    parser.add_argument(\n",
+        "        \"--moe-param-group\",\n",
+        "        default=False,\n",
+        "        action=\"store_true\",\n",
+        "        help=\"(moe) create separate moe param groups, required when using ZeRO w. MoE\",\n",
+        "    )\n",
+        "\n",
+        "    # Include DeepSpeed configuration arguments.\n",
+        "    parser = deepspeed.add_config_arguments(parser)\n",
+        "\n",
+        "    args = parser.parse_args()\n",
+        "\n",
+        "    return args\n",
+        "\n",
+        "\n",
+        "def create_moe_param_groups(model):\n",
+        "    \"\"\"Create separate parameter groups for each expert.\"\"\"\n",
+        "    parameters = {\"params\": [p for p in model.parameters()], \"name\": \"parameters\"}\n",
+        "    return split_params_into_different_moe_groups_for_optimizer(parameters)\n",
+        "\n",
+        "\n",
+        "def get_ds_config(args):\n",
+        "    \"\"\"Get the DeepSpeed configuration dictionary.\"\"\"\n",
+        "    ds_config = {\n",
+        "        \"train_batch_size\": 16,\n",
+        "        \"steps_per_print\": 2000,\n",
+        "        \"optimizer\": {\n",
+        "            \"type\": \"Adam\",\n",
+        "            \"params\": {\n",
+        "                \"lr\": 0.001,\n",
+        "                \"betas\": [0.8, 0.999],\n",
+        "                \"eps\": 1e-8,\n",
+        "                \"weight_decay\": 3e-7,\n",
+        "            },\n",
+        "        },\n",
+        "        # Enable DeepSpeed's Comet monitor: metrics from this run are logged\n",
+        "        # to a Comet experiment in the given project.\n",
+        "        \"comet\": {\n",
+        "            \"enabled\": True,\n",
+        "            \"project\": \"comet-example-deepspeed-cifar\",\n",
+        "        },\n",
+        "        \"scheduler\": {\n",
+        "            \"type\": \"WarmupLR\",\n",
+        "            \"params\": {\n",
+        "                \"warmup_min_lr\": 0,\n",
+        "                \"warmup_max_lr\": 0.001,\n",
+        "                \"warmup_num_steps\": 1000,\n",
+        "            },\n",
+        "        },\n",
+        "        \"gradient_clipping\": 1.0,\n",
+        "        \"prescale_gradients\": False,\n",
+        "        \"bf16\": {\"enabled\": args.dtype == \"bf16\"},\n",
+        "        \"fp16\": {\n",
+        "            \"enabled\": args.dtype == \"fp16\",\n",
+        "            \"fp16_master_weights_and_grads\": False,\n",
+        "            \"loss_scale\": 0,\n",
+        "            \"loss_scale_window\": 500,\n",
+        "            \"hysteresis\": 2,\n",
+        "            \"min_loss_scale\": 1,\n",
+        "            \"initial_scale_power\": 15,\n",
+        "        },\n",
+        "        \"wall_clock_breakdown\": False,\n",
+        "        \"zero_optimization\": {\n",
+        "            \"stage\": args.stage,\n",
+        "            \"allgather_partitions\": True,\n",
+        "            \"reduce_scatter\": True,\n",
+        "            \"allgather_bucket_size\": 50000000,\n",
+        "            \"reduce_bucket_size\": 50000000,\n",
+        "            \"overlap_comm\": True,\n",
+        "            \"contiguous_gradients\": True,\n",
+        "            \"cpu_offload\": False,\n",
+        "        },\n",
+        "    }\n",
+        "    return ds_config\n",
+        "\n",
+        "\n",
+        "class Net(nn.Module):\n",
+        "    def __init__(self, args):\n",
+        "        super(Net, self).__init__()\n",
+        "        self.conv1 = nn.Conv2d(3, 6, 5)\n",
+        "        self.pool = nn.MaxPool2d(2, 2)\n",
+        "        self.conv2 = nn.Conv2d(6, 16, 5)\n",
+        "        self.fc1 = nn.Linear(16 * 5 * 5, 120)\n",
+        "        self.fc2 = nn.Linear(120, 84)\n",
+        "        self.moe = args.moe\n",
+        "        if self.moe:\n",
+        "            fc3 = nn.Linear(84, 84)\n",
+        "            self.moe_layer_list = []\n",
+        "            for n_e in args.num_experts:\n",
+        "                # Create moe layers based on the number of experts.\n",
+        "                self.moe_layer_list.append(\n",
+        "                    deepspeed.moe.layer.MoE(\n",
+        "                        hidden_size=84,\n",
+        "                        expert=fc3,\n",
+        "                        num_experts=n_e,\n",
+        "                        ep_size=args.ep_world_size,\n",
+        "                        use_residual=args.mlp_type == \"residual\",\n",
+        "                        k=args.top_k,\n",
+        "                        min_capacity=args.min_capacity,\n",
+        "                        noisy_gate_policy=args.noisy_gate_policy,\n",
+        "                    )\n",
+        "                )\n",
+        "            self.moe_layer_list = nn.ModuleList(self.moe_layer_list)\n",
+        "            self.fc4 = nn.Linear(84, 10)\n",
+        "        else:\n",
+        "            self.fc3 = nn.Linear(84, 10)\n",
+        "\n",
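+        "    # Forward pass: two conv+pool blocks, flatten, then the fully connected\n",
+        "    # head (routed through the MoE layers when --moe is set).\n",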
+        "    def forward(self, x):\n",
+        "        x = self.pool(F.relu(self.conv1(x)))\n",
+        "        x = self.pool(F.relu(self.conv2(x)))\n",
+        "        x = x.view(-1, 16 * 5 * 5)\n",
+        "        x = F.relu(self.fc1(x))\n",
+        "        x = F.relu(self.fc2(x))\n",
+        "        if self.moe:\n",
+        "            for layer in self.moe_layer_list:\n",
+        "                x, _, _ = layer(x)\n",
+        "            x = self.fc4(x)\n",
+        "        else:\n",
+        "            x = self.fc3(x)\n",
+        "        return x\n",
+        "\n",
+        "\n",
+        "def test(model_engine, testset, local_device, target_dtype, test_batch_size=4):\n",
+        "    \"\"\"Test the network on the test data.\n",
+        "\n",
+        "    Args:\n",
+        "        model_engine (deepspeed.runtime.engine.DeepSpeedEngine): the DeepSpeed engine.\n",
+        "        testset (torch.utils.data.Dataset): the test dataset.\n",
+        "        local_device (str): the local device name.\n",
+        "        target_dtype (torch.dtype): the target datatype for the test data.\n",
+        "        test_batch_size (int): the test batch size.\n",
+        "\n",
+        "    \"\"\"\n",
+        "    # The 10 classes for CIFAR10.\n",
+        "    classes = (\n",
+        "        \"plane\",\n",
+        "        \"car\",\n",
+        "        \"bird\",\n",
+        "        \"cat\",\n",
+        "        \"deer\",\n",
+        "        \"dog\",\n",
+        "        \"frog\",\n",
+        "        \"horse\",\n",
+        "        \"ship\",\n",
+        "        \"truck\",\n",
+        "    )\n",
+        "\n",
+        "    # Define the test dataloader.\n",
+        "    testloader = torch.utils.data.DataLoader(\n",
+        "        testset, batch_size=test_batch_size, shuffle=False, num_workers=0\n",
+        "    )\n",
+        "\n",
+        "    # For total accuracy.\n",
+        "    correct, total = 0, 0\n",
+        "    # For accuracy per class.\n",
+        "    class_correct = list(0.0 for i in range(10))\n",
+        "    class_total = list(0.0 for i in range(10))\n",
+        "\n",
+        "    # Start testing.\n",
+        "    model_engine.eval()\n",
+        "    with torch.no_grad():\n",
+        "        for data in testloader:\n",
+        "            images, labels = data\n",
+        "            if target_dtype is not None:\n",
+        "                images = images.to(target_dtype)\n",
+        "            outputs = model_engine(images.to(local_device))\n",
+        "            _, predicted = torch.max(outputs.data, 1)\n",
+        "            # Count the total accuracy.\n",
+        "            total += labels.size(0)\n",
+        "            correct += (predicted == labels.to(local_device)).sum().item()\n",
+        "\n",
+        "            # Count the accuracy per class.\n",
+        "            batch_correct = (predicted == labels.to(local_device)).squeeze()\n",
+        "            for i in range(test_batch_size):\n",
+        "                label = labels[i]\n",
+        "                class_correct[label] += batch_correct[i].item()\n",
+        "                class_total[label] += 1\n",
+        "\n",
+        "    if model_engine.local_rank == 0:\n",
+        "        percentage = 100 * correct / total\n",
+        "        print(\n",
+        "            f\"Accuracy of the network on the {total} test images: {percentage : .0f} %\"\n",
+        "        )\n",
+        "\n",
+        "        # For all classes, print the accuracy.\n",
+        "        for i in range(10):\n",
+        "            class_percentage = 100 * class_correct[i] / class_total[i]\n",
+        "            print(f\"Accuracy of {classes[i] : >5s} : {class_percentage : 2.0f} %\")\n",
+        "\n",
+        "\n",
+        "def main(args):\n",
+        "    # Initialize DeepSpeed distributed backend.\n",
+        "    deepspeed.init_distributed()\n",
+        "\n",
+        "    ########################################################################\n",
+        "    # Step 1. Data Preparation.\n",
+        "    #\n",
+        "    # The output of torchvision datasets are PILImage images of range [0, 1].\n",
+        "    # We transform them to Tensors of normalized range [-1, 1].\n",
+        "    #\n",
+        "    # Note:\n",
+        "    #     If running on Windows and you get a BrokenPipeError, try setting\n",
+        "    #     the num_worker of torch.utils.data.DataLoader() to 0.\n",
+        "    ########################################################################\n",
+        "    transform = transforms.Compose(\n",
+        "        [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]\n",
+        "    )\n",
+        "\n",
+        "    if torch.distributed.get_rank() != 0:\n",
+        "        # Might be downloading cifar data, let rank 0 download first.\n",
+        "        torch.distributed.barrier()\n",
+        "\n",
+        "    # Load or download cifar data.\n",
+        "    trainset = torchvision.datasets.CIFAR10(\n",
+        "        root=\"./data\", train=True, download=True, transform=transform\n",
+        "    )\n",
+        "    testset = torchvision.datasets.CIFAR10(\n",
+        "        root=\"./data\", train=False, download=True, transform=transform\n",
+        "    )\n",
+        "\n",
+        "    if torch.distributed.get_rank() == 0:\n",
+        "        # Cifar data is downloaded, indicate other ranks can proceed.\n",
+        "        torch.distributed.barrier()\n",
+        "\n",
+        "    ########################################################################\n",
+        "    # Step 2. Define the network with DeepSpeed.\n",
+        "    #\n",
+        "    # First, we define a Convolutional Neural Network.\n",
+        "    # Then, we define the DeepSpeed configuration dictionary and use it to\n",
+        "    # initialize the DeepSpeed engine.\n",
+        "    ########################################################################\n",
+        "    net = Net(args)\n",
+        "\n",
+        "    # Get list of parameters that require gradients.\n",
+        "    parameters = filter(lambda p: p.requires_grad, net.parameters())\n",
+        "\n",
+        "    # If using MoE, create separate param groups for each expert.\n",
+        "    if args.moe_param_group:\n",
+        "        parameters = create_moe_param_groups(net)\n",
+        "\n",
+        "    # Initialize DeepSpeed to use the following features.\n",
+        "    # 1) Distributed model.\n",
+        "    # 2) Distributed data loader.\n",
+        "    # 3) DeepSpeed optimizer.\n",
+        "    ds_config = get_ds_config(args)\n",
+        "    model_engine, optimizer, trainloader, __ = deepspeed.initialize(\n",
+        "        args=args,\n",
+        "        model=net,\n",
+        "        model_parameters=parameters,\n",
+        "        training_data=trainset,\n",
+        "        config=ds_config,\n",
+        "    )\n",
+        "\n",
+        "    # Get the local device name (str) and local rank (int).\n",
+        "    local_device = get_accelerator().device_name(model_engine.local_rank)\n",
+        "    local_rank = model_engine.local_rank\n",
+        "\n",
+        "    # For float32, target_dtype will be None so no datatype conversion needed.\n",
+        "    target_dtype = None\n",
+        "    if model_engine.bfloat16_enabled():\n",
+        "        target_dtype = torch.bfloat16\n",
+        "    elif model_engine.fp16_enabled():\n",
+        "        target_dtype = torch.half\n",
+        "\n",
+        "    # Define the Classification Cross-Entropy loss function.\n",
+        "    criterion = nn.CrossEntropyLoss()\n",
+        "\n",
+        "    ########################################################################\n",
+        "    # Step 3. Train the network.\n",
+        "    #\n",
+        "    # This is when things start to get interesting.\n",
+        "    # We simply have to loop over our data iterator, and feed the inputs to the\n",
+        "    # network and optimize. (DeepSpeed handles the distributed details for us!)\n",
+        "    ########################################################################\n",
+        "\n",
+        "    for epoch in range(args.epochs):  # loop over the dataset multiple times\n",
+        "        running_loss = 0.0\n",
+        "        for i, data in enumerate(trainloader):\n",
+        "            # Get the inputs. ``data`` is a list of [inputs, labels].\n",
+        "            inputs, labels = data[0].to(local_device), data[1].to(local_device)\n",
+        "\n",
+        "            # Try to convert to target_dtype if needed.\n",
+        "            if target_dtype is not None:\n",
+        "                inputs = inputs.to(target_dtype)\n",
+        "\n",
+        "            outputs = model_engine(inputs)\n",
+        "            loss = criterion(outputs, labels)\n",
+        "\n",
+        "            model_engine.backward(loss)\n",
+        "            model_engine.step()\n",
+        "\n",
+        "            # Print statistics\n",
+        "            running_loss += loss.item()\n",
+        "            if local_rank == 0 and i % args.log_interval == (\n",
+        "                args.log_interval - 1\n",
+        "            ):  # Print every log_interval mini-batches.\n",
+        "                loss_value = running_loss / args.log_interval\n",
+        "                print(f\"[{epoch + 1 : d}, {i + 1 : 5d}] loss: {loss_value : .3f}\")\n",
+        "                running_loss = 0.0\n",
+        "    print(\"Finished Training\")\n",
+        "\n",
+        "    ########################################################################\n",
+        "    # Step 4. Test the network on the test data.\n",
+        "    ########################################################################\n",
+        "    test(model_engine, testset, local_device, target_dtype)\n",
+        "\n",
+        "\n",
+        "if __name__ == \"__main__\":\n",
+        "    args = add_argument()\n",
+        "    main(args)\n"
+      ],
+      "metadata": {
+        "id": "0M5BqaIcTAls"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Launch the training script with the DeepSpeed command-line launcher\n",
+        "\n",
+        "You can also pass the script's own flags here, for example `!deepspeed cifar10_deepspeed.py --epochs 10 --stage 2`."
+      ],
+      "metadata": {
+        "id": "dbiZdrkPndj1"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "!deepspeed cifar10_deepspeed.py"
+      ],
+      "metadata": {
+        "id": "NfMAPmmbTLsW"
+      },
+      "execution_count": null,
+      "outputs": []
+    }
+  ]
+}
\ No newline at end of file