Skip to content

Commit

Permalink
feat(dask): parse and use cpu and ram requirements (reanahub#600)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alputer committed Sep 13, 2024
1 parent e69a39d commit 8a8319f
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 7 deletions.
21 changes: 21 additions & 0 deletions reana_workflow_controller/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,27 @@ def _parse_interactive_sessions_environments(env_var):
DASK_ENABLED = strtobool(os.getenv("DASK_ENABLED", "true"))
"""Whether dask is enabled in the cluster or not"""

REANA_DASK_CLUSTER_DEFAULT_CORES_LIMIT = os.getenv(
    "REANA_DASK_CLUSTER_DEFAULT_CORES_LIMIT"
)
"""Maximum number of cores allowed for the dask clusters used by workflows."""

REANA_DASK_CLUSTER_DEFAULT_MEMORY_LIMIT = os.getenv(
    "REANA_DASK_CLUSTER_DEFAULT_MEMORY_LIMIT"
)
"""Default memory limit for the dask clusters used by workflows."""

REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_CORES = os.getenv(
    # NOTE: the variable name must not contain a trailing space, otherwise
    # the lookup always misses and this silently falls back to None.
    "REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_CORES"
)
"""Number of cores for one dask worker by default."""

REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY = os.getenv(
    "REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY"
)
"""Memory for one dask worker by default."""


VOMSPROXY_CONTAINER_IMAGE = os.getenv(
"VOMSPROXY_CONTAINER_IMAGE", "docker.io/reanahub/reana-auth-vomsproxy:1.2.0"
)
Expand Down
32 changes: 32 additions & 0 deletions reana_workflow_controller/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
get_workspace_volume,
get_reana_shared_volume,
)
from reana_commons.job_utils import kubernetes_memory_to_bytes

from reana_workflow_controller.k8s import create_dask_dashboard_ingress

Expand All @@ -40,6 +41,10 @@ def __init__(
workflow_spec,
workflow_workspace,
user_id,
cores,
memory,
single_worker_cores,
single_worker_memory,
):
"""Instantiate dask resource manager.
Expand All @@ -53,6 +58,10 @@ def __init__(
:type user_id: str
"""
self.cluster_name = cluster_name
self.cores = cores
self.memory = memory
self.single_worker_cores = single_worker_cores
self.single_worker_memory = single_worker_memory
self.autoscaler_name = f"dask-autoscaler-{cluster_name}"
self.workflow_spec = workflow_spec
self.workflow_workspace = workflow_workspace
Expand Down Expand Up @@ -132,6 +141,17 @@ def _prepare_cluster(self):
0
] = f'cd {self.workflow_workspace} && {self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["args"][0]}'

# Set resource limits for workers
self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["resources"] = {
"limits": {
"memory": f"{self.single_worker_memory}",
"cpu": str(self.single_worker_cores),
}
}

# Set max limit on autoscaler
self.autoscaler_body["spec"]["maximum"] = self.calculate_max_allowed_workers()

# Add DASK SCHEDULER URI env variable
self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["env"].append(
{"name": "DASK_SCHEDULER_URI", "value": self.dask_scheduler_uri},
Expand Down Expand Up @@ -478,6 +498,18 @@ def _create_dask_autoscaler(self):
)
raise

def calculate_max_allowed_workers(self):
    """Calculate the max number of workers for the dask autoscaler.

    The cap is the largest worker count that fits inside both the total
    cores budget and the total memory budget of the cluster.

    :returns: Maximum number of workers, as an int.
    """
    # Cast cores to float so that values coming from environment-variable
    # defaults (strings such as "4") work as well as native numbers.
    total_cores = float(self.cores)
    worker_cores = float(self.single_worker_cores)

    # Memory values are Kubernetes quantities (e.g. "4G"); normalize to
    # bytes so the two quantities can be divided.
    total_memory_in_bytes = kubernetes_memory_to_bytes(self.memory)
    single_worker_memory_in_bytes = kubernetes_memory_to_bytes(
        self.single_worker_memory
    )

    max_workers_by_cores = total_cores // worker_cores
    max_workers_by_memory = total_memory_in_bytes // single_worker_memory_in_bytes

    # The binding constraint is whichever resource runs out first.
    return int(min(max_workers_by_cores, max_workers_by_memory))


