Skip to content

Commit

Permalink
intermediate commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Alputer committed Aug 21, 2024
1 parent ba50def commit 80fad13
Show file tree
Hide file tree
Showing 6 changed files with 603 additions and 0 deletions.
34 changes: 34 additions & 0 deletions reana_workflow_controller/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,40 @@ def _env_vars_dict_to_k8s_list(env_vars):
IMAGE_PULL_SECRETS = os.getenv("IMAGE_PULL_SECRETS", "").split(",")
"""Docker image pull secrets which allow the usage of private images."""

VOMSPROXY_CONTAINER_IMAGE = os.getenv(
"VOMSPROXY_CONTAINER_IMAGE", "docker.io/reanahub/reana-auth-vomsproxy:1.2.0"
)
"""Default docker image of VOMSPROXY sidecar container."""

VOMSPROXY_CONTAINER_NAME = "voms-proxy"
"""Name of VOMSPROXY sidecar container."""

VOMSPROXY_CERT_CACHE_LOCATION = "/vomsproxy_cache/"
"""Directory of voms-proxy certificate cache.
This directory is shared between job & VOMSPROXY container."""

VOMSPROXY_CERT_CACHE_FILENAME = "x509up_proxy"
"""Name of the voms-proxy certificate cache file."""

RUCIO_CONTAINER_IMAGE = os.getenv(
"RUCIO_CONTAINER_IMAGE", "docker.io/reanahub/reana-auth-rucio:1.1.1"
)
"""Default docker image of RUCIO sidecar container."""

RUCIO_CONTAINER_NAME = "reana-auth-rucio"
"""Name of RUCIO sidecar container."""

RUCIO_CACHE_LOCATION = "/rucio_cache/"
"""Directory of Rucio cache.
This directory is shared between job & Rucio container."""

RUCIO_CFG_CACHE_FILENAME = "rucio.cfg"
"""Name of the RUCIO configuration cache file."""

RUCIO_CERN_BUNDLE_CACHE_FILENAME = "CERN-bundle.pem"
"""Name of the CERN Bundle cache file."""

ALIVE_STATUSES = [
RunStatus.created,
Expand Down
23 changes: 23 additions & 0 deletions reana_workflow_controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from reana_commons.k8s.api_client import (
current_k8s_batchv1_api_client,
current_k8s_corev1_api_client,
current_k8s_custom_objects_api_client,
)
from reana_commons.k8s.secrets import REANAUserSecretsStore
from reana_commons.utils import (
Expand Down Expand Up @@ -142,6 +143,7 @@ def on_message(self, body, message):

def _update_workflow_status(workflow, status, logs):
"""Update workflow status in DB."""

if workflow.status != status:
Workflow.update_workflow_status(Session, workflow.id_, status, logs, None)
if workflow.git_ref:
Expand All @@ -164,6 +166,8 @@ def _update_workflow_status(workflow, status, logs):
if RunStatus.should_cleanup_job(status):
try:
_delete_workflow_job(workflow)
if workflow.requires_dask():
_delete_dask_cluster(workflow)
except ApiException as e:
logging.error(
f"Could not clean up workflow job for workflow {workflow.id_}. "
Expand Down Expand Up @@ -291,3 +295,22 @@ def _get_workflow_engine_pod_logs(workflow: Workflow) -> str:
# There might not be any pod returned by `list_namespaced_pod`, for example
# when a workflow fails to be scheduled
return ""


def _delete_dask_cluster(workflow: Workflow) -> None:
"""Delete the Dask cluster resources."""
current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
plural="daskclusters",
namespace="default",
name=f"reana-run-dask-{workflow.id_}",
)

current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
plural="daskautoscalers",
namespace="default",
name=f"dask-autoscaler-reana-run-dask-{workflow.id_}",
)
Loading

0 comments on commit 80fad13

Please sign in to comment.