Skip to content

Commit

Permalink
feat(dask): integrate dask into reana (reanahub#600)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alputer committed Aug 28, 2024
1 parent ba50def commit 6171e8e
Show file tree
Hide file tree
Showing 7 changed files with 705 additions and 1 deletion.
34 changes: 34 additions & 0 deletions reana_workflow_controller/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,40 @@ def _env_vars_dict_to_k8s_list(env_vars):
IMAGE_PULL_SECRETS = os.getenv("IMAGE_PULL_SECRETS", "").split(",")
"""Docker image pull secrets which allow the usage of private images."""

VOMSPROXY_CONTAINER_IMAGE = os.getenv(
"VOMSPROXY_CONTAINER_IMAGE", "docker.io/reanahub/reana-auth-vomsproxy:1.2.0"
)
"""Default docker image of VOMSPROXY sidecar container."""

VOMSPROXY_CONTAINER_NAME = "voms-proxy"
"""Name of VOMSPROXY sidecar container."""

VOMSPROXY_CERT_CACHE_LOCATION = "/vomsproxy_cache/"
"""Directory of voms-proxy certificate cache.
This directory is shared between job & VOMSPROXY container."""

VOMSPROXY_CERT_CACHE_FILENAME = "x509up_proxy"
"""Name of the voms-proxy certificate cache file."""

RUCIO_CONTAINER_IMAGE = os.getenv(
"RUCIO_CONTAINER_IMAGE", "docker.io/reanahub/reana-auth-rucio:1.1.1"
)
"""Default docker image of RUCIO sidecar container."""

RUCIO_CONTAINER_NAME = "reana-auth-rucio"
"""Name of RUCIO sidecar container."""

RUCIO_CACHE_LOCATION = "/rucio_cache/"
"""Directory of Rucio cache.
This directory is shared between job & Rucio container."""

RUCIO_CFG_CACHE_FILENAME = "rucio.cfg"
"""Name of the RUCIO configuration cache file."""

RUCIO_CERN_BUNDLE_CACHE_FILENAME = "CERN-bundle.pem"
"""Name of the CERN Bundle cache file."""

ALIVE_STATUSES = [
RunStatus.created,
Expand Down
31 changes: 30 additions & 1 deletion reana_workflow_controller/consumer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2018, 2019, 2020, 2021, 2022, 2023 CERN.
# Copyright (C) 2018, 2019, 2020, 2021, 2022, 2023, 2024 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
Expand All @@ -22,6 +22,8 @@
from reana_commons.k8s.api_client import (
current_k8s_batchv1_api_client,
current_k8s_corev1_api_client,
current_k8s_custom_objects_api_client,
current_k8s_networking_api_client,
)
from reana_commons.k8s.secrets import REANAUserSecretsStore
from reana_commons.utils import (
Expand All @@ -43,6 +45,7 @@
REANA_JOB_STATUS_CONSUMER_PREFETCH_COUNT,
)
from reana_workflow_controller.errors import REANAWorkflowControllerError
from reana_workflow_controller.k8s import delete_dask_dashboard_ingress

try:
from urllib import parse as urlparse
Expand Down Expand Up @@ -142,6 +145,7 @@ def on_message(self, body, message):

def _update_workflow_status(workflow, status, logs):
"""Update workflow status in DB."""

if workflow.status != status:
Workflow.update_workflow_status(Session, workflow.id_, status, logs, None)
if workflow.git_ref:
Expand All @@ -164,6 +168,8 @@ def _update_workflow_status(workflow, status, logs):
if RunStatus.should_cleanup_job(status):
try:
_delete_workflow_job(workflow)
if workflow.requires_dask():
_delete_dask_cluster(workflow)
except ApiException as e:
logging.error(
f"Could not clean up workflow job for workflow {workflow.id_}. "
Expand Down Expand Up @@ -291,3 +297,26 @@ def _get_workflow_engine_pod_logs(workflow: Workflow) -> str:
# There might not be any pod returned by `list_namespaced_pod`, for example
# when a workflow fails to be scheduled
return ""


def _delete_dask_cluster(workflow: Workflow) -> None:
"""Delete the Dask cluster resources."""
current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
plural="daskclusters",
namespace="default",
name=f"reana-run-dask-{workflow.id_}",
)

current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
plural="daskautoscalers",
namespace="default",
name=f"dask-autoscaler-reana-run-dask-{workflow.id_}",
)

delete_dask_dashboard_ingress(
f"dask-dashboard-ingress-reana-run-dask-{workflow.id_}", workflow.id_
)
Loading

0 comments on commit 6171e8e

Please sign in to comment.