Skip to content

Commit

Permalink
feat(config): make Dask autoscaler configurable (#604)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alputer committed Nov 11, 2024
1 parent 1515865 commit db9ac05
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 33 deletions.
3 changes: 3 additions & 0 deletions reana_workflow_controller/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ def _parse_interactive_sessions_environments(env_var):
DASK_ENABLED = strtobool(os.getenv("DASK_ENABLED", "true"))
"""Whether Dask is enabled in the cluster or not"""

DASK_AUTOSCALER_ENABLED = os.getenv("DASK_AUTOSCALER_ENABLED", "true").lower() == "true"
"""Whether Dask autoscaler is enabled in the cluster or not"""

REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT = os.getenv(
"REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT", "16Gi"
)
Expand Down
17 changes: 10 additions & 7 deletions reana_workflow_controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from reana_workflow_controller.errors import REANAWorkflowControllerError
from reana_workflow_controller.k8s import delete_dask_dashboard_ingress

from reana_workflow_controller.config import DASK_AUTOSCALER_ENABLED
from reana_workflow_controller.dask import requires_dask

try:
Expand Down Expand Up @@ -325,13 +326,15 @@ def _delete_dask_cluster(workflow: Workflow) -> None:
name=f"reana-run-dask-{workflow.id_}",
)

current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
plural="daskautoscalers",
namespace="default",
name=f"dask-autoscaler-reana-run-dask-{workflow.id_}",
)
if DASK_AUTOSCALER_ENABLED:
current_k8s_custom_objects_api_client.delete_namespaced_custom_object(

Check warning on line 330 in reana_workflow_controller/consumer.py

View check run for this annotation

Codecov / codecov/patch

reana_workflow_controller/consumer.py#L329-L330

Added lines #L329 - L330 were not covered by tests
group="kubernetes.dask.org",
version="v1",
plural="daskautoscalers",
namespace="default",
name=f"dask-autoscaler-reana-run-dask-{workflow.id_}",
)

delete_dask_dashboard_ingress(
f"dask-dashboard-ingress-reana-run-dask-{workflow.id_}", workflow.id_
)
56 changes: 39 additions & 17 deletions reana_workflow_controller/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
get_workspace_volume,
get_reana_shared_volume,
)
from reana_commons.job_utils import kubernetes_memory_to_bytes

from reana_workflow_controller.config import DASK_AUTOSCALER_ENABLED
from reana_workflow_controller.k8s import create_dask_dashboard_ingress


