diff --git a/reana_workflow_controller/config.py b/reana_workflow_controller/config.py index 5bdcd9df..05a6c8fa 100644 --- a/reana_workflow_controller/config.py +++ b/reana_workflow_controller/config.py @@ -260,6 +260,9 @@ def _parse_interactive_sessions_environments(env_var): DASK_ENABLED = strtobool(os.getenv("DASK_ENABLED", "true")) """Whether dask is enabled in the cluster or not""" +DASK_AUTOSCALER_ENABLED = strtobool(os.getenv("DASK_AUTOSCALER_ENABLED", "true")) +"""Whether dask autoscaler is enabled in the cluster or not""" + REANA_DASK_CLUSTER_DEFAULT_CORES_LIMIT = float( os.getenv("REANA_DASK_CLUSTER_DEFAULT_CORES_LIMIT", 4) ) diff --git a/reana_workflow_controller/dask.py b/reana_workflow_controller/dask.py index 7cd29f71..9671c61a 100644 --- a/reana_workflow_controller/dask.py +++ b/reana_workflow_controller/dask.py @@ -29,6 +29,7 @@ ) from reana_commons.job_utils import kubernetes_memory_to_bytes +from reana_workflow_controller.config import DASK_AUTOSCALER_ENABLED from reana_workflow_controller.k8s import create_dask_dashboard_ingress @@ -62,14 +63,13 @@ def __init__( self.memory = memory self.single_worker_cores = single_worker_cores self.single_worker_memory = single_worker_memory - self.autoscaler_name = f"dask-autoscaler-{cluster_name}" self.workflow_spec = workflow_spec self.workflow_workspace = workflow_workspace self.workflow_id = workflow_workspace.split("/")[-1] self.user_id = user_id self.cluster_spec = workflow_spec.get("resources", {}).get("dask", []) - self.cluster_body, self.autoscaler_body = self._load_dask_templates() + self.cluster_body = self._load_dask_cluster_template() self.cluster_image = self.cluster_spec["image"] self.dask_scheduler_uri = ( f"{self.cluster_name}-scheduler.default.svc.cluster.local:8786" @@ -82,15 +82,16 @@ def __init__( ) self.kubernetes_uid = WORKFLOW_RUNTIME_USER_UID - def _load_dask_templates(self): - """Load Dask templates from YAML files.""" + if DASK_AUTOSCALER_ENABLED: + self.autoscaler_name = f"dask-autoscaler-{cluster_name}" + self.autoscaler_body = self._load_dask_autoscaler_template() + + def _load_dask_cluster_template(self): + """Load Dask cluster template from YAML file.""" with open( "reana_workflow_controller/templates/dask_cluster.yaml", "r" - ) as dask_cluster_yaml, open( - "reana_workflow_controller/templates/dask_autoscaler.yaml", "r" - ) as dask_autoscaler_yaml: + ) as dask_cluster_yaml: dask_cluster_body = yaml.safe_load(dask_cluster_yaml) - dask_autoscaler_body = yaml.safe_load(dask_autoscaler_yaml) dask_cluster_body["spec"]["worker"]["spec"]["initContainers"] = [] dask_cluster_body["spec"]["worker"]["spec"]["containers"][0]["env"] = [] dask_cluster_body["spec"]["worker"]["spec"]["containers"][0][ @@ -98,13 +99,26 @@ def _load_dask_templates(self): ] = [] dask_cluster_body["spec"]["worker"]["spec"]["volumes"] = [] - return dask_cluster_body, dask_autoscaler_body + return dask_cluster_body + + def _load_dask_autoscaler_template(self): + """Load Dask autoscaler template from YAML file.""" + with open( + "reana_workflow_controller/templates/dask_autoscaler.yaml", "r" + ) as dask_autoscaler_yaml: + dask_autoscaler_body = yaml.safe_load(dask_autoscaler_yaml) + + return dask_autoscaler_body def create_dask_resources(self): """Create necessary dask resources for the workflow.""" self._prepare_cluster() self._create_dask_cluster() - self._create_dask_autoscaler() + + if DASK_AUTOSCALER_ENABLED: + self._prepare_autoscaler() + self._create_dask_autoscaler() + create_dask_dashboard_ingress(self.cluster_name, self.workflow_id) def _prepare_cluster(self): @@ -118,16 +132,10 @@ def _prepare_cluster(self): # Add the name of the cluster, used in scheduler service name self.cluster_body["metadata"] = {"name": self.cluster_name} - # Add the name of the dask autoscaler - self.autoscaler_body["metadata"] = {"name": self.autoscaler_name} - self.cluster_body["spec"]["scheduler"]["service"]["selector"][ "dask.org/cluster-name" ] = self.cluster_name - # Connect autoscaler to the cluster - self.autoscaler_body["spec"]["cluster"] = self.cluster_name - # Add image to worker and scheduler self.cluster_body["spec"]["worker"]["spec"]["containers"][0][ "image" @@ -149,8 +157,10 @@ def _prepare_cluster(self): } } - # Set max limit on autoscaler - self.autoscaler_body["spec"]["maximum"] = self.calculate_max_allowed_workers() + # Set number of workers in the cluster + self.cluster_body["spec"]["worker"]["replicas"] = ( + 0 if DASK_AUTOSCALER_ENABLED else self.calculate_max_allowed_workers() + ) # Add DASK SCHEDULER URI env variable self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["env"].append( @@ -182,6 +192,17 @@ def _prepare_cluster(self): if rucio: self._add_rucio_init_container() + def _prepare_autoscaler(self): + """Prepare Dask autoscaler body.""" + # Add the name of the dask autoscaler + self.autoscaler_body["metadata"] = {"name": self.autoscaler_name} + + # Connect autoscaler to the cluster + self.autoscaler_body["spec"]["cluster"] = self.cluster_name + + # Set max limit on autoscaler + self.autoscaler_body["spec"]["maximum"] = self.calculate_max_allowed_workers() + def _add_image_pull_secrets(self): """Attach the configured image pull secrets to scheduler and worker containers.""" image_pull_secrets = [] diff --git a/tests/test_dask.py b/tests/test_dask.py index f2296772..a8fd492d 100644 --- a/tests/test_dask.py +++ b/tests/test_dask.py @@ -441,10 +441,6 @@ def test_prepare_cluster(dask_resource_manager): dask_resource_manager.cluster_body["metadata"]["name"] == dask_resource_manager.cluster_name ) - assert ( - dask_resource_manager.autoscaler_body["metadata"]["name"] - == dask_resource_manager.autoscaler_name - ) assert { "name": "DASK_SCHEDULER_URI", @@ -488,11 +484,6 @@ def test_prepare_cluster(dask_resource_manager): "worker" ]["spec"]["volumes"] - assert ( - dask_resource_manager.autoscaler_body["spec"]["cluster"] - == dask_resource_manager.cluster_name - ) - assert ( dask_resource_manager.cluster_body["spec"]["scheduler"]["service"][ "selector"