def requires_dask(workflow):
"""Check whether Dask is necessary to run the workflow."""
Expand Down
3 changes: 1 addition & 2 deletions reana_workflow_controller/templates/dask_autoscaler.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
apiVersion: kubernetes.dask.org/v1
kind: DaskAutoscaler
spec:
minimum: 0
maximum: 8
minimum: 0
31 changes: 26 additions & 5 deletions reana_workflow_controller/workflow_run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@
WORKFLOW_ENGINE_SERIAL_ENV_VARS,
WORKFLOW_ENGINE_SNAKEMAKE_ENV_VARS,
WORKFLOW_ENGINE_YADAGE_ENV_VARS,
DASK_ENABLED,
REANA_DASK_CLUSTER_DEFAULT_MEMORY_LIMIT,
REANA_DASK_CLUSTER_DEFAULT_CORES_LIMIT,
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY,
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_CORES,
)


Expand Down Expand Up @@ -374,16 +377,34 @@ def start_batch_workflow_run(
try:
# Create the dask cluster and required resources
if requires_dask(self.workflow):
if not DASK_ENABLED:
raise RuntimeError(
"Dask workflows are not supported in this cluster"
)

DaskResourceManager(
cluster_name=f"reana-run-dask-{self.workflow.id_}",
workflow_spec=self.workflow.reana_specification["workflow"],
workflow_workspace=self.workflow.workspace_path,
user_id=self.workflow.owner_id,
cores=self.workflow.reana_specification["workflow"]
.get("resources", {})
.get("dask", {})
.get("cores", REANA_DASK_CLUSTER_DEFAULT_CORES_LIMIT),
memory=self.workflow.reana_specification["workflow"]
.get("resources", {})
.get("dask", {})
.get("memory", REANA_DASK_CLUSTER_DEFAULT_MEMORY_LIMIT),
single_worker_cores=self.workflow.reana_specification["workflow"]
.get("resources", {})
.get("dask", {})
.get(
"single_worker_cores",
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_CORES,
),
single_worker_memory=self.workflow.reana_specification["workflow"]
.get("resources", {})
.get("dask", {})
.get(
"single_worker_memory",
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY,
),
).create_dask_resources()

current_k8s_batchv1_api_client.create_namespaced_job(
Expand Down
2 changes: 2 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ def dask_resource_manager(sample_serial_workflow_in_db_with_dask, mock_user_secr
"""Fixture to create a DaskResourceManager instance."""
manager = DaskResourceManager(
cluster_name="test-cluster",
cores=3,
memory="2G",
workflow_spec=sample_serial_workflow_in_db_with_dask.reana_specification[
"workflow"
],
Expand Down
29 changes: 29 additions & 0 deletions tests/test_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,35 @@ def test_requires_dask(workflow_fixture, expected_result, request):
), f"Expected requires_dask to return {expected_result} for {workflow_fixture}"


@pytest.mark.parametrize(
    "cores, memory, single_worker_cores, single_worker_memory, expected_workers",
    [
        (8, "16G", 2, "4G", 4),
        (16, "16G", 4, "8G", 2),
        (16, "32G", 4, "4G", 4),
        (4.5, "9G", 1.5, "3G", 3),
        (12.5, "10G", 4, "2G", 3),
        (16, "8G", 4, "2G", 4),
    ],
)
def test_calculate_max_allowed_workers(
    dask_resource_manager,
    cores,
    memory,
    single_worker_cores,
    single_worker_memory,
    expected_workers,
):
    """Check that the autoscaler worker cap matches the resource budgets."""
    # Override the fixture's resource attributes with the parametrized values.
    overrides = {
        "cores": cores,
        "memory": memory,
        "single_worker_cores": single_worker_cores,
        "single_worker_memory": single_worker_memory,
    }
    for attribute, value in overrides.items():
        setattr(dask_resource_manager, attribute, value)

    assert dask_resource_manager.calculate_max_allowed_workers() == expected_workers


def test_create_dask_cluster(mock_k8s_client, dask_resource_manager):
# Arrange
dask_resource_manager.cluster_body = {"mock": "cluster_body"}
Expand Down

0 comments on commit 8a8319f

Please sign in to comment.