Expand Down Expand Up @@ -64,7 +66,7 @@ def __init__(
self.user_id = user_id

self.cluster_spec = workflow_spec.get("resources", {}).get("dask", [])
self.cluster_body, self.autoscaler_body = self._load_dask_templates()
self.cluster_body = self._load_dask_cluster_template()
self.cluster_image = self.cluster_spec["image"]
self.dask_scheduler_uri = (
f"{self.cluster_name}-scheduler.default.svc.cluster.local:8786"
Expand All @@ -77,29 +79,43 @@ def __init__(
)
self.kubernetes_uid = WORKFLOW_RUNTIME_USER_UID

def _load_dask_templates(self):
"""Load Dask templates from YAML files."""
if DASK_AUTOSCALER_ENABLED:
self.autoscaler_name = f"dask-autoscaler-{cluster_name}"
self.autoscaler_body = self._load_dask_autoscaler_template()

def _load_dask_cluster_template(self):
"""Load Dask cluster template from YAML file."""
with open(
"reana_workflow_controller/templates/dask_cluster.yaml", "r"
) as dask_cluster_yaml, open(
"reana_workflow_controller/templates/dask_autoscaler.yaml", "r"
) as dask_autoscaler_yaml:
) as dask_cluster_yaml:
dask_cluster_body = yaml.safe_load(dask_cluster_yaml)
dask_autoscaler_body = yaml.safe_load(dask_autoscaler_yaml)
dask_cluster_body["spec"]["worker"]["spec"]["initContainers"] = []
dask_cluster_body["spec"]["worker"]["spec"]["containers"][0]["env"] = []
dask_cluster_body["spec"]["worker"]["spec"]["containers"][0][
"volumeMounts"
] = []
dask_cluster_body["spec"]["worker"]["spec"]["volumes"] = []

return dask_cluster_body, dask_autoscaler_body
return dask_cluster_body

def _load_dask_autoscaler_template(self):
"""Load Dask autoscaler template from YAML file."""
with open(
"reana_workflow_controller/templates/dask_autoscaler.yaml", "r"
) as dask_autoscaler_yaml:
dask_autoscaler_body = yaml.safe_load(dask_autoscaler_yaml)

return dask_autoscaler_body

def create_dask_resources(self):
"""Create necessary Dask resources for the workflow."""
self._prepare_cluster()
self._create_dask_cluster()
self._create_dask_autoscaler()

if DASK_AUTOSCALER_ENABLED:
self._prepare_autoscaler()
self._create_dask_autoscaler()

create_dask_dashboard_ingress(self.cluster_name, self.workflow_id)

def _prepare_cluster(self):
Expand All @@ -113,16 +129,10 @@ def _prepare_cluster(self):
# Add the name of the cluster, used in scheduler service name
self.cluster_body["metadata"] = {"name": self.cluster_name}

# Add the name of the dask autoscaler
self.autoscaler_body["metadata"] = {"name": self.autoscaler_name}

self.cluster_body["spec"]["scheduler"]["service"]["selector"][
"dask.org/cluster-name"
] = self.cluster_name

# Connect autoscaler to the cluster
self.autoscaler_body["spec"]["cluster"] = self.cluster_name

# Add image to worker and scheduler
self.cluster_body["spec"]["worker"]["spec"]["containers"][0][
"image"
Expand All @@ -141,8 +151,9 @@ def _prepare_cluster(self):
"limits": {"memory": f"{self.single_worker_memory}", "cpu": "1"}
}

# Set max limit on autoscaler
self.autoscaler_body["spec"]["maximum"] = self.num_of_workers
self.cluster_body["spec"]["worker"]["replicas"] = (
0 if DASK_AUTOSCALER_ENABLED else self.num_of_workers
)

# Add DASK SCHEDULER URI env variable
self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["env"].append(
Expand Down Expand Up @@ -174,6 +185,17 @@ def _prepare_cluster(self):
if rucio:
self._add_rucio_init_container()

def _prepare_autoscaler(self):
"""Prepare Dask autoscaler body."""
# Add the name of the dask autoscaler
self.autoscaler_body["metadata"] = {"name": self.autoscaler_name}

# Connect autoscaler to the cluster
self.autoscaler_body["spec"]["cluster"] = self.cluster_name

# Set max limit on autoscaler
self.autoscaler_body["spec"]["maximum"] = self.num_of_workers

def _add_image_pull_secrets(self):
"""Attach the configured image pull secrets to scheduler and worker containers."""
image_pull_secrets = []
Expand Down
9 changes: 0 additions & 9 deletions tests/test_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,10 +415,6 @@ def test_prepare_cluster(dask_resource_manager):
dask_resource_manager.cluster_body["metadata"]["name"]
== dask_resource_manager.cluster_name
)
assert (
dask_resource_manager.autoscaler_body["metadata"]["name"]
== dask_resource_manager.autoscaler_name
)

assert {
"name": "DASK_SCHEDULER_URI",
Expand Down Expand Up @@ -462,11 +458,6 @@ def test_prepare_cluster(dask_resource_manager):
"worker"
]["spec"]["volumes"]

assert (
dask_resource_manager.autoscaler_body["spec"]["cluster"]
== dask_resource_manager.cluster_name
)

assert (
dask_resource_manager.cluster_body["spec"]["scheduler"]["service"][
"selector"
Expand Down

0 comments on commit db9ac05

Please sign in to comment.