Skip to content

Commit

Permalink
feat(dask): integrate dask into reana (#600)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alputer committed Sep 5, 2024
1 parent 680e288 commit 0e4ff9e
Show file tree
Hide file tree
Showing 10 changed files with 1,254 additions and 1 deletion.
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ recursive-include docs *.txt
recursive-include reana_workflow_controller *.html
recursive-include reana_workflow_controller *.py
recursive-include reana_workflow_controller/templates *.template
recursive-include reana_workflow_controller/templates *.yaml
recursive-include scripts *.py
recursive-include tests *.py
recursive-include tests *.finished
Expand Down
34 changes: 34 additions & 0 deletions reana_workflow_controller/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,40 @@ def _parse_interactive_sessions_environments(env_var):
IMAGE_PULL_SECRETS = os.getenv("IMAGE_PULL_SECRETS", "").split(",")
"""Docker image pull secrets which allow the usage of private images."""

VOMSPROXY_CONTAINER_IMAGE = os.getenv(
"VOMSPROXY_CONTAINER_IMAGE", "docker.io/reanahub/reana-auth-vomsproxy:1.2.0"
)
"""Default docker image of VOMSPROXY sidecar container."""

VOMSPROXY_CONTAINER_NAME = "voms-proxy"
"""Name of VOMSPROXY sidecar container."""

VOMSPROXY_CERT_CACHE_LOCATION = "/vomsproxy_cache/"
"""Directory of voms-proxy certificate cache.
This directory is shared between job & VOMSPROXY container."""

VOMSPROXY_CERT_CACHE_FILENAME = "x509up_proxy"
"""Name of the voms-proxy certificate cache file."""

RUCIO_CONTAINER_IMAGE = os.getenv(
"RUCIO_CONTAINER_IMAGE", "docker.io/reanahub/reana-auth-rucio:1.1.1"
)
"""Default docker image of RUCIO sidecar container."""

RUCIO_CONTAINER_NAME = "reana-auth-rucio"
"""Name of RUCIO sidecar container."""

RUCIO_CACHE_LOCATION = "/rucio_cache/"
"""Directory of Rucio cache.
This directory is shared between job & Rucio container."""

RUCIO_CFG_CACHE_FILENAME = "rucio.cfg"
"""Name of the RUCIO configuration cache file."""

RUCIO_CERN_BUNDLE_CACHE_FILENAME = "CERN-bundle.pem"
"""Name of the CERN Bundle cache file."""

ALIVE_STATUSES = [
RunStatus.created,
Expand Down
32 changes: 31 additions & 1 deletion reana_workflow_controller/consumer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2018, 2019, 2020, 2021, 2022, 2023 CERN.
# Copyright (C) 2018, 2019, 2020, 2021, 2022, 2023, 2024 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
Expand All @@ -22,6 +22,8 @@
from reana_commons.k8s.api_client import (
current_k8s_batchv1_api_client,
current_k8s_corev1_api_client,
current_k8s_custom_objects_api_client,
current_k8s_networking_api_client,
)
from reana_commons.k8s.secrets import UserSecretsStore
from reana_commons.utils import (
Expand All @@ -43,6 +45,9 @@
REANA_JOB_STATUS_CONSUMER_PREFETCH_COUNT,
)
from reana_workflow_controller.errors import REANAWorkflowControllerError
from reana_workflow_controller.k8s import delete_dask_dashboard_ingress

from reana_workflow_controller.dask import requires_dask

try:
from urllib import parse as urlparse
Expand Down Expand Up @@ -164,6 +169,8 @@ def _update_workflow_status(workflow, status, logs):
if RunStatus.should_cleanup_job(status):
try:
_delete_workflow_job(workflow)
if requires_dask(workflow):
_delete_dask_cluster(workflow)

Check warning on line 173 in reana_workflow_controller/consumer.py

View check run for this annotation

Codecov / codecov/patch

reana_workflow_controller/consumer.py#L173

Added line #L173 was not covered by tests
except ApiException as e:
logging.error(
f"Could not clean up workflow job for workflow {workflow.id_}. "
Expand Down Expand Up @@ -305,3 +312,26 @@ def _get_workflow_engine_pod_logs(workflow: Workflow) -> str:
# There might not be any pod returned by `list_namespaced_pod`, for example
# when a workflow fails to be scheduled
return ""


def _delete_dask_cluster(workflow: Workflow) -> None:
"""Delete the Dask cluster resources."""
current_k8s_custom_objects_api_client.delete_namespaced_custom_object(

Check warning on line 319 in reana_workflow_controller/consumer.py

View check run for this annotation

Codecov / codecov/patch

reana_workflow_controller/consumer.py#L319

Added line #L319 was not covered by tests
group="kubernetes.dask.org",
version="v1",
plural="daskclusters",
namespace="default",
name=f"reana-run-dask-{workflow.id_}",
)

current_k8s_custom_objects_api_client.delete_namespaced_custom_object(

Check warning on line 327 in reana_workflow_controller/consumer.py

View check run for this annotation

Codecov / codecov/patch

reana_workflow_controller/consumer.py#L327

Added line #L327 was not covered by tests
group="kubernetes.dask.org",
version="v1",
plural="daskautoscalers",
namespace="default",
name=f"dask-autoscaler-reana-run-dask-{workflow.id_}",
)

delete_dask_dashboard_ingress(

Check warning on line 335 in reana_workflow_controller/consumer.py

View check run for this annotation

Codecov / codecov/patch

reana_workflow_controller/consumer.py#L335

Added line #L335 was not covered by tests
f"dask-dashboard-ingress-reana-run-dask-{workflow.id_}", workflow.id_
)
Loading

0 comments on commit 0e4ff9e

Please sign in to comment.