diff --git a/ci/scripts/test_imports.sh b/ci/scripts/test_imports.sh
index 16a4f652..5f310b69 100644
--- a/ci/scripts/test_imports.sh
+++ b/ci/scripts/test_imports.sh
@@ -19,3 +19,4 @@ test_import "aws" "import dask_cloudprovider.aws"
 test_import "azure" "import dask_cloudprovider.azure"
 test_import "digitalocean" "import dask_cloudprovider.digitalocean"
 test_import "gcp" "import dask_cloudprovider.gcp"
+test_import "openstack" "import dask_cloudprovider.openstack"
\ No newline at end of file
diff --git a/dask_cloudprovider/cloudprovider.yaml b/dask_cloudprovider/cloudprovider.yaml
index aa99b102..932bea1f 100755
--- a/dask_cloudprovider/cloudprovider.yaml
+++ b/dask_cloudprovider/cloudprovider.yaml
@@ -118,3 +118,20 @@ cloudprovider:
     image: "ubuntu-20.04"  # Operating System image to use
     docker_image: "daskdev/dask:latest" # docker image to use
     bootstrap: true  # It is assumed that the OS image does not have Docker and needs bootstrapping. Set this to false if using a custom image with Docker already installed.
+
+  openstack:
+    region: "RegionOne" # The name of the region where resources will be allocated in OpenStack. List available regions using: `openstack region list`.
+    size: null # OpenStack flavors define the compute, memory, and storage capacity of computing instances. List available flavors using: `openstack flavor list`.
+    auth_url: null # The authentication URL for the OpenStack Identity service (Keystone). Example: https://cloud.example.com:5000
+    application_credential_id: null # The application credential ID created in OpenStack. Create application credentials using: `openstack application credential create`.
+    application_credential_secret: null # The secret associated with the application credential ID for authentication.
+    auth_type: "v3applicationcredential" # The type of authentication used, typically "v3applicationcredential" for OpenStack application credentials.
+    network_id: null # The unique identifier for the internal/private network in OpenStack where the cluster VMs will be connected. List available networks using: `openstack network list`.
+    image: null # The OS image name or ID to use for the VM. List available images using: `openstack image list`.
+    keypair_name: null # The name of the SSH keypair used for instance access. Ensure you have created a keypair or use an existing one. List available keypairs using: `openstack keypair list`.
+    security_group: null # The security group name that defines firewall rules for instances. List available security groups using: `openstack security group list`.
+    external_network_id: null # The ID of the external network used for assigning floating IPs. List available external networks using: `openstack network list --external`.
+    create_floating_ip: false # Specifies whether to assign a floating IP to each instance, enabling external access. Set to `true` if external connectivity is needed.
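+    bootstrap: true # Set to false if the image already has Docker installed. Read by OpenStackCluster when no keyword argument is given.
+    extra_bootstrap: null # Extra commands to run during the bootstrap phase. Read by OpenStackCluster when no keyword argument is given.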
+    docker_image: "daskdev/dask:latest" # docker image to use
\ No newline at end of file
diff --git a/dask_cloudprovider/openstack/__init__.py b/dask_cloudprovider/openstack/__init__.py
new file mode 100644
index 00000000..91ca9fe0
--- /dev/null
+++ b/dask_cloudprovider/openstack/__init__.py
@@ -0,0 +1 @@
+from .instances import OpenStackCluster
diff --git a/dask_cloudprovider/openstack/instances.py b/dask_cloudprovider/openstack/instances.py
new file mode 100644
index 00000000..903a4a16
--- /dev/null
+++ b/dask_cloudprovider/openstack/instances.py
@@ -0,0 +1,372 @@
+import asyncio
+import dask
+
+from dask_cloudprovider.generic.vmcluster import (
+    VMCluster,
+    VMInterface,
+    SchedulerMixin,
+    WorkerMixin,
+)
+
+from distributed.core import Status
+
+try:
+    from openstack import connection
+except ImportError as e:
+    msg = (
+        "Dask Cloud Provider OpenStack requirements are not installed.\n\n"
+        "Please pip install as follows:\n\n"
+        '  pip install "openstacksdk"'
+    )
+    raise ImportError(msg) from e
+
+
+class OpenStackInstance(VMInterface):
+    def __init__(
+        self,
+        cluster,
+        config,
+        region: str = None,
+        size: str = None,
+        image: str = None,
+        docker_image: str = None,
+        env_vars: str = None,
+        extra_bootstrap: str = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.instance = None
+        self.cluster = cluster
+        self.config = config
+        self.region = region
+        self.size = size
+        self.image = image
+        self.env_vars = env_vars
+        self.bootstrap = True
+        self.docker_image = docker_image
+        self.extra_bootstrap = extra_bootstrap
+
+    async def create_vm(self):
+        conn = connection.Connection(
+            region_name=self.region,
+            auth_url=self.config["auth_url"],
+            application_credential_id=self.config["application_credential_id"],
+            application_credential_secret=self.config["application_credential_secret"],
+            compute_api_version="2",
+            identity_interface="public",
+            auth_type="v3applicationcredential",
+        )
+
+        self.instance = conn.create_server(
+            name=self.name,
+            image=self.image,
+            flavor=self.size,  # the OpenStack flavor, i.e. the instance size
+            key_name=self.config["keypair_name"],  # SSH keypair for instance access
+            nics=[
+                {"net-id": self.config["network_id"]}
+            ],  # attach the VM to the configured internal network
+            userdata=self.cluster.render_process_cloud_init(self),
+            security_groups=[self.config["security_group"]],
+        )
+
+        # Poll until the instance is up and running
+        while self.instance.status.lower() != "active":
+            await asyncio.sleep(0.1)
+            self.instance = conn.compute.get_server(self.instance.id)
+
+        # Retrieve the internal IP address
+        self.internal_ip = await self.get_internal_ip(conn)
+
+        # Check if a floating IP should be created and assigned
+        if self.config.get("create_floating_ip", False):
+            self.external_ip = await self.create_and_assign_floating_ip(conn)
+        else:
+            self.external_ip = await self.get_external_ip(conn)
+
+        self.cluster._log(
+            f"{self.name}\n\tInternal IP: {self.internal_ip}\n\tExternal IP: "
+            f"{self.external_ip if self.external_ip else 'None'}"
+        )
+        return self.internal_ip, self.external_ip
+
+    async def get_internal_ip(self, conn):
+        """Fetch the internal IP address from the OpenStack instance."""
+        instance = conn.compute.get_server(self.instance.id)
+        for network in instance.addresses.values():
+            for addr in network:
+                if addr["OS-EXT-IPS:type"] == "fixed":
+                    return addr["addr"]
+        return None
+
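+    # NOTE: OpenStack groups a server's addresses by network; entries tagged
+    # "fixed" are private addresses, while entries tagged "floating" are the
+    # public ones returned by get_external_ip below.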
+    async def get_external_ip(self, conn):
+        """Fetch the external IP address from the OpenStack instance, if it exists."""
+        instance = conn.compute.get_server(self.instance.id)
+        for network in instance.addresses.values():
+            for addr in network:
+                if addr["OS-EXT-IPS:type"] == "floating":
+                    return addr["addr"]
+        return None
+
+    async def create_and_assign_floating_ip(self, conn):
+        """Create and assign a floating IP to the instance."""
+        try:
+            # Create a floating IP on the configured external network
+            floating_ip = await self.cluster.call_async(
+                conn.network.create_ip,
+                floating_network_id=self.config["external_network_id"],
+            )
+
+            # Assign the floating IP to the server
+            await self.cluster.call_async(
+                conn.compute.add_floating_ip_to_server,
+                server=self.instance.id,
+                address=floating_ip.floating_ip_address,
+            )
+
+            return floating_ip.floating_ip_address
+        except Exception as e:
+            self.cluster._log(f"Failed to create or assign floating IP: {str(e)}")
+            return None
+
+    async def destroy_vm(self):
+        conn = connection.Connection(
+            region_name=self.region,
+            auth_url=self.config["auth_url"],
+            application_credential_id=self.config["application_credential_id"],
+            application_credential_secret=self.config["application_credential_secret"],
+            compute_api_version="2",
+            identity_interface="public",
+            auth_type="v3applicationcredential",
+        )
+
+        # Handle floating IP disassociation and deletion if applicable
+        if self.config.get(
+            "create_floating_ip", False
+        ):  # checks if floating IPs were configured to be created
+            try:
+                # Retrieve all floating IPs associated with the instance
+                floating_ips = conn.network.ips(port_id=self.instance.id)
+                for ip in floating_ips:
+                    # Disassociate and delete the floating IP
+                    conn.network.update_ip(ip, port_id=None)
+                    conn.network.delete_ip(ip.id)
+                    self.cluster._log(f"Deleted floating IP {ip.floating_ip_address}")
+            except Exception as e:
+                self.cluster._log(
+                    f"Failed to clean up floating IPs for instance {self.name}: {str(e)}"
+                )
+                return  # exit if floating IP cleanup fails
+
+        # Then, attempt to delete the instance
+        try:
+            instance = conn.compute.get_server(self.instance.id)
+            if instance:
+                await self.cluster.call_async(conn.compute.delete_server, instance.id)
+                self.cluster._log(f"Terminated instance {self.name}")
+            else:
+                self.cluster._log(f"Instance {self.name} not found or already deleted.")
+        except Exception as e:
+            self.cluster._log(f"Failed to terminate instance {self.name}: {str(e)}")
+
+    async def start_vm(self):
+        # Instances are created and started by create_vm; a separate start
+        # operation is not implemented.
+        pass
+
+    async def stop_vm(self):
+        # Stopping without destroying is not implemented; use destroy_vm to
+        # terminate the instance.
+        pass
+
+
+class OpenStackScheduler(SchedulerMixin, OpenStackInstance):
+    """Scheduler running on an OpenStack Instance."""
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+    async def start(self):
+        await self.start_scheduler()
+        self.status = Status.running
+
+    async def start_scheduler(self):
+        self.cluster._log(
+            f"Launching cluster with the following configuration: "
+            f"\n  OS Image: {self.image} "
+            f"\n  Flavor: {self.size} "
+            f"\n  Docker Image: {self.docker_image} "
+            f"\n  Security Group: {self.config['security_group']} "
+        )
+        self.cluster._log("Creating scheduler instance")
+        self.internal_ip, self.external_ip = await self.create_vm()
+
+        # Choose the IP based on the access type configuration
+        if self.config.get("create_floating_ip", False):
+            # Use the floating IP when external access is configured
+            self.address = f"{self.cluster.protocol}://{self.external_ip}:{self.port}"
+        else:
+            # Use the internal IP if no external access is configured
+            self.address = f"{self.cluster.protocol}://{self.internal_ip}:{self.port}"
+
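+        # The scheduler VM boots and cloud-init pulls the Docker image and
+        # starts the Dask scheduler process; block until the chosen address
+        # accepts connections.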
+        await self.wait_for_scheduler()
+
+        # Store the IPs for cluster-wide use, if necessary
+        self.cluster.scheduler_internal_ip = self.internal_ip
+        self.cluster.scheduler_external_ip = self.external_ip
+        self.cluster.scheduler_port = self.port
+
+
+class OpenStackWorker(WorkerMixin, OpenStackInstance):
+    """Worker running on an OpenStack Instance."""
+
+
+class OpenStackCluster(VMCluster):
+    """Cluster running on OpenStack VM Instances.
+
+    This cluster manager constructs a Dask cluster running on a generic OpenStack cloud.
+
+    When configuring your cluster you may find it useful to install the 'python-openstackclient'
+    client for querying the OpenStack APIs for available options.
+
+    https://github.com/openstack/python-openstackclient
+
+    Parameters
+    ----------
+    region: str
+        The name of the region where resources will be allocated in OpenStack.
+        Typically set to 'default' unless specified in your cloud configuration.
+
+        List available regions using: `openstack region list`.
+    auth_url: str
+        The authentication URL for the OpenStack Identity service (Keystone).
+        Example: https://cloud.example.com:5000
+    application_credential_id: str
+        The application credential ID created in OpenStack.
+
+        Create application credentials using: `openstack application credential create`.
+    application_credential_secret: str
+        The secret associated with the application credential ID for authentication.
+    auth_type: str
+        The type of authentication used, typically "v3applicationcredential" for
+        using OpenStack application credentials.
+    network_id: str
+        The unique identifier for the internal/private network in OpenStack where the cluster
+        VMs will be connected.
+
+        List available networks using: `openstack network list`.
+    image: str
+        The OS image name or ID to use for the VM. Dask Cloudprovider will bootstrap Ubuntu-based
+        images automatically. Other images require Docker and, for GPUs,
+        the NVIDIA drivers and NVIDIA Docker.
+
+        List available images using: `openstack image list`.
+    keypair_name: str
+        The name of the SSH keypair used for instance access. Ensure you have created a keypair
+        or use an existing one.
+
+        List available keypairs using: `openstack keypair list`.
+    security_group: str
+        The security group name that defines firewall rules for instances.
+
+        The default is `default`. Please ensure the following access rules are configured:
+
+        - egress 0.0.0.0/0 on all ports for downloading docker images and general data access
+        - ingress from the internal network (e.g. a /8 CIDR) on all ports for internal communication of workers
+        - ingress 0.0.0.0/0 on 8786-8787 for external accessibility of the dashboard/scheduler
+        - (optional) ingress 0.0.0.0/0 on 22 for SSH access
+
+        List available security groups using: `openstack security group list`.
+    create_floating_ip: bool
+        Specifies whether to assign a floating IP to each instance, enabling external
+        access. Set to ``True`` if external connectivity is needed.
+    external_network_id: str
+        The ID of the external network used for assigning floating IPs.
+
+        List available external networks using: `openstack network list --external`.
+    n_workers: int (optional)
+        Number of workers to initialise the cluster with. Defaults to ``0``.
+    worker_module: str
+        The Python module to run for the worker. Defaults to ``distributed.cli.dask_worker``.
+    worker_options: dict
+        Params to be passed to the worker class.
+        See :class:`distributed.worker.Worker` for the default worker class.
+        If you set ``worker_module`` then refer to the docstring for the custom worker class.
+    scheduler_options: dict
+        Params to be passed to the scheduler class.
+        See :class:`distributed.scheduler.Scheduler`.
+    env_vars: dict
+        Environment variables to be passed to the worker.
+    extra_bootstrap: list[str] (optional)
+        Extra commands to be run during the bootstrap phase.
+    docker_image: string (optional)
+        The Docker image to run on all instances.
+
+        This image must have a valid Python environment and have ``dask`` installed in order for the
+        ``dask-scheduler`` and ``dask-worker`` commands to be available. It is recommended that the Python
+        environment matches the local environment where ``OpenStackCluster`` is being created from.
+
+        For GPU instance types the Docker image must have NVIDIA drivers and ``dask-cuda`` installed.
+
+        By default the ``daskdev/dask:latest`` image will be used.
+
+    Example
+    --------
+
+    >>> from dask_cloudprovider.openstack import OpenStackCluster
+    >>> cluster = OpenStackCluster(n_workers=1)
+    Launching cluster with the following configuration:
+      OS Image: ubuntu-22-04
+      Flavor: 4vcpu-8gbram-50gbdisk
+      Docker Image: daskdev/dask:latest
+      Security Group: all-open
+    Creating scheduler instance
+    dask-9b85a5f8-scheduler
+        Internal IP: 10.0.30.148
+        External IP: None
+    Waiting for scheduler to run at 10.0.30.148:8786
+    Scheduler is running
+    Creating worker instance
+
+    >>> from dask.distributed import Client
+    >>> client = Client(cluster)
+
+    >>> import dask.array as da
+    >>> arr = da.random.random((1000, 1000), chunks=(100, 100))
+    >>> arr.mean().compute()
+
+    >>> client.close()
+    >>> cluster.close()
+    Terminated instance dask-07280176-worker-319005a2
+    Terminated instance dask-07280176-scheduler
+    """
+
+    def __init__(
+        self,
+        region: str = None,
+        size: str = None,
+        image: str = None,
+        docker_image: str = None,
+        debug: bool = False,
+        bootstrap: bool = None,  # fall back to the config value when not given
+        **kwargs,
+    ):
+        self.config = dask.config.get("cloudprovider.openstack", {})
+        self.scheduler_class = OpenStackScheduler
+        self.worker_class = OpenStackWorker
+        self.debug = debug
+        self.bootstrap = (
+            bootstrap if bootstrap is not None else self.config.get("bootstrap")
+        )
+        self.options = {
+            "cluster": self,
+            "config": self.config,
+            "region": region if region is not None else self.config.get("region"),
+            "size": size if size is not None else self.config.get("size"),
+            "image": image if image is not None else self.config.get("image"),
+            "docker_image": docker_image or self.config.get("docker_image"),
+        }
+        self.scheduler_options = {**self.options}
+        self.worker_options = {**self.options}
+
+        if "extra_bootstrap" not in kwargs:
+            kwargs["extra_bootstrap"] = self.config.get("extra_bootstrap")
+
+        super().__init__(debug=debug, **kwargs)
diff --git a/dask_cloudprovider/openstack/tests/test_instances.py b/dask_cloudprovider/openstack/tests/test_instances.py
new file mode 100644
index 00000000..eae815bf
--- /dev/null
+++ b/dask_cloudprovider/openstack/tests/test_instances.py
@@ -0,0 +1,72 @@
+import pytest
+import dask
+from dask_cloudprovider.openstack.instances import OpenStackCluster
+from dask.distributed import Client
+from distributed.core import Status
+
+# Optional: skips tests if OpenStack credentials are not set
+
+
+async def skip_without_credentials(config):
+    if (
+        config.get("auth_url") is None
+        or config.get("application_credential_secret") is None
+    ):
+        pytest.skip(
+            """
+        You must configure OpenStack credentials to run this test.
+
+        Set this in your config file or environment variables:
+
+        # cloudprovider.yaml
+        cloudprovider:
+          openstack:
+            auth_url: "your_auth_url"
+            application_credential_id: "your_app_cred_id"
+            application_credential_secret: "your_app_cred_secret"
+        """
+        )
+
+
+@pytest.fixture
+async def config():
+    return dask.config.get("cloudprovider.openstack", {})
+
+
+@pytest.fixture
+@pytest.mark.external
+async def cluster(config):
+    await skip_without_credentials(config)
+
+    async with OpenStackCluster(asynchronous=True) as cluster:
+        yield cluster
+
+
+@pytest.mark.asyncio
+async def test_init():
+    cluster = OpenStackCluster(asynchronous=True)
+    assert cluster.status == Status.created
+
+
+@pytest.mark.asyncio
+@pytest.mark.timeout(600)
+async def test_create_cluster(cluster):
+    assert cluster.status == Status.running
+    cluster.scale(1)
+    await cluster
+    assert len(cluster.workers) == 1
+
+    async with Client(cluster, asynchronous=True) as client:
+
+        def inc(x):
+            return x + 1
+
+        assert await client.submit(inc, 10).result() == 11
+
+
+@pytest.mark.asyncio
+async def test_get_cloud_init():
+    cloud_init = OpenStackCluster.get_cloud_init(
+        docker_args="--privileged",
+    )
+    assert " --privileged " in cloud_init
diff --git a/dask_cloudprovider/tests/test_imports.py b/dask_cloudprovider/tests/test_imports.py
index e9ab7b4d..3dc42e66 100644
--- a/dask_cloudprovider/tests/test_imports.py
+++ b/dask_cloudprovider/tests/test_imports.py
@@ -24,3 +24,5 @@ def test_import_exceptions():
         from dask_cloudprovider import GCPCluster  # noqa
     with pytest.raises(ImportError):
         from dask_cloudprovider import DropletCluster  # noqa
+    with pytest.raises(ImportError):
+        from dask_cloudprovider import OpenStackCluster  # noqa
diff --git a/doc/source/index.rst b/doc/source/index.rst
index c2e57968..74d2a661 100644
--- a/doc/source/index.rst
+++ b/doc/source/index.rst
@@ -70,6 +70,7 @@ this code.
    gcp.rst
    azure.rst
    hetzner.rst
+   openstack.rst

 .. toctree::
    :maxdepth: 2
diff --git a/doc/source/installation.rst b/doc/source/installation.rst
index b953cbd6..f4994435 100644
--- a/doc/source/installation.rst
+++ b/doc/source/installation.rst
@@ -16,7 +16,8 @@ You can also restrict your install to just a specific cloud provider by giving t
     $ pip install dask-cloudprovider[azure]  # or
     $ pip install dask-cloudprovider[azureml]  # or
     $ pip install dask-cloudprovider[digitalocean]  # or
-    $ pip install dask-cloudprovider[gcp]
+    $ pip install dask-cloudprovider[gcp]  # or
+    $ pip install dask-cloudprovider[openstack]

 Conda
-----
diff --git a/doc/source/openstack.rst b/doc/source/openstack.rst
new file mode 100644
index 00000000..cb0a743d
--- /dev/null
+++ b/doc/source/openstack.rst
@@ -0,0 +1,75 @@
+OpenStack
+============
+
+.. currentmodule:: dask_cloudprovider.openstack
+
+.. autosummary::
+   OpenStackCluster
+
+Overview
+--------
+
+Authentication
+^^^^^^^^^^^^^^
+
+To authenticate with the OpenStack Identity service (Keystone):
+
+1) Get your Authentication URL (auth_url) for the OpenStack Identity service (Keystone) and put it in your Dask configuration at ``cloudprovider.openstack.auth_url``.
+
+2) Get your `region `_ and put it in your Dask configuration at ``cloudprovider.openstack.region``.
+
+   .. code-block:: console
+
+      $ openstack region list
+      +-----------+---------------+-------------+
+      | Region    | Parent Region | Description |
+      +-----------+---------------+-------------+
+      | RegionOne | None          |             |
+      +-----------+---------------+-------------+
+
+3) Generate an `application credential `_.
+
+   .. code-block:: console
+
+      $ openstack application credential create dask --unrestricted
+      +--------------+----------------------------------------------------------------------------------------+
+      | Field        | Value                                                                                  |
+      +--------------+----------------------------------------------------------------------------------------+
+      | description  | None                                                                                   |
+      | expires_at   | None                                                                                   |
+      | id           | 0a0372dbedfb4e82ab66449c3316ef1e                                                       |
+      | name         | dask                                                                                   |
+      | project_id   | e99b6f4b9bf84a9da27e20c9cbfe887a                                                       |
+      | roles        | Member anotherrole                                                                     |
+      | secret       | ArOy6DYcLeLTRlTmfvF1TH1QmRzYbmD91cbVPOHL3ckyRaLXlaq5pTGJqvCvqg6leEvTI1SQeX3QK-3iwmdPxg |
+      | unrestricted | True                                                                                   |
+      +--------------+----------------------------------------------------------------------------------------+
+
+   and put ``application_credential_id`` and ``application_credential_secret`` in your Dask configuration at ``cloudprovider.openstack.application_credential_id``
+   and ``cloudprovider.openstack.application_credential_secret``.
+
+All of these variables can be gathered from either the `OpenStack RC file `_
+or the `clouds.yaml file `_.
+
+Example Config File
+^^^^^^^^^^^^^^^^^^^
+
+.. code-block:: yaml
+
+   # ~/.config/dask/cloudprovider.yaml
+
+   cloudprovider:
+     openstack:
+       region: "RegionOne"
+       auth_url: "https://cloud.home.karatosun.xyz:5000"
+       application_credential_id: "0a0372dbedfb4e82ab66449c3316ef1e"
+       application_credential_secret: "ArOy6DYcLeLTRlTmfvF1TH1QmRzYbmD91cbVPOHL3ckyRaLXlaq5pTGJqvCvqg6leEvTI1SQeX3QK-3iwmdPxg"
+       auth_type: "v3applicationcredential"
+
+You can also export them as environment variables.
+
+.. code-block:: console
+
+   $ export DASK_CLOUDPROVIDER__OPENSTACK__APPLICATION_CREDENTIAL_ID="0a0372dbedfb4e82ab66449c3316ef1e"
+
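+Once credentials are configured you can launch a cluster directly. A minimal
+sketch (the ``image`` and ``size`` values below are placeholders, pick names
+from ``openstack image list`` and ``openstack flavor list``):
+
+.. code-block:: python
+
+   from dask_cloudprovider.openstack import OpenStackCluster
+
+   cluster = OpenStackCluster(
+       n_workers=1,
+       image="ubuntu-22-04",          # an Ubuntu image available in your cloud
+       size="4vcpu-8gbram-50gbdisk",  # an OpenStack flavor
+   )
+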
"execution_count": 49, + "id": "5b745083-bb26-4fe9-a4f6-f7d049628a79", + "metadata": {}, + "outputs": [], + "source": [ + "import dask\n", + "import dask_cloudprovider\n", + "from instances import OpenStackCluster\n", + "from dask.distributed import Client, progress" + ] + }, + { + "cell_type": "code", + "execution_count": 50, + "id": "169db10e-b9cf-484f-9d3d-115272eec85d", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Launching cluster with the following configuration: \n", + " OS Image: ubuntu-22-04 \n", + " Flavor: 4vcpu-8gbram-50gbdisk \n", + " Docker Image: armagankaratosun/dask-ml:latest \n", + " Security Group: all-open \n", + "Creating scheduler instance\n", + "dask-d523cb04-scheduler\n", + "\tInternal IP: 10.0.30.30\n", + "\tExternal IP: None\n", + "Waiting for scheduler to run at 10.0.30.30:8786\n", + "Scheduler is running\n", + "Creating worker instance\n", + "Creating worker instance\n", + "dask-d523cb04-worker-96fd7205\n", + "\tInternal IP: 10.0.30.95\n", + "\tExternal IP: None\n", + "dask-d523cb04-worker-e0f7591e\n", + "\tInternal IP: 10.0.30.107\n", + "\tExternal IP: None\n" + ] + } + ], + "source": [ + "cluster = OpenStackCluster(n_workers=2, shutdown_on_close=True, docker_image=\"armagankaratosun/dask-ml:latest\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "abdbec11-1154-43d3-91c1-55d03d5935dc", + "metadata": {}, + "outputs": [], + "source": [ + "client = Client(cluster)" + ] + }, + { + "cell_type": "code", + "execution_count": 55, + "id": "9c413324-904c-41af-bf10-a82ecf10d2c0", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "
\n", + "
\n", + "

Client

\n", + "

Client-6f338508-661e-11ef-936f-2a7e43ef1ca5

\n", + " \n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + "
Connection method: Cluster objectCluster type: instances.OpenStackCluster
\n", + " Dashboard: http://10.0.30.30:8787/status\n", + "
\n", + "\n", + " \n", + "\n", + " \n", + "
\n", + "

Cluster Info

\n", + "
\n", + "
\n", + "
\n", + "
\n", + "

OpenStackCluster

\n", + "

3b3f98df

\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
\n", + " Dashboard: http://10.0.30.30:8787/status\n", + " \n", + " Workers: 2\n", + "
\n", + " Total threads: 8\n", + " \n", + " Total memory: 15.32 GiB\n", + "
\n", + "\n", + "
\n", + " \n", + "

Scheduler Info

\n", + "
\n", + "\n", + "
\n", + "
\n", + "
\n", + "
\n", + "

Scheduler

\n", + "

Scheduler-c3c079d0-f8e5-4ce6-8f11-d26e8654fe07

\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
\n", + " Comm: tls://10.0.30.30:8786\n", + " \n", + " Workers: 2\n", + "
\n", + " Dashboard: http://10.0.30.30:8787/status\n", + " \n", + " Total threads: 8\n", + "
\n", + " Started: 4 minutes ago\n", + " \n", + " Total memory: 15.32 GiB\n", + "
\n", + "
\n", + "
\n", + "\n", + "
\n", + " \n", + "

Workers

\n", + "
\n", + "\n", + " \n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "

Worker: dask-d523cb04-worker-96fd7205

\n", + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + "\n", + " \n", + "\n", + "
\n", + " Comm: tls://10.0.30.95:40477\n", + " \n", + " Total threads: 4\n", + "
\n", + " Dashboard: http://10.0.30.95:43771/status\n", + " \n", + " Memory: 7.66 GiB\n", + "
\n", + " Nanny: tls://10.0.30.95:44951\n", + "
\n", + " Local directory: /tmp/dask-scratch-space/worker-e5rljt6_\n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "

Worker: dask-d523cb04-worker-e0f7591e

\n", + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + "\n", + " \n", + "\n", + "
\n", + " Comm: tls://10.0.30.107:35117\n", + " \n", + " Total threads: 4\n", + "
\n", + " Dashboard: http://10.0.30.107:46297/status\n", + " \n", + " Memory: 7.66 GiB\n", + "
\n", + " Nanny: tls://10.0.30.107:43675\n", + "
\n", + " Local directory: /tmp/dask-scratch-space/worker-5l075jui\n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "\n", + "
\n", + "
\n", + "\n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "\n", + "
\n", + "
" + ], + "text/plain": [ + "" + ] + }, + "execution_count": 55, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client" + ] + }, + { + "cell_type": "markdown", + "id": "bac247cc-e089-4a2d-9f6b-b5f565cf90d1", + "metadata": {}, + "source": [ + "## Score and Predict Large Datasets" + ] + }, + { + "cell_type": "code", + "execution_count": 56, + "id": "68f0aec9-36af-4818-b9b3-c9af6dcede84", + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "import dask.array as da\n", + "from sklearn.datasets import make_classification" + ] + }, + { + "cell_type": "markdown", + "id": "0dcb00a8-6ba6-40bb-bcbc-e30e47af0ec1", + "metadata": {}, + "source": [ + "We'll generate a small random dataset with scikit-learn." + ] + }, + { + "cell_type": "code", + "execution_count": 57, + "id": "74433cd6-5997-424f-9004-723f6748ad21", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "array([[ 1.53682958, -1.39869399],\n", + " [ 1.36917601, -0.63734411],\n", + " [ 0.50231787, -0.45910529],\n", + " [ 1.83319262, -1.29808229],\n", + " [ 1.04235568, 1.12152929]])" + ] + }, + "execution_count": 57, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "X_train, y_train = make_classification(\n", + " n_features=2, n_redundant=0, n_informative=2,\n", + " random_state=1, n_clusters_per_class=1, n_samples=1000)\n", + "X_train[:5]" + ] + }, + { + "cell_type": "markdown", + "id": "54d40a88-145d-4fb0-bafd-b6d5f852f5dd", + "metadata": {}, + "source": [ + "And we'll clone that dataset many times with dask.array. X_large and y_large represent our larger than memory dataset." + ] + }, + { + "cell_type": "code", + "execution_count": 58, + "id": "92067947-c8ca-4bd6-a589-9b8c877b96ca", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "\n", + " \n", + " \n", + " \n", + " \n", + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
Array Chunk
Bytes 1.53 MiB 15.62 kiB
Shape (100000, 2) (1000, 2)
Dask graph 100 chunks in 2 graph layers
Data type float64 numpy.ndarray
\n", + "
\n", + " \n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + " \n", + " \n", + "\n", + " \n", + " \n", + "\n", + " \n", + " 2\n", + " 100000\n", + "\n", + "
" + ], + "text/plain": [ + "dask.array" + ] + }, + "execution_count": 58, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Scale up: increase N, the number of times we replicate the data.\n", + "N = 100\n", + "X_large = da.concatenate([da.from_array(X_train, chunks=X_train.shape)\n", + " for _ in range(N)])\n", + "y_large = da.concatenate([da.from_array(y_train, chunks=y_train.shape)\n", + " for _ in range(N)])\n", + "X_large" + ] + }, + { + "cell_type": "markdown", + "id": "ea21bdbb-fbe5-4801-8132-b41d4d62f190", + "metadata": {}, + "source": [ + "Since our training dataset fits in memory, we can use a scikit-learn estimator as the actual estimator fit during training. But we know that we'll want to predict for a large dataset, so we'll wrap the scikit-learn estimator with ParallelPostFit." + ] + }, + { + "cell_type": "code", + "execution_count": 59, + "id": "3df99eb7-c643-4f00-a41c-d3e4f0f37a12", + "metadata": {}, + "outputs": [], + "source": [ + "from sklearn.linear_model import LogisticRegressionCV\n", + "from dask_ml.wrappers import ParallelPostFit" + ] + }, + { + "cell_type": "code", + "execution_count": 60, + "id": "aa0dfd51-ab6b-4709-bd5a-465c1d0f8350", + "metadata": {}, + "outputs": [], + "source": [ + "clf = ParallelPostFit(LogisticRegressionCV(cv=3), scoring=\"r2\")" + ] + }, + { + "cell_type": "markdown", + "id": "86d0c69c-2a06-4f6b-b59f-99301786f9d7", + "metadata": {}, + "source": [ + "See the note in the dask-ml's documentation about when and why a scoring parameter is needed: https://ml.dask.org/modules/generated/dask_ml.wrappers.ParallelPostFit.html#dask_ml.wrappers.ParallelPostFit.\n", + "\n", + "Now we'll call clf.fit. Dask-ML does nothing here, so this step can only use datasets that fit in memory." + ] + }, + { + "cell_type": "code", + "execution_count": 61, + "id": "7157844d-3d66-41d0-8360-0b4e4a700ff6", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
+       "<pre>ParallelPostFit(estimator=LogisticRegressionCV(cv=3), scoring='r2')</pre>"
" + ], + "text/plain": [ + "ParallelPostFit(estimator=LogisticRegressionCV(cv=3), scoring='r2')" + ] + }, + "execution_count": 61, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "clf.fit(X_train, y_train)" + ] + }, + { + "cell_type": "markdown", + "id": "262f1ee3-475e-4542-b662-e33be7302f78", + "metadata": {}, + "source": [ + "Now that training is done, we'll turn to predicting for the full (larger than memory) dataset." + ] + }, + { + "cell_type": "code", + "execution_count": 62, + "id": "fd23ebc8-8355-4a01-8493-b9e1f6ceb42a", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "\n", + " \n", + " \n", + " \n", + " \n", + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
Array Chunk
Bytes 781.25 kiB 7.81 kiB
Shape (100000,) (1000,)
Dask graph 100 chunks in 3 graph layers
Data type int64 numpy.ndarray
\n", + "
\n", + " \n", + "\n", + " \n", + " \n", + " \n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + " \n", + "\n", + " \n", + " 100000\n", + " 1\n", + "\n", + "
" + ], + "text/plain": [ + "dask.array<_predict, shape=(100000,), dtype=int64, chunksize=(1000,), chunktype=numpy.ndarray>" + ] + }, + "execution_count": 62, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "y_pred = clf.predict(X_large)\n", + "y_pred" + ] + }, + { + "cell_type": "markdown", + "id": "7e59a188-6de6-43fe-9648-54a4b5596c0d", + "metadata": {}, + "source": [ + "`y_pred` is a Dask array. Workers can write the predicted values to a shared file system, without ever having to collect the data on a single machine.\n", + "\n", + "Or we can check the models score on the entire large dataset. The computation will be done in parallel, and no single machine will have to hold all the data." + ] + }, + { + "cell_type": "markdown", + "id": "276d251a-81aa-4d4a-b155-38a1c02ae115", + "metadata": {}, + "source": [ + "## Results" + ] + }, + { + "cell_type": "code", + "execution_count": 63, + "id": "7782c30a-a237-49a0-840d-ebae6dedc6ab", + "metadata": {}, + "outputs": [], + "source": [ + "from dask.distributed import Client, performance_report\n", + "from IPython.display import IFrame" + ] + }, + { + "cell_type": "code", + "execution_count": 64, + "id": "8bbd740d-81d4-4b4f-84c7-42cefde782e5", + "metadata": {}, + "outputs": [], + "source": [ + "with performance_report(filename=\"dask-report.html\"):\n", + " # Place your Dask computations here\n", + " # For example, a delayed operation or compute()\n", + " result = clf.score(X_large, y_large) " + ] + }, + { + "cell_type": "code", + "execution_count": 65, + "id": "8f01f103-ec33-48e9-b7ce-fbac32f90e62", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "np.float64(0.596)" + ] + }, + "execution_count": 65, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "result" + ] + }, + { + "cell_type": "code", + "execution_count": 66, + "id": "932eb174-8587-478a-bb68-ccf951ed6b25", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "\n", + " \n", + " " + ], + "text/plain": [ + "" + ] + }, + "execution_count": 66, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Display the performance report within the notebook\n", + "IFrame(src=\"dask-report.html\", width=\"100%\", height=\"500px\")" + ] + }, + { + "cell_type": "markdown", + "id": "251cacbb-f69d-4484-991a-e2671876a66c", + "metadata": {}, + "source": [ + "## Clean Up" + ] + }, + { + "cell_type": "markdown", + "id": "a4678b96-915c-482d-bbe5-c1b5d250dd46", + "metadata": {}, + "source": [ + "After the prediction, we can close the client and throw-away the Dask Openstack Cloud Cluster" + ] + }, + { + "cell_type": "code", + "execution_count": 67, + "id": "1a861ef5-f98c-4ea2-8883-36eaf03eec2d", + "metadata": {}, + "outputs": [], + "source": [ + "client.close()" + ] + }, + { + "cell_type": "code", + "execution_count": 68, + "id": "19781f45", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Terminated instance dask-d523cb04-worker-96fd7205\n", + "Terminated instance dask-d523cb04-worker-e0f7591e\n", + "Terminated instance dask-d523cb04-scheduler\n" + ] + } + ], + "source": [ + "cluster.close()" + ] + }, + { + "cell_type": "markdown", + "id": "fab56ddf-700c-46e5-80f2-055a8b01d421", + "metadata": {}, + "source": [ + "## Credits" + ] + }, + { + "cell_type": "markdown", + "id": "3d409e66-838a-4fe6-948c-7d2fc9935fd9", + "metadata": {}, + "source": [ + "All credit for [Score and Predict Large 
Datasets](https://github.com/dask/dask-examples/blob/main/machine-learning/parallel-prediction.ipynb) belongs to their respective developers.\n",
+    "\n",
+    "* https://examples.dask.org/machine-learning/parallel-prediction.html\n",
+    "* https://github.com/dask/dask-examples/blob/main/machine-learning/parallel-prediction.ipynb"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3 (ipykernel)",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.10.14"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/setup.py b/setup.py
index 64ca18a5..5d3f3b78 100644
--- a/setup.py
+++ b/setup.py
@@ -15,6 +15,7 @@
     "digitalocean": ["python-digitalocean>=1.15.0"],
     "gcp": ["google-api-python-client>=1.12.5", "google-auth>=1.23.0"],
     "hetzner": ["hcloud>=1.10.0"],
+    "openstack": ["openstacksdk>=3.3.0"],
 }

 extras_require["all"] = set(pkg for pkgs in extras_require.values() for pkg in pkgs)