feat(dask): parse and use cpu and ram requirements (reanahub#600)
Alputer committed Sep 11, 2024
1 parent e69a39d commit 49bac4b
Showing 5 changed files with 72 additions and 2 deletions.
32 changes: 32 additions & 0 deletions reana_workflow_controller/dask.py
@@ -27,6 +27,7 @@
get_workspace_volume,
get_reana_shared_volume,
)
from reana_commons.job_utils import kubernetes_memory_to_bytes

from reana_workflow_controller.k8s import create_dask_dashboard_ingress

@@ -40,6 +41,10 @@ def __init__(
workflow_spec,
workflow_workspace,
user_id,
cores,
memory,
single_worker_cores=0.3,
single_worker_memory="256M",
):
"""Instantiate dask resource manager.
@@ -53,6 +58,10 @@
:type user_id: str
"""
self.cluster_name = cluster_name
self.cores = cores
self.memory = memory
self.single_worker_cores = single_worker_cores
self.single_worker_memory = single_worker_memory
self.autoscaler_name = f"dask-autoscaler-{cluster_name}"
self.workflow_spec = workflow_spec
self.workflow_workspace = workflow_workspace
@@ -132,6 +141,17 @@ def _prepare_cluster(self):
0
] = f'cd {self.workflow_workspace} && {self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["args"][0]}'

# Set resource limits for workers
self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["resources"] = {
"limits": {
"memory": f"{self.single_worker_memory}",
"cpu": str(self.single_worker_cores),
}
}

# Set max limit on autoscaler
self.autoscaler_body["spec"]["maximum"] = self.calculate_max_allowed_workers()

# Add DASK SCHEDULER URI env variable
self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["env"].append(
{"name": "DASK_SCHEDULER_URI", "value": self.dask_scheduler_uri},
@@ -478,6 +498,18 @@ def _create_dask_autoscaler(self):
)
raise

def calculate_max_allowed_workers(self):
"""Calculate the max number of workers for dask autoscaler."""
total_memory_in_bytes = kubernetes_memory_to_bytes(self.memory)
single_worker_memory_in_bytes = kubernetes_memory_to_bytes(
self.single_worker_memory
)

max_workers_by_cores = self.cores // self.single_worker_cores
max_workers_by_memory = total_memory_in_bytes // single_worker_memory_in_bytes

return int(min(max_workers_by_cores, max_workers_by_memory))


def requires_dask(workflow):
"""Check whether Dask is necessary to run the workflow."""
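To make the new worker-cap arithmetic concrete, here is a minimal standalone sketch. The simplified k8s_memory_to_bytes below is a hypothetical stand-in for reana_commons.job_utils.kubernetes_memory_to_bytes, which supports the full range of Kubernetes quantity suffixes:

# Illustrative sketch only: a simplified stand-in for the real
# kubernetes_memory_to_bytes helper from reana-commons.
_SUFFIXES = {"K": 10**3, "M": 10**6, "G": 10**9, "Ki": 2**10, "Mi": 2**20, "Gi": 2**30}

def k8s_memory_to_bytes(memory: str) -> float:
    """Convert a Kubernetes memory quantity such as '256M' or '2Gi' to bytes."""
    for suffix in sorted(_SUFFIXES, key=len, reverse=True):
        if memory.endswith(suffix):
            return float(memory[: -len(suffix)]) * _SUFFIXES[suffix]
    return float(memory)  # plain number of bytes, e.g. "1048576"

def max_allowed_workers(cores, memory, single_worker_cores, single_worker_memory):
    """Cap the cluster at whichever resource (CPU or RAM) runs out first."""
    by_cores = cores // single_worker_cores
    by_memory = k8s_memory_to_bytes(memory) // k8s_memory_to_bytes(single_worker_memory)
    return int(min(by_cores, by_memory))

# 8 cores / 2 per worker = 4 workers; 16G / 4G per worker = 4 workers -> 4
assert max_allowed_workers(8, "16G", 2, "4G") == 4
# 12.5 // 4 = 3 workers by CPU; 10G // 2G = 5 by memory -> CPU is the bottleneck
assert max_allowed_workers(12.5, "10G", 4, "2G") == 3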
3 changes: 1 addition & 2 deletions reana_workflow_controller/templates/dask_autoscaler.yaml
@@ -1,5 +1,4 @@
apiVersion: kubernetes.dask.org/v1
kind: DaskAutoscaler
spec:
-  minimum: 0
-  maximum: 8
+  minimum: 0
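With the hardcoded cap gone from the template, the maximum is now injected at runtime by _prepare_cluster. Roughly, the resulting autoscaler body looks like the sketch below; the numbers are illustrative, not defaults:

# Illustrative: the autoscaler body after _prepare_cluster injects the
# computed cap. "maximum": 4 corresponds to e.g. cores=8, memory="16G"
# with a 2-core / 4G per-worker sizing, as in the tests further down.
autoscaler_body = {
    "apiVersion": "kubernetes.dask.org/v1",
    "kind": "DaskAutoscaler",
    "spec": {
        "minimum": 0,
        "maximum": 4,  # set to calculate_max_allowed_workers() at runtime
    },
}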
8 changes: 8 additions & 0 deletions reana_workflow_controller/workflow_run_manager.py
@@ -381,6 +381,14 @@ def start_batch_workflow_run(

DaskResourceManager(
cluster_name=f"reana-run-dask-{self.workflow.id_}",
cores=self.workflow.reana_specification["workflow"]
.get("resources", {})
.get("dask", {})
.get("cores"),
memory=self.workflow.reana_specification["workflow"]
.get("resources", {})
.get("dask", {})
.get("memory"),
workflow_spec=self.workflow.reana_specification["workflow"],
workflow_workspace=self.workflow.workspace_path,
user_id=self.workflow.owner_id,
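The chained .get() lookups above mean that a workflow specification without a resources.dask block yields None rather than raising a KeyError. A small sketch under that assumption; the example spec dicts are hypothetical:

# Hypothetical specs illustrating how the chained .get() lookups degrade.
spec_with_dask = {"resources": {"dask": {"cores": 4, "memory": "8G"}}}
spec_without_dask = {"type": "serial"}

def dask_resource(workflow_spec, key):
    """Mirror the lookup used above: None when any level is missing."""
    return workflow_spec.get("resources", {}).get("dask", {}).get(key)

assert dask_resource(spec_with_dask, "cores") == 4
assert dask_resource(spec_without_dask, "memory") is None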
2 changes: 2 additions & 0 deletions tests/conftest.py
@@ -207,6 +207,8 @@ def dask_resource_manager(sample_serial_workflow_in_db_with_dask, mock_user_secr
"""Fixture to create a DaskResourceManager instance."""
manager = DaskResourceManager(
cluster_name="test-cluster",
cores=3,
memory="2G",
workflow_spec=sample_serial_workflow_in_db_with_dask.reana_specification[
"workflow"
],
29 changes: 29 additions & 0 deletions tests/test_dask.py
@@ -30,6 +30,35 @@ def test_requires_dask(workflow_fixture, expected_result, request):
), f"Expected requires_dask to return {expected_result} for {workflow_fixture}"


@pytest.mark.parametrize(
"cores, memory, single_worker_cores, single_worker_memory, expected_workers",
[
(8, "16G", 2, "4G", 4),
(16, "16G", 4, "8G", 2),
(16, "32G", 4, "4G", 4),
(4.5, "9G", 1.5, "3G", 3),
(12.5, "10G", 4, "2G", 3),
(16, "8G", 4, "2G", 4),
],
)
def test_calculate_max_allowed_workers(
dask_resource_manager,
cores,
memory,
single_worker_cores,
single_worker_memory,
expected_workers,
):
# Set the DaskResourceManager instance attributes
dask_resource_manager.cores = cores
dask_resource_manager.memory = memory
dask_resource_manager.single_worker_cores = single_worker_cores
dask_resource_manager.single_worker_memory = single_worker_memory

# Call the method and check the result
assert dask_resource_manager.calculate_max_allowed_workers() == expected_workers


def test_create_dask_cluster(mock_k8s_client, dask_resource_manager):
# Arrange
dask_resource_manager.cluster_body = {"mock": "cluster_body"}
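To exercise just the new cap logic locally, something like "pytest tests/test_dask.py -k calculate_max_allowed_workers" should select the parametrized cases above, assuming the repository's usual pytest setup.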
