From 8a8319f2c6c0fdef087be98ea012ebefd0751a98 Mon Sep 17 00:00:00 2001
From: Alputer
Date: Wed, 11 Sep 2024 15:47:21 +0200
Subject: [PATCH] feat(dask): parse and use cpu and ram requirements (#600)

---
 reana_workflow_controller/config.py           | 21 ++++++++++++
 reana_workflow_controller/dask.py             | 32 +++++++++++++++++++
 .../templates/dask_autoscaler.yaml            |  3 +-
 .../workflow_run_manager.py                   | 31 +++++++++++++++---
 tests/conftest.py                             |  2 ++
 tests/test_dask.py                            | 29 +++++++++++++++++
 6 files changed, 111 insertions(+), 7 deletions(-)

diff --git a/reana_workflow_controller/config.py b/reana_workflow_controller/config.py
index 37a0cc9b..30fd8761 100644
--- a/reana_workflow_controller/config.py
+++ b/reana_workflow_controller/config.py
@@ -260,6 +260,27 @@ def _parse_interactive_sessions_environments(env_var):
 DASK_ENABLED = strtobool(os.getenv("DASK_ENABLED", "true"))
 """Whether dask is enabled in the cluster or not"""
 
+REANA_DASK_CLUSTER_DEFAULT_CORES_LIMIT = os.getenv(
+    "REANA_DASK_CLUSTER_DEFAULT_CORES_LIMIT"
+)
+"""Default cores limit for the dask clusters used by workflows."""
+
+REANA_DASK_CLUSTER_DEFAULT_MEMORY_LIMIT = os.getenv(
+    "REANA_DASK_CLUSTER_DEFAULT_MEMORY_LIMIT"
+)
+"""Default memory limit for the dask clusters used by workflows."""
+
+REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_CORES = os.getenv(
+    "REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_CORES"
+)
+"""Number of cores for one dask worker by default."""
+
+REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY = os.getenv(
+    "REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY"
+)
+"""Memory for one dask worker by default."""
+
+
 VOMSPROXY_CONTAINER_IMAGE = os.getenv(
     "VOMSPROXY_CONTAINER_IMAGE", "docker.io/reanahub/reana-auth-vomsproxy:1.2.0"
 )
diff --git a/reana_workflow_controller/dask.py b/reana_workflow_controller/dask.py
index 8656811a..7cd29f71 100644
--- a/reana_workflow_controller/dask.py
+++ b/reana_workflow_controller/dask.py
@@ -27,6 +27,7 @@
     get_workspace_volume,
     get_reana_shared_volume,
 )
+from reana_commons.job_utils import kubernetes_memory_to_bytes
 
 from reana_workflow_controller.k8s import create_dask_dashboard_ingress
 
@@ -40,6 +41,10 @@ def __init__(
         workflow_spec,
         workflow_workspace,
         user_id,
+        cores,
+        memory,
+        single_worker_cores,
+        single_worker_memory,
     ):
         """Instantiate dask resource manager.
 
@@ -53,6 +58,10 @@ def __init__(
         :type user_id: str
         """
         self.cluster_name = cluster_name
+        self.cores = cores
+        self.memory = memory
+        self.single_worker_cores = single_worker_cores
+        self.single_worker_memory = single_worker_memory
         self.autoscaler_name = f"dask-autoscaler-{cluster_name}"
         self.workflow_spec = workflow_spec
         self.workflow_workspace = workflow_workspace
@@ -132,6 +141,17 @@ def _prepare_cluster(self):
             0
         ] = f'cd {self.workflow_workspace} && {self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["args"][0]}'
 
+        # Set resource limits for workers
+        self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["resources"] = {
+            "limits": {
+                "memory": f"{self.single_worker_memory}",
+                "cpu": str(self.single_worker_cores),
+            }
+        }
+
+        # Set max limit on autoscaler
+        self.autoscaler_body["spec"]["maximum"] = self.calculate_max_allowed_workers()
+
         # Add DASK SCHEDULER URI env variable
         self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["env"].append(
             {"name": "DASK_SCHEDULER_URI", "value": self.dask_scheduler_uri},
@@ -478,6 +498,18 @@ def _create_dask_autoscaler(self):
             )
             raise
 
+    def calculate_max_allowed_workers(self):
+        """Calculate the max number of workers for dask autoscaler."""
+        total_memory_in_bytes = kubernetes_memory_to_bytes(self.memory)
+        single_worker_memory_in_bytes = kubernetes_memory_to_bytes(
+            self.single_worker_memory
+        )
+
+        max_workers_by_cores = self.cores // self.single_worker_cores
+        max_workers_by_memory = total_memory_in_bytes // single_worker_memory_in_bytes
+
+        return int(min(max_workers_by_cores, max_workers_by_memory))
+
 
 def requires_dask(workflow):
     """Check whether Dask is necessary to run the workflow."""
diff --git a/reana_workflow_controller/templates/dask_autoscaler.yaml b/reana_workflow_controller/templates/dask_autoscaler.yaml
index ded5131d..86792c14 100644
--- a/reana_workflow_controller/templates/dask_autoscaler.yaml
+++ b/reana_workflow_controller/templates/dask_autoscaler.yaml
@@ -1,5 +1,4 @@
 apiVersion: kubernetes.dask.org/v1
 kind: DaskAutoscaler
 spec:
-  minimum: 0
-  maximum: 8
\ No newline at end of file
+  minimum: 0
\ No newline at end of file
diff --git a/reana_workflow_controller/workflow_run_manager.py b/reana_workflow_controller/workflow_run_manager.py
index 846de0b0..f9ea3273 100644
--- a/reana_workflow_controller/workflow_run_manager.py
+++ b/reana_workflow_controller/workflow_run_manager.py
@@ -85,7 +85,10 @@
     WORKFLOW_ENGINE_SERIAL_ENV_VARS,
     WORKFLOW_ENGINE_SNAKEMAKE_ENV_VARS,
     WORKFLOW_ENGINE_YADAGE_ENV_VARS,
-    DASK_ENABLED,
+    REANA_DASK_CLUSTER_DEFAULT_MEMORY_LIMIT,
+    REANA_DASK_CLUSTER_DEFAULT_CORES_LIMIT,
+    REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY,
+    REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_CORES,
 )
 
 
@@ -374,16 +377,34 @@ def start_batch_workflow_run(
 
         try:
             # Create the dask cluster and required resources
            if requires_dask(self.workflow):
-                if not DASK_ENABLED:
-                    raise RuntimeError(
-                        "Dask workflows are not supported in this cluster"
-                    )
                 DaskResourceManager(
                     cluster_name=f"reana-run-dask-{self.workflow.id_}",
                     workflow_spec=self.workflow.reana_specification["workflow"],
                     workflow_workspace=self.workflow.workspace_path,
                     user_id=self.workflow.owner_id,
+                    cores=self.workflow.reana_specification["workflow"]
+                    .get("resources", {})
+                    .get("dask", {})
+                    .get("cores", REANA_DASK_CLUSTER_DEFAULT_CORES_LIMIT),
+                    memory=self.workflow.reana_specification["workflow"]
+                    .get("resources", {})
+                    .get("dask", {})
+                    .get("memory", REANA_DASK_CLUSTER_DEFAULT_MEMORY_LIMIT),
+                    single_worker_cores=self.workflow.reana_specification["workflow"]
+                    .get("resources", {})
+                    .get("dask", {})
+                    .get(
+                        "single_worker_cores",
+                        REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_CORES,
+                    ),
+                    single_worker_memory=self.workflow.reana_specification["workflow"]
+                    .get("resources", {})
+                    .get("dask", {})
+                    .get(
+                        "single_worker_memory",
+                        REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY,
+                    ),
                 ).create_dask_resources()
 
             current_k8s_batchv1_api_client.create_namespaced_job(
diff --git a/tests/conftest.py b/tests/conftest.py
index 8fc4ab7b..e0b5d010 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -207,6 +207,8 @@ def dask_resource_manager(sample_serial_workflow_in_db_with_dask, mock_user_secr
     """Fixture to create a DaskResourceManager instance."""
     manager = DaskResourceManager(
         cluster_name="test-cluster",
+        cores=3,
+        memory="2G",
         workflow_spec=sample_serial_workflow_in_db_with_dask.reana_specification[
             "workflow"
         ],
diff --git a/tests/test_dask.py b/tests/test_dask.py
index 1dd94007..f2296772 100644
--- a/tests/test_dask.py
+++ b/tests/test_dask.py
@@ -30,6 +30,35 @@ def test_requires_dask(workflow_fixture, expected_result, request):
     ), f"Expected requires_dask to return {expected_result} for {workflow_fixture}"
 
 
+@pytest.mark.parametrize(
+    "cores, memory, single_worker_cores, single_worker_memory, expected_workers",
+    [
+        (8, "16G", 2, "4G", 4),
+        (16, "16G", 4, "8G", 2),
+        (16, "32G", 4, "4G", 4),
+        (4.5, "9G", 1.5, "3G", 3),
+        (12.5, "10G", 4, "2G", 3),
+        (16, "8G", 4, "2G", 4),
+    ],
+)
+def test_calculate_max_allowed_workers(
+    dask_resource_manager,
+    cores,
+    memory,
+    single_worker_cores,
+    single_worker_memory,
+    expected_workers,
+):
+    # Set the DaskResourceManager instance attributes
+    dask_resource_manager.cores = cores
+    dask_resource_manager.memory = memory
+    dask_resource_manager.single_worker_cores = single_worker_cores
+    dask_resource_manager.single_worker_memory = single_worker_memory
+
+    # Call the method and check the result
+    assert dask_resource_manager.calculate_max_allowed_workers() == expected_workers
+
+
 def test_create_dask_cluster(mock_k8s_client, dask_resource_manager):
     # Arrange
     dask_resource_manager.cluster_body = {"mock": "cluster_body"}
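
The chained .get() calls added to workflow_run_manager.py resolve each Dask resource request from the workflow specification and fall back to the cluster-wide REANA_DASK_CLUSTER_DEFAULT_* settings when a key is missing. A minimal sketch of that resolution; the workflow_spec dict and the stand-in default values below are hypothetical, not part of the patch:

# Hypothetical workflow specification: only resources.dask matters here.
workflow_spec = {
    "resources": {"dask": {"cores": 8, "memory": "16G"}},
}

# Stand-ins for REANA_DASK_CLUSTER_DEFAULT_CORES_LIMIT and
# REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_CORES.
DEFAULT_CORES_LIMIT = 4
DEFAULT_SINGLE_WORKER_CORES = 1

dask_resources = workflow_spec.get("resources", {}).get("dask", {})
cores = dask_resources.get("cores", DEFAULT_CORES_LIMIT)  # -> 8, from the spec
single_worker_cores = dask_resources.get(
    "single_worker_cores", DEFAULT_SINGLE_WORKER_CORES
)  # -> 1, falls back to the default because the spec omits the key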
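
The autoscaler ceiling set in dask.py is then the smaller of two integer quotients: total cores over per-worker cores, and total memory over per-worker memory. A worked example of the same arithmetic, assuming only that kubernetes_memory_to_bytes (the reana-commons helper the patch imports) converts Kubernetes memory strings such as "16G" into bytes:

from reana_commons.job_utils import kubernetes_memory_to_bytes


def max_allowed_workers(cores, memory, single_worker_cores, single_worker_memory):
    """Smaller of the CPU-based and memory-based worker counts."""
    max_by_cores = cores // single_worker_cores
    max_by_memory = kubernetes_memory_to_bytes(memory) // kubernetes_memory_to_bytes(
        single_worker_memory
    )
    return int(min(max_by_cores, max_by_memory))


# Memory-bound case from the new tests: 16 cores / 4 cores per worker would
# allow 4 workers, but 16G / 8G per worker allows only 2, so the maximum is 2.
assert max_allowed_workers(16, "16G", 4, "8G") == 2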