From 9eaf650c1eb8d0eea2083d6a7a3db14e71a7fcd7 Mon Sep 17 00:00:00 2001
From: Marco Donadoni
Date: Mon, 6 May 2024 15:47:15 +0200
Subject: [PATCH] perf(secrets): avoid fetching k8s secrets multiple times (#451)

Cache user secrets to avoid fetching them multiple times when creating
many jobs.

Closes reanahub/reana-commons#455
---
 reana_job_controller/config.py                |  3 ++
 .../kubernetes_job_manager.py                 | 29 +++++++++++++------
 reana_job_controller/rest.py                  | 23 +++++++++++++--
 3 files changed, 43 insertions(+), 12 deletions(-)

diff --git a/reana_job_controller/config.py b/reana_job_controller/config.py
index e1bc02bb..b9be74c8 100644
--- a/reana_job_controller/config.py
+++ b/reana_job_controller/config.py
@@ -153,3 +153,6 @@
 
 SLURM_SSH_AUTH_TIMEOUT = float(os.getenv("SLURM_SSH_AUTH_TIMEOUT", "60"))
 """Seconds to wait for SLURM SSH authentication response."""
+
+REANA_USER_ID = os.getenv("REANA_USER_ID")
+"""User UUID of the owner of the workflow."""
diff --git a/reana_job_controller/kubernetes_job_manager.py b/reana_job_controller/kubernetes_job_manager.py
index 87f69c69..8f1783bd 100644
--- a/reana_job_controller/kubernetes_job_manager.py
+++ b/reana_job_controller/kubernetes_job_manager.py
@@ -39,7 +39,7 @@
     current_k8s_corev1_api_client,
 )
 from reana_commons.k8s.kerberos import get_kerberos_k8s_config
-from reana_commons.k8s.secrets import REANAUserSecretsStore
+from reana_commons.k8s.secrets import UserSecretsStore, UserSecrets
 from reana_commons.k8s.volumes import (
     get_k8s_cvmfs_volumes,
     get_reana_shared_volume,
@@ -51,6 +51,7 @@
 from reana_job_controller.config import (
     REANA_KUBERNETES_JOBS_MEMORY_LIMIT,
     REANA_KUBERNETES_JOBS_MAX_USER_MEMORY_LIMIT,
+    REANA_USER_ID,
 )
 from reana_job_controller.errors import ComputingBackendSubmissionError
 from reana_job_controller.job_manager import JobManager
@@ -81,6 +82,7 @@ def __init__(
         voms_proxy=False,
         rucio=False,
         kubernetes_job_timeout: Optional[int] = None,
+        secrets: Optional[UserSecrets] = None,
         **kwargs,
     ):
         """Instantiate kubernetes job manager.
@@ -117,6 +119,8 @@
         :param rucio: Decides if a rucio environment should be provided
             for job.
         :type rucio: bool
+        :param secrets: User secrets; if None, they will be fetched from k8s.
+        :type secrets: Optional[UserSecrets]
         """
         super(KubernetesJobManager, self).__init__(
             docker_img=docker_img,
@@ -127,6 +131,7 @@
             workflow_workspace=workflow_workspace,
             job_name=job_name,
         )
+
         self.compute_backend = "Kubernetes"
         self.cvmfs_mounts = cvmfs_mounts
         self.shared_file_system = shared_file_system
@@ -137,6 +142,14 @@
         self.set_memory_limit(kubernetes_memory_limit)
         self.workflow_uuid = workflow_uuid
         self.kubernetes_job_timeout = kubernetes_job_timeout
+        self._secrets: Optional[UserSecrets] = secrets
+
+    @property
+    def secrets(self):
+        """Get cached secrets if present, otherwise fetch them from k8s."""
+        if self._secrets is None:
+            self._secrets = UserSecretsStore.fetch(REANA_USER_ID)
+        return self._secrets
 
     @JobManager.execution_hook
     def execute(self):
@@ -179,15 +192,13 @@
                 },
             },
         }
-        user_id = os.getenv("REANA_USER_ID")
-        secrets_store = REANAUserSecretsStore(user_id)
-        secret_env_vars = secrets_store.get_env_secrets_as_k8s_spec()
+        secret_env_vars = self.secrets.get_env_secrets_as_k8s_spec()
         job_spec = self.job["spec"]["template"]["spec"]
         job_spec["containers"][0]["env"].extend(secret_env_vars)
 
-        job_spec["volumes"].append(secrets_store.get_file_secrets_volume_as_k8s_specs())
+        job_spec["volumes"].append(self.secrets.get_file_secrets_volume_as_k8s_specs())
 
-        secrets_volume_mount = secrets_store.get_secrets_volume_mount_as_k8s_spec()
+        secrets_volume_mount = self.secrets.get_secrets_volume_mount_as_k8s_spec()
         job_spec["containers"][0]["volumeMounts"].append(secrets_volume_mount)
 
         if self.env_vars:
@@ -215,7 +226,7 @@
             )
 
         if self.kerberos:
-            self._add_krb5_containers(secrets_store)
+            self._add_krb5_containers(self.secrets)
 
         if self.voms_proxy:
             self._add_voms_proxy_init_container(secrets_volume_mount, secret_env_vars)
@@ -444,10 +455,10 @@ def add_volumes(self, volumes):
             ].append(volume_mount)
             self.job["spec"]["template"]["spec"]["volumes"].append(volume)
 
-    def _add_krb5_containers(self, secrets_store):
+    def _add_krb5_containers(self, secrets):
         """Add krb5 init and renew containers for a job."""
         krb5_config = get_kerberos_k8s_config(
-            secrets_store,
+            secrets,
             kubernetes_uid=self.kubernetes_uid,
         )
 
diff --git a/reana_job_controller/rest.py b/reana_job_controller/rest.py
index 763cfad7..af5cf146 100644
--- a/reana_job_controller/rest.py
+++ b/reana_job_controller/rest.py
@@ -21,7 +21,7 @@
 )
 
 from reana_db.models import JobStatus
-
+from reana_commons.k8s.secrets import UserSecrets, UserSecretsStore
 
 from reana_job_controller.errors import ComputingBackendSubmissionError
 from reana_job_controller.job_db import (
@@ -46,6 +46,18 @@
 job_schema = Job()
 
 
+_SECRETS_CACHE = None
+"""Cache for user secrets."""
+
+
+def get_cached_user_secrets() -> UserSecrets:
+    """Return cached user secrets."""
+    global _SECRETS_CACHE
+    if _SECRETS_CACHE is None:
+        _SECRETS_CACHE = UserSecretsStore.fetch(config.REANA_USER_ID)
+    return _SECRETS_CACHE
+
+
 class JobCreationCondition:
     """Mechanism used to synchronize the creation of jobs.
 
@@ -269,15 +281,20 @@ def create_job():  # noqa
         logging.error(msg, exc_info=True)
         update_workflow_logs(job_request["workflow_uuid"], msg)
         return jsonify({"job": msg}), 500
+
     with current_app.app_context():
         job_manager_cls = current_app.config["COMPUTE_BACKENDS"][compute_backend]()
         try:
-            job_obj = job_manager_cls(**job_request)
+            job_obj = job_manager_cls(
+                **job_request,
+                # we pass the secrets from the local cache in order to avoid many calls
+                # to the k8s API when many jobs are executed at the same time
+                secrets=get_cached_user_secrets(),
+            )
         except REANAKubernetesMemoryLimitExceeded as e:
             return jsonify({"message": e.message}), 403
         except REANAKubernetesWrongMemoryFormat as e:
             return jsonify({"message": e.message}), 400
-
     if not job_creation_condition.start_creation():
         return jsonify({"message": "Cannot create new jobs, shutting down"}), 400
 
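
A minimal self-contained sketch of the caching pattern applied above, for illustration only: a module-level cache on the REST side plus a lazy property on the job manager, so secrets are fetched from Kubernetes at most once per process even when many jobs are created. The names fetch_secrets_from_k8s and JobManagerSketch are hypothetical stand-ins for UserSecretsStore.fetch() and KubernetesJobManager.

    from typing import Optional

    _SECRETS_CACHE: Optional[dict] = None


    def fetch_secrets_from_k8s(user_id: str) -> dict:
        """Hypothetical stand-in for UserSecretsStore.fetch(): one expensive k8s call."""
        print(f"fetching secrets of user {user_id} from k8s")
        return {"token": "..."}


    def get_cached_user_secrets(user_id: str) -> dict:
        """Fetch the secrets once and reuse the cached value for later job creations."""
        global _SECRETS_CACHE
        if _SECRETS_CACHE is None:
            _SECRETS_CACHE = fetch_secrets_from_k8s(user_id)
        return _SECRETS_CACHE


    class JobManagerSketch:
        """Accepts pre-fetched secrets and falls back to a lazy fetch, as the patch does."""

        def __init__(self, user_id: str, secrets: Optional[dict] = None):
            self.user_id = user_id
            self._secrets = secrets

        @property
        def secrets(self) -> dict:
            if self._secrets is None:
                self._secrets = fetch_secrets_from_k8s(self.user_id)
            return self._secrets


    if __name__ == "__main__":
        # Creating several jobs triggers a single fetch thanks to the shared cache.
        for _ in range(3):
            JobManagerSketch("user-1", secrets=get_cached_user_secrets("user-1")).secrets

Keeping the lazy fallback in the manager means it still works when instantiated on its own, while the REST layer can hand every job the same cached copy.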