Skip to content

Commit

Permalink
feat(dask): add additional checks for dask workflows(#600)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alputer committed Sep 9, 2024
1 parent 2a7c2f1 commit f48ffe5
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 9 deletions.
7 changes: 7 additions & 0 deletions reana_workflow_controller/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import os
import json

from distutils.util import strtobool
from reana_commons.config import REANA_COMPONENT_PREFIX, SHARED_VOLUME_PATH
from reana_db.models import JobStatus, RunStatus

Expand Down Expand Up @@ -256,6 +257,12 @@ def _parse_interactive_sessions_environments(env_var):
IMAGE_PULL_SECRETS = os.getenv("IMAGE_PULL_SECRETS", "").split(",")
"""Docker image pull secrets which allow the usage of private images."""

TRAEFIK_ENABLED = strtobool(os.getenv("TRAEFIK_ENABLED", "true"))
"""Whether traefik is enabled in the cluster or not"""

DASK_ENABLED = strtobool(os.getenv("DASK_ENABLED", "true"))
"""Whether dask is enabled in the cluster or not"""

VOMSPROXY_CONTAINER_IMAGE = os.getenv(
"VOMSPROXY_CONTAINER_IMAGE", "docker.io/reanahub/reana-auth-vomsproxy:1.2.0"
)
Expand Down
9 changes: 5 additions & 4 deletions reana_workflow_controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
REANA_GITLAB_URL,
REANA_HOSTNAME,
REANA_JOB_STATUS_CONSUMER_PREFETCH_COUNT,
TRAEFIK_ENABLED,
)
from reana_workflow_controller.errors import REANAWorkflowControllerError
from reana_workflow_controller.k8s import delete_dask_dashboard_ingress
Expand Down Expand Up @@ -332,7 +333,7 @@ def _delete_dask_cluster(workflow: Workflow) -> None:
namespace="default",
name=f"dask-autoscaler-reana-run-dask-{workflow.id_}",
)

delete_dask_dashboard_ingress(
f"dask-dashboard-ingress-reana-run-dask-{workflow.id_}", workflow.id_
)
if TRAEFIK_ENABLED:
delete_dask_dashboard_ingress(

Check warning on line 337 in reana_workflow_controller/consumer.py

View check run for this annotation

Codecov / codecov/patch

reana_workflow_controller/consumer.py#L336-L337

Added lines #L336 - L337 were not covered by tests
f"dask-dashboard-ingress-reana-run-dask-{workflow.id_}", workflow.id_
)
4 changes: 3 additions & 1 deletion reana_workflow_controller/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
get_reana_shared_volume,
)

from reana_workflow_controller.config import TRAEFIK_ENABLED
from reana_workflow_controller.k8s import create_dask_dashboard_ingress


Expand Down Expand Up @@ -96,7 +97,8 @@ def create_dask_resources(self):
self._prepare_cluster()
self._create_dask_cluster()
self._create_dask_autoscaler()
create_dask_dashboard_ingress(self.cluster_name, self.workflow_id)
if TRAEFIK_ENABLED:
create_dask_dashboard_ingress(self.cluster_name, self.workflow_id)

def _prepare_cluster(self):
"""Prepare Dask cluster body by adding necessary image-pull secrets, volumes, volume mounts, init containers and sidecar containers."""
Expand Down
15 changes: 11 additions & 4 deletions reana_workflow_controller/workflow_run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
WORKFLOW_ENGINE_SERIAL_ENV_VARS,
WORKFLOW_ENGINE_SNAKEMAKE_ENV_VARS,
WORKFLOW_ENGINE_YADAGE_ENV_VARS,
DASK_ENABLED,
)


Expand Down Expand Up @@ -371,13 +372,14 @@ def start_batch_workflow_run(
)

try:
# Create PVC needed for CVMFS repos
if self.retrieve_required_cvmfs_repos():
create_cvmfs_persistent_volume_claim()

# Create the dask cluster and required resources

if requires_dask(self.workflow):
if not DASK_ENABLED:
raise RuntimeError(

Check warning on line 379 in reana_workflow_controller/workflow_run_manager.py

View check run for this annotation

Codecov / codecov/patch

reana_workflow_controller/workflow_run_manager.py#L378-L379

Added lines #L378 - L379 were not covered by tests
"Dask workflows are not supported in this cluster"
)

DaskResourceManager(

Check warning on line 383 in reana_workflow_controller/workflow_run_manager.py

View check run for this annotation

Codecov / codecov/patch

reana_workflow_controller/workflow_run_manager.py#L383

Added line #L383 was not covered by tests
cluster_name=f"reana-run-dask-{self.workflow.id_}",
workflow_spec=self.workflow.reana_specification["workflow"],
Expand All @@ -388,6 +390,11 @@ def start_batch_workflow_run(
current_k8s_batchv1_api_client.create_namespaced_job(
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, body=job
)

# Create PVC needed for CVMFS repos
if self.retrieve_required_cvmfs_repos():
create_cvmfs_persistent_volume_claim()

Check warning on line 396 in reana_workflow_controller/workflow_run_manager.py

View check run for this annotation

Codecov / codecov/patch

reana_workflow_controller/workflow_run_manager.py#L396

Added line #L396 was not covered by tests

except ApiException as e:
msg = "Workflow engine/job controller pod " "creation failed {}".format(e)
logging.error(msg, exc_info=True)
Expand Down

0 comments on commit f48ffe5

Please sign in to comment.