From 80fad1368532f9c933669491b31bf33f167d6b02 Mon Sep 17 00:00:00 2001 From: Alputer Date: Thu, 8 Aug 2024 16:09:54 +0200 Subject: [PATCH] intermediate commit --- reana_workflow_controller/config.py | 34 ++ reana_workflow_controller/consumer.py | 23 + reana_workflow_controller/dask.py | 464 ++++++++++++++++++ .../templates/dask_autoscaler.yaml | 5 + .../templates/dask_cluster.yaml | 55 +++ .../workflow_run_manager.py | 22 + 6 files changed, 603 insertions(+) create mode 100644 reana_workflow_controller/dask.py create mode 100644 reana_workflow_controller/templates/dask_autoscaler.yaml create mode 100644 reana_workflow_controller/templates/dask_cluster.yaml diff --git a/reana_workflow_controller/config.py b/reana_workflow_controller/config.py index a5e2e888..c278bdc7 100644 --- a/reana_workflow_controller/config.py +++ b/reana_workflow_controller/config.py @@ -206,6 +206,40 @@ def _env_vars_dict_to_k8s_list(env_vars): IMAGE_PULL_SECRETS = os.getenv("IMAGE_PULL_SECRETS", "").split(",") """Docker image pull secrets which allow the usage of private images.""" +VOMSPROXY_CONTAINER_IMAGE = os.getenv( + "VOMSPROXY_CONTAINER_IMAGE", "docker.io/reanahub/reana-auth-vomsproxy:1.2.0" +) +"""Default docker image of VOMSPROXY sidecar container.""" + +VOMSPROXY_CONTAINER_NAME = "voms-proxy" +"""Name of VOMSPROXY sidecar container.""" + +VOMSPROXY_CERT_CACHE_LOCATION = "/vomsproxy_cache/" +"""Directory of voms-proxy certificate cache. + +This directory is shared between job & VOMSPROXY container.""" + +VOMSPROXY_CERT_CACHE_FILENAME = "x509up_proxy" +"""Name of the voms-proxy certificate cache file.""" + +RUCIO_CONTAINER_IMAGE = os.getenv( + "RUCIO_CONTAINER_IMAGE", "docker.io/reanahub/reana-auth-rucio:1.1.1" +) +"""Default docker image of RUCIO sidecar container.""" + +RUCIO_CONTAINER_NAME = "reana-auth-rucio" +"""Name of RUCIO sidecar container.""" + +RUCIO_CACHE_LOCATION = "/rucio_cache/" +"""Directory of Rucio cache. + +This directory is shared between job & Rucio container.""" + +RUCIO_CFG_CACHE_FILENAME = "rucio.cfg" +"""Name of the RUCIO configuration cache file.""" + +RUCIO_CERN_BUNDLE_CACHE_FILENAME = "CERN-bundle.pem" +"""Name of the CERN Bundle cache file.""" ALIVE_STATUSES = [ RunStatus.created, diff --git a/reana_workflow_controller/consumer.py b/reana_workflow_controller/consumer.py index f7b5b59d..254f99e7 100644 --- a/reana_workflow_controller/consumer.py +++ b/reana_workflow_controller/consumer.py @@ -22,6 +22,7 @@ from reana_commons.k8s.api_client import ( current_k8s_batchv1_api_client, current_k8s_corev1_api_client, + current_k8s_custom_objects_api_client, ) from reana_commons.k8s.secrets import REANAUserSecretsStore from reana_commons.utils import ( @@ -142,6 +143,7 @@ def on_message(self, body, message): def _update_workflow_status(workflow, status, logs): """Update workflow status in DB.""" + if workflow.status != status: Workflow.update_workflow_status(Session, workflow.id_, status, logs, None) if workflow.git_ref: @@ -164,6 +166,8 @@ def _update_workflow_status(workflow, status, logs): if RunStatus.should_cleanup_job(status): try: _delete_workflow_job(workflow) + if workflow.requires_dask(): + _delete_dask_cluster(workflow) except ApiException as e: logging.error( f"Could not clean up workflow job for workflow {workflow.id_}. 
" @@ -291,3 +295,22 @@ def _get_workflow_engine_pod_logs(workflow: Workflow) -> str: # There might not be any pod returned by `list_namespaced_pod`, for example # when a workflow fails to be scheduled return "" + + +def _delete_dask_cluster(workflow: Workflow) -> None: + """Delete the Dask cluster resources.""" + current_k8s_custom_objects_api_client.delete_namespaced_custom_object( + group="kubernetes.dask.org", + version="v1", + plural="daskclusters", + namespace="default", + name=f"reana-run-dask-{workflow.id_}", + ) + + current_k8s_custom_objects_api_client.delete_namespaced_custom_object( + group="kubernetes.dask.org", + version="v1", + plural="daskautoscalers", + namespace="default", + name=f"dask-autoscaler-reana-run-dask-{workflow.id_}", + ) diff --git a/reana_workflow_controller/dask.py b/reana_workflow_controller/dask.py new file mode 100644 index 00000000..9fc3c914 --- /dev/null +++ b/reana_workflow_controller/dask.py @@ -0,0 +1,464 @@ +# This file is part of REANA. +# Copyright (C) 2024 CERN. +# +# REANA is free software; you can redistribute it and/or modify it +# under the terms of the MIT License; see LICENSE file for more details. + +"""Dask resource manager.""" +import logging +import os +import yaml + +from flask import current_app + +from reana_commons.config import ( + K8S_CERN_EOS_AVAILABLE, + K8S_CERN_EOS_MOUNT_CONFIGURATION, + KRB5_STATUS_FILE_LOCATION, + REANA_JOB_HOSTPATH_MOUNTS, +) +from reana_commons.k8s.api_client import current_k8s_custom_objects_api_client +from reana_commons.k8s.kerberos import get_kerberos_k8s_config +from reana_commons.k8s.secrets import REANAUserSecretsStore +from reana_commons.k8s.volumes import ( + get_workspace_volume, + get_reana_shared_volume, +) + + +class DaskResourceManager: + def __init__( + self, + cluster_name, + workflow_spec, + workflow_workspace, + user_id, + ): + self.cluster_name = cluster_name + self.autoscaler_name = f"dask-autoscaler-{cluster_name}" + self.workflow_spec = workflow_spec + self.workflow_workspace = workflow_workspace + self.user_id = user_id + + self.clusters = workflow_spec.get("resources", {}).get("dask", []) + self.current_cluster_body = None # There might be multiple dask clusters, this variable always refers to the cluster currently being processed + self.current_autoscaler_body = None # There might be multiple dask clusters, this variable always refers to the autoscaler of the cluster currently being processed + + self.secrets_store = REANAUserSecretsStore(self.user_id) + self.secret_env_vars = self.secrets_store.get_env_secrets_as_k8s_spec() + self.secrets_volume_mount = ( + self.secrets_store.get_secrets_volume_mount_as_k8s_spec() + ) + + def create_dask_resources(self): + + dask_cluster_body_template, dask_autoscaler_body_template = ( + self._load_dask_templates() + ) + + for cluster in self.clusters: + + self.current_cluster_body = dask_cluster_body_template + self.current_autoscaler_body = dask_autoscaler_body_template + cluster_image = cluster["image"] + dask_scheduler_uri = ( + f"{self.cluster_name}-scheduler.default.svc.cluster.local:8786" + ) + + self._prepare_cluster(cluster_image, dask_scheduler_uri) + self._create_dask_cluster() + self._create_dask_autoscaler() + + def _load_dask_templates(self): + """Load Dask templates from YAML files.""" + with open( + "reana_workflow_controller/templates/dask_cluster.yaml", "r" + ) as dask_cluster_yaml, open( + "reana_workflow_controller/templates/dask_autoscaler.yaml", "r" + ) as dask_autoscaler_yaml: + dask_cluster_body = 
yaml.safe_load(dask_cluster_yaml) + dask_autoscaler_body = yaml.safe_load(dask_autoscaler_yaml) + dask_cluster_body["spec"]["worker"]["spec"]["initContainers"] = [] + dask_cluster_body["spec"]["worker"]["spec"]["containers"][0]["env"] = [] + dask_cluster_body["spec"]["worker"]["spec"]["containers"][0][ + "volumeMounts" + ] = [] + dask_cluster_body["spec"]["worker"]["spec"]["volumes"] = [] + + return dask_cluster_body, dask_autoscaler_body + + def _prepare_cluster(self, cluster_image, dask_scheduler_uri): + """Prepare Dask cluster body by adding necessary image-pull secrets, volumes, volume mounts, init containers and sidecar containers.""" + + self._add_image_pull_secrets() + self._add_hostpath_volumes() + self._add_workspace_volume() + self._add_shared_volume() + self._add_eos_volume() + + # Add the name of the cluster, used in scheduler service name + self.current_cluster_body["metadata"] = {"name": self.cluster_name} + + # Add the name of the dask autoscaler + self.current_autoscaler_body["metadata"] = {"name": self.autoscaler_name} + + self.current_cluster_body["spec"]["scheduler"]["service"]["selector"][ + "dask.org/cluster-name" + ] = self.cluster_name + + # Connect autoscaler to the cluster + self.current_autoscaler_body["spec"]["cluster"] = self.cluster_name + + # Add image to worker and scheduler + self.current_cluster_body["spec"]["worker"]["spec"]["containers"][0][ + "image" + ] = cluster_image + self.current_cluster_body["spec"]["scheduler"]["spec"]["containers"][0][ + "image" + ] = cluster_image + + # Create the worker command + self.current_cluster_body["spec"]["worker"]["spec"]["containers"][0]["args"][ + 0 + ] = f'cd {self.workflow_workspace} && {self.current_cluster_body["spec"]["worker"]["spec"]["containers"][0]["args"][0]}' + + # Add DASK SCHEDULER URI env variable + self.current_cluster_body["spec"]["worker"]["spec"]["containers"][0][ + "env" + ].append( + {"name": "DASK_SCHEDULER_URI", "value": dask_scheduler_uri}, + ) + + # Add secrets + self.current_cluster_body["spec"]["worker"]["spec"]["containers"][0][ + "env" + ].extend(self.secret_env_vars) + + self.current_cluster_body["spec"]["worker"]["spec"]["containers"][0][ + "volumeMounts" + ].append(self.secrets_volume_mount) + + self.current_cluster_body["spec"]["worker"]["spec"]["volumes"].append( + self.secrets_store.get_file_secrets_volume_as_k8s_specs() + ) + + # @TODO: Decide how to detect if krb5, rucio and voms_proxy are needed + kerberos = False + rucio = False + voms_proxy = False + + if kerberos: + self._add_krb5_containers() + if voms_proxy: + self._add_voms_proxy_init_container() + if rucio: + self._add_rucio_init_container() + + def _add_image_pull_secrets(self): + """Attach the configured image pull secrets to the scheduler and worker containers.""" + image_pull_secrets = [] + for secret_name in current_app.config["IMAGE_PULL_SECRETS"]: + if secret_name: + image_pull_secrets.append({"name": secret_name}) + + self.current_cluster_body["spec"]["worker"]["spec"]["containers"][0][ + "imagePullSecrets" + ] = image_pull_secrets + + self.current_cluster_body["spec"]["scheduler"]["spec"]["containers"][0][ + "imagePullSecrets" + ] = image_pull_secrets + + def _add_workspace_volume(self): + """Add workspace volume to the Dask workers.""" + volume_mount, volume = get_workspace_volume(self.workflow_workspace) + self._add_volumes([(volume_mount, volume)]) + + def _add_eos_volume(self): + """Add EOS volume to the Dask cluster body.""" + if K8S_CERN_EOS_AVAILABLE: + self._add_volumes( + [ + ( + 
K8S_CERN_EOS_MOUNT_CONFIGURATION["volumeMounts"], + K8S_CERN_EOS_MOUNT_CONFIGURATION["volume"], + ) + ] + ) + + def _add_shared_volume(self): + """Add shared CephFS volume to Dask workers.""" + + # TODO: We should check if this volume necessary but HOW? + shared_volume = get_reana_shared_volume() + # check if shared_volume is not already added + if not any( + v["name"] == shared_volume["name"] + for v in self.current_cluster_body["spec"]["worker"]["spec"]["volumes"] + ): + self.current_cluster_body["spec"]["worker"]["spec"]["volumes"].append( + shared_volume + ) + + def _add_hostpath_volumes(self): + """Add hostPath mounts from configuration to the Dask workers.""" + volumes_to_mount = [] + for mount in REANA_JOB_HOSTPATH_MOUNTS: + volume_mount = { + "name": mount["name"], + "mountPath": mount.get("mountPath", mount["hostPath"]), + } + volume = {"name": mount["name"], "hostPath": {"path": mount["hostPath"]}} + volumes_to_mount.append((volume_mount, volume)) + + self._add_volumes(volumes_to_mount) + + def _add_volumes(self, volumes): + """Add provided volumes to the Dask cluster body.""" + for volume_mount, volume in volumes: + self.current_cluster_body["spec"]["worker"]["spec"]["containers"][0][ + "volumeMounts" + ].append(volume_mount) + self.current_cluster_body["spec"]["worker"]["spec"]["volumes"].append( + volume + ) + + def _add_krb5_containers(self): + """Add krb5 init and renew containers for Dask workers.""" + krb5_config = get_kerberos_k8s_config( + self.secrets_store, + kubernetes_uid=self.kubernetes_uid, + ) + + self.current_cluster_body["spec"]["worker"]["spec"]["volumes"].extend( + krb5_config.volumes + ) + self.current_cluster_body["spec"]["worker"]["spec"]["containers"][0][ + "volumeMounts" + ].extend(krb5_config.volume_mounts) + # Add the Kerberos token cache file location to the job container + # so every instance of Kerberos picks it up even if it doesn't read + # the configuration file. 
+ self.current_cluster_body["spec"]["worker"]["spec"]["containers"][0][ + "env" + ].extend(krb5_config.env) + # Add Kerberos init container used to generate ticket + self.current_cluster_body["spec"]["worker"]["spec"]["initContainers"].append( + krb5_config.init_container + ) + + # Add Kerberos renew container to renew ticket periodically for long-running jobs + self.current_cluster_body["spec"]["worker"]["spec"]["containers"].append( + krb5_config.renew_container + ) + + # Extend the main job command to create a file after it's finished + existing_args = self.current_cluster_body["spec"]["worker"]["spec"][ + "containers" + ][0]["args"] + + self.current_cluster_body["spec"]["worker"]["spec"]["containers"][0]["args"] = [ + f"trap 'touch {KRB5_STATUS_FILE_LOCATION}' EXIT; " + existing_args + ] + + def _add_voms_proxy_init_container(self): + """Add sidecar container for Dask workers.""" + ticket_cache_volume = {"name": "voms-proxy-cache", "emptyDir": {}} + volume_mounts = [ + { + "name": ticket_cache_volume["name"], + "mountPath": current_app.config["VOMSPROXY_CERT_CACHE_LOCATION"], + } + ] + + voms_proxy_file_path = os.path.join( + current_app.config["VOMSPROXY_CERT_CACHE_LOCATION"], + current_app.config["VOMSPROXY_CERT_CACHE_FILENAME"], + ) + + voms_proxy_vo = os.environ.get("VONAME", "") + voms_proxy_user_file = os.environ.get("VOMSPROXY_FILE", "") + + if voms_proxy_user_file: + # multi-user deployment mode, where we rely on VOMS proxy file supplied by the user + voms_proxy_container = { + "image": current_app.config["VOMSPROXY_CONTAINER_IMAGE"], + "command": ["/bin/bash"], + "args": [ + "-c", + 'if [ ! -f "/etc/reana/secrets/{voms_proxy_user_file}" ]; then \ + echo "[ERROR] VOMSPROXY_FILE {voms_proxy_user_file} does not exist in user secrets."; \ + exit; \ + fi; \ + cp /etc/reana/secrets/{voms_proxy_user_file} {voms_proxy_file_path}; \ + chown {kubernetes_uid} {voms_proxy_file_path}'.format( + voms_proxy_user_file=voms_proxy_user_file, + voms_proxy_file_path=voms_proxy_file_path, + kubernetes_uid=self.kubernetes_uid, + ), + ], + "name": current_app.config["VOMSPROXY_CONTAINER_NAME"], + "imagePullPolicy": "IfNotPresent", + "volumeMounts": [self.secrets_volume_mount] + volume_mounts, + "env": self.secret_env_vars, + } + else: + # single-user deployment mode, where we generate VOMS proxy file in the sidecar from user secrets + voms_proxy_container = { + "image": current_app.config["VOMSPROXY_CONTAINER_IMAGE"], + "command": ["/bin/bash"], + "args": [ + "-c", + 'if [ ! -f "/etc/reana/secrets/userkey.pem" ]; then \ + echo "[ERROR] File userkey.pem does not exist in user secrets."; \ + exit; \ + fi; \ + if [ ! 
-f "/etc/reana/secrets/usercert.pem" ]; then \ + echo "[ERROR] File usercert.pem does not exist in user secrets."; \ + exit; \ + fi; \ + if [ -z "$VOMSPROXY_PASS" ]; then \ + echo "[ERROR] Environment variable VOMSPROXY_PASS is not set in user secrets."; \ + exit; \ + fi; \ + if [ -z "$VONAME" ]; then \ + echo "[ERROR] Environment variable VONAME is not set in user secrets."; \ + exit; \ + fi; \ + cp /etc/reana/secrets/userkey.pem /tmp/userkey.pem; \ + chmod 400 /tmp/userkey.pem; \ + echo $VOMSPROXY_PASS | base64 -d | voms-proxy-init \ + --voms {voms_proxy_vo} --key /tmp/userkey.pem \ + --cert $(readlink -f /etc/reana/secrets/usercert.pem) \ + --pwstdin --out {voms_proxy_file_path}; \ + chown {kubernetes_uid} {voms_proxy_file_path}'.format( + voms_proxy_vo=voms_proxy_vo.lower(), + voms_proxy_file_path=voms_proxy_file_path, + kubernetes_uid=self.kubernetes_uid, + ), + ], + "name": current_app.config["VOMSPROXY_CONTAINER_NAME"], + "imagePullPolicy": "IfNotPresent", + "volumeMounts": [self.secrets_volume_mount] + volume_mounts, + "env": self.secret_env_vars, + } + + self.current_cluster_body["spec"]["worker"]["spec"]["volumes"].extend( + [ticket_cache_volume] + ) + self.current_cluster_body["spec"]["worker"]["spec"]["containers"][0][ + "volumeMounts" + ].extend(volume_mounts) + + # XrootD will look for a valid grid proxy in the location pointed to + # by the environment variable $X509_USER_PROXY + self.current_cluster_body["spec"]["worker"]["spec"]["containers"][0][ + "env" + ].append({"name": "X509_USER_PROXY", "value": voms_proxy_file_path}) + + self.current_cluster_body["spec"]["worker"]["spec"]["initContainers"].append( + voms_proxy_container + ) + + def _add_rucio_init_container(self): + """Add sidecar container for Dask workers.""" + ticket_cache_volume = {"name": "rucio-cache", "emptyDir": {}} + volume_mounts = [ + { + "name": ticket_cache_volume["name"], + "mountPath": current_app.config["RUCIO_CACHE_LOCATION"], + } + ] + + rucio_config_file_path = os.path.join( + current_app.config["RUCIO_CACHE_LOCATION"], + current_app.config["RUCIO_CFG_CACHE_FILENAME"], + ) + + cern_bundle_path = os.path.join( + current_app.config["RUCIO_CACHE_LOCATION"], + current_app.config["RUCIO_CERN_BUNDLE_CACHE_FILENAME"], + ) + + rucio_account = os.environ.get("RUCIO_USERNAME", "") + voms_proxy_vo = os.environ.get("VONAME", "") + + # Detect Rucio hosts from VO names + if voms_proxy_vo == "atlas": + rucio_host = "https://voatlasrucio-server-prod.cern.ch" + rucio_auth_host = "https://voatlasrucio-auth-prod.cern.ch" + else: + rucio_host = f"https://{voms_proxy_vo}-rucio.cern.ch" + rucio_auth_host = f"https://{voms_proxy_vo}-rucio-auth.cern.ch" + + # Allow overriding detected Rucio hosts by user-provided environment variables + rucio_host = os.environ.get("RUCIO_RUCIO_HOST", rucio_host) + rucio_auth_host = os.environ.get("RUCIO_AUTH_HOST", rucio_auth_host) + + rucio_config_container = { + "image": current_app.config["RUCIO_CONTAINER_IMAGE"], + "command": ["/bin/bash"], + "args": [ + "-c", + 'if [ -z "$VONAME" ]; then \ + echo "[ERROR] Environment variable VONAME is not set in user secrets."; \ + exit; \ + fi; \ + if [ -z "$RUCIO_USERNAME" ]; then \ + echo "[ERROR] Environment variable RUCIO_USERNAME is not set in user secrets."; \ + exit; \ + fi; \ + export RUCIO_CFG_ACCOUNT={rucio_account} \ + RUCIO_CFG_CLIENT_VO={voms_proxy_vo} \ + RUCIO_CFG_RUCIO_HOST={rucio_host} \ + RUCIO_CFG_AUTH_HOST={rucio_auth_host}; \ + cp /etc/pki/tls/certs/CERN-bundle.pem {cern_bundle_path}; \ + j2 /opt/user/rucio.cfg.j2 > 
{rucio_config_file_path}'.format( + rucio_host=rucio_host, + rucio_auth_host=rucio_auth_host, + rucio_account=rucio_account, + voms_proxy_vo=voms_proxy_vo, + cern_bundle_path=cern_bundle_path, + rucio_config_file_path=rucio_config_file_path, + ), + ], + "name": current_app.config["RUCIO_CONTAINER_NAME"], + "imagePullPolicy": "IfNotPresent", + "volumeMounts": [self.secrets_volume_mount] + volume_mounts, + "env": self.secret_env_vars, + } + + self.current_cluster_body["spec"]["worker"]["spec"]["volumes"].extend( + [ticket_cache_volume] + ) + self.current_cluster_body["spec"]["worker"]["spec"]["containers"][0][ + "volumeMounts" + ].extend(volume_mounts) + + self.current_cluster_body["spec"]["worker"]["spec"]["containers"][0][ + "env" + ].append({"name": "RUCIO_CONFIG", "value": rucio_config_file_path}) + + self.current_cluster_body["spec"]["worker"]["spec"]["initContainers"].append( + rucio_config_container + ) + + def _create_dask_cluster(self): + """Create the Dask cluster resource.""" + current_k8s_custom_objects_api_client.create_namespaced_custom_object( + group="kubernetes.dask.org", + version="v1", + plural="daskclusters", + namespace="default", + body=self.current_cluster_body, + ) + + def _create_dask_autoscaler(self): + """Create the Dask autoscaler resource.""" + current_k8s_custom_objects_api_client.create_namespaced_custom_object( + group="kubernetes.dask.org", + version="v1", + plural="daskautoscalers", + namespace="default", + body=self.current_autoscaler_body, + ) diff --git a/reana_workflow_controller/templates/dask_autoscaler.yaml b/reana_workflow_controller/templates/dask_autoscaler.yaml new file mode 100644 index 00000000..07fce7d6 --- /dev/null +++ b/reana_workflow_controller/templates/dask_autoscaler.yaml @@ -0,0 +1,5 @@ +apiVersion: kubernetes.dask.org/v1 +kind: DaskAutoscaler +spec: + minimum: 0 + maximum: 10 \ No newline at end of file diff --git a/reana_workflow_controller/templates/dask_cluster.yaml b/reana_workflow_controller/templates/dask_cluster.yaml new file mode 100644 index 00000000..12098eb9 --- /dev/null +++ b/reana_workflow_controller/templates/dask_cluster.yaml @@ -0,0 +1,55 @@ +apiVersion: kubernetes.dask.org/v1 +kind: DaskCluster +spec: + worker: + replicas: 0 + spec: + containers: + - name: worker + imagePullPolicy: "IfNotPresent" + command: ["/bin/sh", "-c"] + args: + - exec dask-worker --name $(DASK_WORKER_NAME) --dashboard --dashboard-address 8788 + ports: + - name: http-dashboard + containerPort: 8788 + protocol: TCP + scheduler: + spec: + containers: + - name: scheduler + imagePullPolicy: "IfNotPresent" + args: + - dask-scheduler + ports: + - name: tcp-comm + containerPort: 8786 + protocol: TCP + - name: http-dashboard + containerPort: 8787 + protocol: TCP + readinessProbe: + httpGet: + port: http-dashboard + path: /health + initialDelaySeconds: 5 + periodSeconds: 10 + livenessProbe: + httpGet: + port: http-dashboard + path: /health + initialDelaySeconds: 15 + periodSeconds: 20 + service: + type: ClusterIP + selector: + dask.org/component: scheduler + ports: + - name: tcp-comm + protocol: TCP + port: 8786 + targetPort: "tcp-comm" + - name: http-dashboard + protocol: TCP + port: 8787 + targetPort: "http-dashboard" \ No newline at end of file diff --git a/reana_workflow_controller/workflow_run_manager.py b/reana_workflow_controller/workflow_run_manager.py index 3eee0b93..25fe69a5 100644 --- a/reana_workflow_controller/workflow_run_manager.py +++ b/reana_workflow_controller/workflow_run_manager.py @@ -51,6 +51,8 @@ from 
reana_workflow_controller.errors import REANAInteractiveSessionError +from reana_workflow_controller.dask import DaskResourceManager + from reana_workflow_controller.k8s import ( build_interactive_k8s_objects, delete_k8s_ingress_object, @@ -296,6 +298,7 @@ def start_batch_workflow_run( options to be overwritten or added to the current workflow run. :param type: Dict """ + workflow_run_name = self._workflow_run_name_generator("batch") job = self._create_job_spec( workflow_run_name, @@ -308,6 +311,16 @@ def start_batch_workflow_run( if self.retrieve_required_cvmfs_repos(): create_cvmfs_persistent_volume_claim() + # Create the dask cluster and required resources + + if self.workflow.requires_dask(): + DaskResourceManager( + cluster_name=f"reana-run-dask-{self.workflow.id_}", + workflow_spec=self.workflow.reana_specification["workflow"], + workflow_workspace=self.workflow.workspace_path, + user_id=self.workflow.owner_id, + ).create_dask_resources() + current_k8s_batchv1_api_client.create_namespaced_job( namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, body=job ) @@ -527,6 +540,7 @@ def _create_job_spec( command=["/bin/bash", "-c"], args=command, ) + workflow_engine_env_vars.extend( [ { @@ -643,6 +657,7 @@ def _create_job_spec( {"name": "WORKSPACE_PATHS", "value": json.dumps(WORKSPACE_PATHS)}, ] ) + job_controller_container.env.extend(job_controller_env_vars) job_controller_container.env.extend(job_controller_env_secrets) if REANA_RUNTIME_JOBS_KUBERNETES_NODE_LABEL: @@ -652,6 +667,13 @@ def _create_job_spec( "value": os.getenv("REANA_RUNTIME_JOBS_KUBERNETES_NODE_LABEL"), }, ) + if self.workflow.requires_dask(): + job_controller_container.env.append( + { + "name": "DASK_SCHEDULER_URI", + "value": f"reana-run-dask-{self.workflow.id_}-scheduler.default.svc.cluster.local:8786", + }, + ) secrets_volume_mount = secrets_store.get_secrets_volume_mount_as_k8s_spec() job_controller_container.volume_mounts = [workspace_mount, secrets_volume_mount]
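
For context, DaskResourceManager reads its input from workflow_spec.get("resources", {}).get("dask", []) and, in this patch, only consumes the "image" key of each entry (the same image is used for the scheduler and the worker containers). A minimal sketch of such a specification dictionary follows; the example image name and the single cluster entry are illustrative assumptions, not something this patch prescribes.

    # Sketch of the "resources.dask" section consumed by DaskResourceManager.
    # Only "image" is read by this patch; the image name below is just an example.
    workflow_spec = {
        "resources": {
            "dask": [
                {
                    "image": "docker.io/daskdev/dask:latest",  # used for scheduler and workers
                }
            ]
        }
    }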
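
The DASK_SCHEDULER_URI environment variable injected into both the Dask workers and the job controller container points at the per-workflow scheduler service (reana-run-dask-<workflow-id>-scheduler.default.svc.cluster.local:8786). A minimal sketch of how code running inside the workflow could use it, assuming the dask.distributed client is installed in the job image:

    # Connect to the per-workflow Dask cluster via the injected environment variable.
    # Assumes `dask.distributed` is available in the job image.
    import os

    from dask.distributed import Client

    client = Client(f"tcp://{os.environ['DASK_SCHEDULER_URI']}")

    # Submit a trivial task to confirm the scheduler is reachable.
    future = client.submit(lambda x: x + 1, 41)
    print(future.result())  # 42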