From d561a8127de587f0da2bbf8792509b4b2b19984e Mon Sep 17 00:00:00 2001 From: Meir Tseitlin Date: Tue, 30 Jan 2024 11:20:31 -0600 Subject: [PATCH 01/16] Auth update --- src/k8s_job_scheduler/job_manager.py | 25 +++++++++++++++---------- tests/conftest.py | 7 ++++--- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/src/k8s_job_scheduler/job_manager.py b/src/k8s_job_scheduler/job_manager.py index da384de..8d561e8 100644 --- a/src/k8s_job_scheduler/job_manager.py +++ b/src/k8s_job_scheduler/job_manager.py @@ -2,6 +2,7 @@ import logging from kubernetes import client, config +from kubernetes.client.rest import ApiException __author__ = "Meir Tseitlin" __copyright__ = "Imubit" @@ -11,6 +12,8 @@ config.load_kube_config() +K8S_DEFAULT_NAMESPACE = "py-k8s-job-scheduler" + K8S_STATUS_MAP = { "ready": "READY", "active": "ACTIVE", @@ -24,26 +27,28 @@ class JobManager: DELETE_PROPAGATION_POLICY = "Foreground" - def __init__(self, namespace, docker_image, env, cluster_conf=None): + def __init__( + self, docker_image, env=None, namespace=K8S_DEFAULT_NAMESPACE, cluster_conf=None + ): self._namespace = namespace self._docker_image = docker_image self._env = env or {} # Init Kubernetes - self._core_api = client.CoreV1Api(cluster_conf) - self._batch_api = client.BatchV1Api(cluster_conf) + self._cluster_conf = cluster_conf or config.load_kube_config() + + self._core_api = client.CoreV1Api(self._cluster_conf) + self._batch_api = client.BatchV1Api(self._cluster_conf) def init(self): # Create namespace if not exists - namespaces = self._core_api.list_namespace() - all_namespaces = [] - for ns in namespaces.items: - all_namespaces.append(ns.metadata.name) + try: + self._core_api.read_namespace_status(self._namespace) + except ApiException as e: + if e.status != 404: + raise e - if self._namespace in all_namespaces: - log.info(f"Namespace {self._namespace} already exists. 
Reusing.") - else: namespace_metadata = client.V1ObjectMeta(name=self._namespace) self._core_api.create_namespace( client.V1Namespace(metadata=namespace_metadata) diff --git a/tests/conftest.py b/tests/conftest.py index d9b84f5..10de968 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -12,9 +12,8 @@ import psycopg2 import pytest -from k8s_job_scheduler.job_manager import JobManager +from k8s_job_scheduler.job_manager import K8S_DEFAULT_NAMESPACE, JobManager -K8S_NAMESPACE = "k8s-job-scheduler" DOCKER_IMAGE_PYTHON = "python:3.11.7-slim-bullseye" DOCKER_IMAGE_POSTGRES = "postgres:15" POSTGRES_USER = "postgres" @@ -31,7 +30,9 @@ def jobman(request, docker_image=DOCKER_IMAGE_PYTHON, env=None): docker_image = request.param.get("docker_image", docker_image) env = request.param.get("env", env) - jobman = JobManager(namespace=K8S_NAMESPACE, docker_image=docker_image, env=env) + jobman = JobManager( + docker_image=docker_image, env=env, namespace=K8S_DEFAULT_NAMESPACE + ) jobman.init() # Clean old pods From 639f649e504944dc0dcdc3530833f9eab64f638d Mon Sep 17 00:00:00 2001 From: Meir Tseitlin Date: Mon, 5 Feb 2024 16:55:08 -0600 Subject: [PATCH 02/16] add scheduled_job_status --- src/k8s_job_scheduler/job_manager.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/k8s_job_scheduler/job_manager.py b/src/k8s_job_scheduler/job_manager.py index 8d561e8..455764a 100644 --- a/src/k8s_job_scheduler/job_manager.py +++ b/src/k8s_job_scheduler/job_manager.py @@ -179,11 +179,21 @@ def delete_job(self, job_name): return "succeeded" in status and status["succeeded"] > 0 - def list_scheduled_jobs(self): + def list_scheduled_jobs(self, include_details=False): ret = self._batch_api.list_namespaced_cron_job(namespace=self._namespace) + if include_details: + return ret.items + return [i.metadata.name for i in ret.items] + def scheduled_job_status(self, job_name): + api_response = self._batch_api.read_namespaced_cron_job_status( + job_name, self._namespace + ) + + return api_response + def create_scheduled_job(self, schedule, cmd, *args, **kwargs): job_name = f"job-kjs-{JobManager._k8s_fqn(cmd)}" pod_name = f"pod-kjs-{JobManager._k8s_fqn(cmd)}" From 0a6a8d894776dd251c24a5b400d9b69637f7b363 Mon Sep 17 00:00:00 2001 From: Meir Tseitlin Date: Mon, 12 Feb 2024 16:22:54 -0600 Subject: [PATCH 03/16] temp --- README.md | 14 ++-- src/k8s_job_scheduler/job_executor.py | 34 ++++++++ src/k8s_job_scheduler/job_func_def.py | 80 +++++++++++++++++++ src/k8s_job_scheduler/job_manager.py | 109 +++++++++++++++++++++----- tests/test_job_manager.py | 38 ++++++--- 5 files changed, 237 insertions(+), 38 deletions(-) create mode 100644 src/k8s_job_scheduler/job_executor.py create mode 100644 src/k8s_job_scheduler/job_func_def.py diff --git a/README.md b/README.md index 5f6d89d..ff50ed4 100644 --- a/README.md +++ b/README.md @@ -14,14 +14,12 @@ # k8s-job-scheduler -> Add a short description here! +A package for managing Kubernetes jobs and cron jobs from Python. It allows running CLI or Python code using native Kubernetes job engine. -A longer description of your project goes here... +> This package is a continuous effort started by [Roemer Claasen](https://gitlab.com/roemer) in his [kubernetes-job](https://gitlab.com/roemer/kubernetes-job) work. +## Installation - - -## Note - -This project has been set up using PyScaffold 4.5. For details and usage -information on PyScaffold see https://pyscaffold.org/. 
+```python +pip install k8s-job-scheduler +``` diff --git a/src/k8s_job_scheduler/job_executor.py b/src/k8s_job_scheduler/job_executor.py new file mode 100644 index 0000000..338a75a --- /dev/null +++ b/src/k8s_job_scheduler/job_executor.py @@ -0,0 +1,34 @@ +import base64 +import logging +import os +import pickle +import sys +import zlib + +logging.basicConfig(level=logging.DEBUG, stream=sys.stdout) + +logger = logging.getLogger(__name__) + +K8S_ENV_VAR_NAME = "K8S_JOB_FUNC" + + +def execute(): + try: + func_def = os.getenv(K8S_ENV_VAR_NAME) + if not func_def: + print( + f"Environment var '{K8S_ENV_VAR_NAME}' is not set, nothing to execute." + ) + sys.exit(-1) + + current_job = pickle.loads( + zlib.decompress(base64.urlsafe_b64decode(func_def.encode())) + ) + return current_job.execute() + except Exception as e: + logger.fatal(e) + raise + + +if __name__ == "__main__": + execute() diff --git a/src/k8s_job_scheduler/job_func_def.py b/src/k8s_job_scheduler/job_func_def.py new file mode 100644 index 0000000..0aa89e5 --- /dev/null +++ b/src/k8s_job_scheduler/job_func_def.py @@ -0,0 +1,80 @@ +import base64 +import datetime +import logging +import pickle +import zlib + +logger = logging.getLogger(__name__) + + +class JobMeta: + """Helper class to hold job meta information""" + + def __init__(self, name: str, dt_scheduled: datetime, host: str): + self.name = name + self.dt_scheduled = dt_scheduled + self.host = host + + name: str = "[JOB-NAME]" + """Unique job name""" + + dt_scheduled: datetime = datetime.datetime.min + """Job scheduled datetime""" + + host: str = "[HOST]" + """Host responsible for spawning the job""" + + +class JobFuncDef: + """Helper class to hold the job function definition + + :param func: Pointer to the job function + :param args: Args for the job function + :param kwargs: Kwargs for the job function + :param meta: Metadata for the job + """ + + func = None + """Pointer to the job function""" + + args = None + """Args for the job function""" + + kwargs = None + """Kwargs for the job function""" + + meta: JobMeta = None + """Metadata for the job""" + + def __init__(self, func, args=None, kwargs=None, meta: JobMeta = None): + """ + Initialize the job function definition + + :param func: Pointer to the job function + :param args: Args for the job function + :param kwargs: Kwargs for the job function + :param meta: Metadata for the job + """ + self.func = func + self.args = args or [] + self.kwargs = kwargs or {} + self.meta = meta or JobMeta() + + def dump(self) -> str: + """Dump the job function definition to a base64 string""" + return base64.urlsafe_b64encode(zlib.compress(pickle.dumps(self))).decode() + + def execute(self): + """Execute the job function""" + logger.info( + f"=== Starting job {self.meta.name}, submitted from {self.meta.host} " + f"at {self.meta.dt_scheduled.isoformat()} ===" + ) + logger.debug(f"Job func: {self.func.__name__}") + + return self.func(*self.args, **self.kwargs) + + @staticmethod + def load(s: str) -> "JobFuncDef": + """Load the job function definition from a base64 string""" + return pickle.loads(zlib.decompress(base64.urlsafe_b64decode(s.encode()))) diff --git a/src/k8s_job_scheduler/job_manager.py b/src/k8s_job_scheduler/job_manager.py index 455764a..f1bb6f8 100644 --- a/src/k8s_job_scheduler/job_manager.py +++ b/src/k8s_job_scheduler/job_manager.py @@ -1,11 +1,14 @@ +import datetime as dt import json import logging +import socket from kubernetes import client, config from kubernetes.client.rest import ApiException +from .job_func_def import 
JobFuncDef, JobMeta + __author__ = "Meir Tseitlin" -__copyright__ = "Imubit" __license__ = "LGPL-3.0-only" log = logging.getLogger(__name__) @@ -13,6 +16,7 @@ config.load_kube_config() K8S_DEFAULT_NAMESPACE = "py-k8s-job-scheduler" +K8S_ENV_VAR_NAME = "K8S_JOB_FUNC" K8S_STATUS_MAP = { "ready": "READY", @@ -24,6 +28,15 @@ } +def _k8s_fqn(name): + return name.replace("_", "-") + + +def _gen_id(prefix: str, name: str, dt: dt) -> str: + """Generate a job id from the name and the given datetime""" + return f"kjs-{prefix}-{_k8s_fqn(name)}-{dt.strftime('%Y%m%d%H%M%S%f')}" + + class JobManager: DELETE_PROPAGATION_POLICY = "Foreground" @@ -131,24 +144,70 @@ def job_logs(self, job_name): else None ) - @staticmethod - def _k8s_fqn(name): - return name.replace("_", "-") + def create_instant_job(self, func, cmd="python", *args, **kwargs): + dt_scheduled = dt.datetime.utcnow() - def create_instant_job(self, cmd, *args, **kwargs): - job_name = f"job-kjs-{JobManager._k8s_fqn(cmd)}" - pod_name = f"pod-kjs-{JobManager._k8s_fqn(cmd)}" + job_name = _gen_id("job", cmd, dt_scheduled) + pod_name = _gen_id("pod", cmd, dt_scheduled) + labels = {"job_name": job_name, "type": "scheduled_func", "cmd": cmd} - container = self._gen_container_specs(cmd, *args, **kwargs) + if "labels" in kwargs: + labels.update(kwargs["labels"]) + del kwargs["labels"] + + # serialize function call + job_descriptor = JobFuncDef( + func=func, + args=args, + kwargs=kwargs, + meta=JobMeta(job_name, dt_scheduled, socket.gethostname()), + ) + + container = self._gen_container_specs( + cmd, {K8S_ENV_VAR_NAME: job_descriptor.dump()}, *args, **kwargs + ) api_response = self._batch_api.create_namespaced_job( namespace=self._namespace, body=client.V1Job( api_version="batch/v1", kind="Job", - metadata=client.V1ObjectMeta( - name=job_name, labels={"job_name": job_name} + metadata=client.V1ObjectMeta(name=job_name, labels=labels), + spec=client.V1JobSpec( + backoff_limit=0, + template=client.V1JobTemplateSpec( + spec=client.V1PodSpec( + restart_policy="Never", containers=[container] + ), + metadata=client.V1ObjectMeta( + name=pod_name, labels={"pod_name": pod_name} + ), + ), ), + ), + ) + + return api_response.metadata.name + + def create_instant_cli_job(self, cmd, *args, **kwargs): + dt_scheduled = dt.datetime.utcnow() + + job_name = _gen_id("cli-job", cmd, dt_scheduled) + pod_name = _gen_id("pod", cmd, dt_scheduled) + labels = {"job_name": job_name, "type": "scheduled_cli", "cmd": cmd} + + if "labels" in kwargs: + labels.update(kwargs["labels"]) + del kwargs["labels"] + + container = self._gen_container_specs(cmd, {}, *args, **kwargs) + + api_response = self._batch_api.create_namespaced_job( + namespace=self._namespace, + body=client.V1Job( + api_version="batch/v1", + kind="Job", + metadata=client.V1ObjectMeta(name=job_name, labels=labels), spec=client.V1JobSpec( backoff_limit=0, template=client.V1JobTemplateSpec( @@ -194,20 +253,24 @@ def scheduled_job_status(self, job_name): return api_response - def create_scheduled_job(self, schedule, cmd, *args, **kwargs): - job_name = f"job-kjs-{JobManager._k8s_fqn(cmd)}" - pod_name = f"pod-kjs-{JobManager._k8s_fqn(cmd)}" + def create_scheduled_cli_job(self, schedule, cmd, *args, **kwargs): + job_name = _gen_id("cron-job", cmd) + pod_name = _gen_id("pod", cmd) + + labels = {"job_name": job_name, "type": "scheduled_cli", "cmd": cmd} + + if "labels" in kwargs: + labels.update(kwargs["labels"]) + del kwargs["labels"] - container = self._gen_container_specs(cmd, *args, **kwargs) + container = 
self._gen_container_specs(cmd, {}, *args, **kwargs) api_response = self._batch_api.create_namespaced_cron_job( namespace=self._namespace, body=client.V1CronJob( api_version="batch/v1", kind="CronJob", - metadata=client.V1ObjectMeta( - name=job_name, labels={"job_name": job_name} - ), + metadata=client.V1ObjectMeta(name=job_name, labels=labels), spec=client.V1CronJobSpec( schedule=schedule, job_template=client.V1JobTemplateSpec( @@ -240,9 +303,15 @@ def delete_scheduled_job(self, job_name): return True - def _gen_container_specs(self, cmd, *args, **kwargs): + def _gen_container_specs(self, cmd, system_env, *args, **kwargs): + dt_scheduled = dt.datetime.utcnow() + args_arr = [f"{a}" for a in args] + [f"--{k}={v}" for k, v in kwargs.items()] - container_name = f"cont-kjs-{JobManager._k8s_fqn(cmd)}" + container_name = _gen_id("cont", cmd, dt_scheduled) + + env_var = [client.V1EnvVar(name=k, value=v) for k, v in self._env.items()] + [ + client.V1EnvVar(name=k, value=v) for k, v in system_env.items() + ] # Create container container = client.V1Container( @@ -251,7 +320,7 @@ def _gen_container_specs(self, cmd, *args, **kwargs): image_pull_policy="IfNotPresent", # Always / Never / IfNotPresent command=[cmd], args=args_arr, - env=[client.V1EnvVar(name=k, value=v) for k, v in self._env.items()], + env=env_var, ) logging.info( diff --git a/tests/test_job_manager.py b/tests/test_job_manager.py index 57fdf13..8ea4d31 100644 --- a/tests/test_job_manager.py +++ b/tests/test_job_manager.py @@ -7,8 +7,8 @@ def test_success(jobman): cmd = "python" - job_name = jobman.create_instant_job(cmd, "--help") - assert job_name.startswith(f'job-kjs-{cmd.replace("_", "-")}') + job_name = jobman.create_instant_cli_job(cmd, "--help") + assert job_name.startswith(f'kjs-cli-job-{cmd.replace("_", "-")}') assert jobman.list_jobs() == [job_name] # assert jobman.job_status(job_name).active == 1 @@ -16,7 +16,7 @@ def test_success(jobman): time.sleep(10) pods = jobman.list_pods(job_name=job_name) - assert pods[0].startswith(f'job-kjs-{cmd.replace("_", "-")}') + assert pods[0].startswith(f'kjs-cli-job-{cmd.replace("_", "-")}') assert len(jobman.job_logs(job_name)) > 10 @@ -34,8 +34,8 @@ def test_success(jobman): def test_failed(jobman): cmd = "python" - job_name = jobman.create_instant_job(cmd, "---h") - assert job_name.startswith(f'job-kjs-{cmd.replace("_", "-")}') + job_name = jobman.create_instant_cli_job(cmd, "---h") + assert job_name.startswith(f'kjs-cli-job-{cmd.replace("_", "-")}') assert jobman.list_jobs() == [job_name] time.sleep(10) @@ -44,7 +44,7 @@ def test_failed(jobman): assert details["reason"] == "BackoffLimitExceeded" pods = jobman.list_pods(job_name=job_name) - assert pods[0].startswith(f'job-kjs-{cmd.replace("_", "-")}') + assert pods[0].startswith(f'kjs-cli-job-{cmd.replace("_", "-")}') assert len(jobman.job_logs(job_name)) > 10 @@ -60,8 +60,8 @@ def test_failed(jobman): def test_env(jobman): cmd = "printenv" - job_name = jobman.create_instant_job(cmd, "TEST_VAR") - assert job_name.startswith(f'job-kjs-{cmd.replace("_", "-")}') + job_name = jobman.create_instant_cli_job(cmd, "TEST_VAR") + assert job_name.startswith("kjs-cli-job-printenv-") assert jobman.list_jobs() == [job_name] time.sleep(10) @@ -72,11 +72,29 @@ def test_env(jobman): assert jobman.job_logs(job_name).startswith("hi_there") +def _add(a, b): + result = a + b + return result + + +def test_instant_func_job(jobman): + job_name = jobman.create_instant_job(func=_add, a=3, b=5) + assert job_name.startswith("kjs-job-python-") + assert 
jobman.list_jobs() == [job_name] + + time.sleep(10) + + status, _ = jobman.job_status(job_name) + + print(jobman.job_logs(job_name)) + # assert status == "SUCCEEDED" + + def test_scheduled_job(jobman): cron = "*/1 * * * *" cmd = "printenv" - job_name = jobman.create_scheduled_job(cron, cmd, "TEST_VAR") + job_name = jobman.create_scheduled_cli_job(cron, cmd, "TEST_VAR") assert jobman.list_scheduled_jobs() == [job_name] time.sleep(10) @@ -99,7 +117,7 @@ def test_scheduled_job(jobman): def test_db_access(jobman, psql): cmd = "psql" - job_name = jobman.create_instant_job( + job_name = jobman.create_instant_cli_job( cmd, username=POSTGRES_USER, host=POSTGRES_HOST ) assert job_name.startswith(f'job-kjs-{cmd.replace("_", "-")}') From 2c848ed7471d2adf064edf4aff73ebaa40c69188 Mon Sep 17 00:00:00 2001 From: Meir Tseitlin Date: Mon, 12 Feb 2024 16:40:36 -0600 Subject: [PATCH 04/16] temp --- src/k8s_job_scheduler/job_executor.py | 5 +++-- src/k8s_job_scheduler/job_func_def.py | 7 ++++--- src/k8s_job_scheduler/job_manager.py | 6 +++++- tests/test_job_manager.py | 3 ++- 4 files changed, 14 insertions(+), 7 deletions(-) diff --git a/src/k8s_job_scheduler/job_executor.py b/src/k8s_job_scheduler/job_executor.py index 338a75a..7827601 100644 --- a/src/k8s_job_scheduler/job_executor.py +++ b/src/k8s_job_scheduler/job_executor.py @@ -1,10 +1,11 @@ import base64 import logging import os -import pickle import sys import zlib +import dill + logging.basicConfig(level=logging.DEBUG, stream=sys.stdout) logger = logging.getLogger(__name__) @@ -21,7 +22,7 @@ def execute(): ) sys.exit(-1) - current_job = pickle.loads( + current_job = dill.loads( zlib.decompress(base64.urlsafe_b64decode(func_def.encode())) ) return current_job.execute() diff --git a/src/k8s_job_scheduler/job_func_def.py b/src/k8s_job_scheduler/job_func_def.py index 0aa89e5..be59697 100644 --- a/src/k8s_job_scheduler/job_func_def.py +++ b/src/k8s_job_scheduler/job_func_def.py @@ -1,9 +1,10 @@ import base64 import datetime import logging -import pickle import zlib +import dill + logger = logging.getLogger(__name__) @@ -62,7 +63,7 @@ def __init__(self, func, args=None, kwargs=None, meta: JobMeta = None): def dump(self) -> str: """Dump the job function definition to a base64 string""" - return base64.urlsafe_b64encode(zlib.compress(pickle.dumps(self))).decode() + return base64.urlsafe_b64encode(zlib.compress(dill.dumps(self))).decode() def execute(self): """Execute the job function""" @@ -77,4 +78,4 @@ def execute(self): @staticmethod def load(s: str) -> "JobFuncDef": """Load the job function definition from a base64 string""" - return pickle.loads(zlib.decompress(base64.urlsafe_b64decode(s.encode()))) + return dill.loads(zlib.decompress(base64.urlsafe_b64decode(s.encode()))) diff --git a/src/k8s_job_scheduler/job_manager.py b/src/k8s_job_scheduler/job_manager.py index f1bb6f8..6bb4ed1 100644 --- a/src/k8s_job_scheduler/job_manager.py +++ b/src/k8s_job_scheduler/job_manager.py @@ -2,6 +2,7 @@ import json import logging import socket +import types from kubernetes import client, config from kubernetes.client.rest import ApiException @@ -157,7 +158,8 @@ def create_instant_job(self, func, cmd="python", *args, **kwargs): # serialize function call job_descriptor = JobFuncDef( - func=func, + # func=func, + func=types.FunctionType(func.__code__, {}), args=args, kwargs=kwargs, meta=JobMeta(job_name, dt_scheduled, socket.gethostname()), @@ -167,6 +169,8 @@ def create_instant_job(self, func, cmd="python", *args, **kwargs): cmd, {K8S_ENV_VAR_NAME: 
job_descriptor.dump()}, *args, **kwargs ) + print(container) + api_response = self._batch_api.create_namespaced_job( namespace=self._namespace, body=client.V1Job( diff --git a/tests/test_job_manager.py b/tests/test_job_manager.py index 8ea4d31..624ae37 100644 --- a/tests/test_job_manager.py +++ b/tests/test_job_manager.py @@ -74,6 +74,7 @@ def test_env(jobman): def _add(a, b): result = a + b + print(result) return result @@ -86,7 +87,7 @@ def test_instant_func_job(jobman): status, _ = jobman.job_status(job_name) - print(jobman.job_logs(job_name)) + # print(jobman.job_logs(job_name)) # assert status == "SUCCEEDED" From b2be14272200ed8ccc0dfaeec43ac283b32e27e6 Mon Sep 17 00:00:00 2001 From: Meir Tseitlin Date: Tue, 13 Feb 2024 10:15:06 -0600 Subject: [PATCH 05/16] temp --- setup.cfg | 1 + src/k8s_job_scheduler/job_executor.py | 6 +++--- src/k8s_job_scheduler/job_manager.py | 30 ++++++++++++++++++++------- tests/test_job_manager.py | 7 ++++--- 4 files changed, 31 insertions(+), 13 deletions(-) diff --git a/setup.cfg b/setup.cfg index ab59f79..2f33f28 100644 --- a/setup.cfg +++ b/setup.cfg @@ -50,6 +50,7 @@ package_dir = install_requires = importlib-metadata; python_version<"3.8" kubernetes + dill [options.packages.find] where = src diff --git a/src/k8s_job_scheduler/job_executor.py b/src/k8s_job_scheduler/job_executor.py index 7827601..9a8964c 100644 --- a/src/k8s_job_scheduler/job_executor.py +++ b/src/k8s_job_scheduler/job_executor.py @@ -10,15 +10,15 @@ logger = logging.getLogger(__name__) -K8S_ENV_VAR_NAME = "K8S_JOB_FUNC" +JOB_PYTHON_FUNC_ENV_VAR = "JOB_PYTHON_FUNC" def execute(): try: - func_def = os.getenv(K8S_ENV_VAR_NAME) + func_def = os.getenv(JOB_PYTHON_FUNC_ENV_VAR) if not func_def: print( - f"Environment var '{K8S_ENV_VAR_NAME}' is not set, nothing to execute." + f"Environment var '{JOB_PYTHON_FUNC_ENV_VAR}' is not set, nothing to execute." 
) sys.exit(-1) diff --git a/src/k8s_job_scheduler/job_manager.py b/src/k8s_job_scheduler/job_manager.py index 6bb4ed1..d507bde 100644 --- a/src/k8s_job_scheduler/job_manager.py +++ b/src/k8s_job_scheduler/job_manager.py @@ -1,6 +1,7 @@ import datetime as dt import json import logging +import os import socket import types @@ -14,10 +15,14 @@ log = logging.getLogger(__name__) +basedir = os.path.abspath(os.path.dirname(__file__)) + config.load_kube_config() K8S_DEFAULT_NAMESPACE = "py-k8s-job-scheduler" -K8S_ENV_VAR_NAME = "K8S_JOB_FUNC" +JOB_PYTHON_FUNC_ENV_VAR = "JOB_PYTHON_FUNC" +JOB_PYTHON_EXECUTOR_ENV_VAR = "JOB_PYTHON_EXEC" +JOB_PYTHON_EXECUTOR_SCRIPT_PATH = "/".join([basedir, "job_executor.py"]) K8S_STATUS_MAP = { "ready": "READY", @@ -145,7 +150,7 @@ def job_logs(self, job_name): else None ) - def create_instant_job(self, func, cmd="python", *args, **kwargs): + def create_instant_python_job(self, func, cmd="python", *args, **kwargs): dt_scheduled = dt.datetime.utcnow() job_name = _gen_id("job", cmd, dt_scheduled) @@ -165,12 +170,21 @@ def create_instant_job(self, func, cmd="python", *args, **kwargs): meta=JobMeta(job_name, dt_scheduled, socket.gethostname()), ) + with open(JOB_PYTHON_EXECUTOR_SCRIPT_PATH, "r") as f: + executor_str = f.read() + + sysenv = { + JOB_PYTHON_FUNC_ENV_VAR: job_descriptor.dump(), + JOB_PYTHON_EXECUTOR_ENV_VAR: executor_str, + } + container = self._gen_container_specs( - cmd, {K8S_ENV_VAR_NAME: job_descriptor.dump()}, *args, **kwargs + "bash", + sysenv, + "-c", + f"pip install dill;printenv {JOB_PYTHON_EXECUTOR_ENV_VAR} > job_executor.py; python job_executor.py", ) - print(container) - api_response = self._batch_api.create_namespaced_job( namespace=self._namespace, body=client.V1Job( @@ -258,8 +272,10 @@ def scheduled_job_status(self, job_name): return api_response def create_scheduled_cli_job(self, schedule, cmd, *args, **kwargs): - job_name = _gen_id("cron-job", cmd) - pod_name = _gen_id("pod", cmd) + dt_scheduled = dt.datetime.utcnow() + + job_name = _gen_id("cron-job", cmd, dt_scheduled) + pod_name = _gen_id("pod", cmd, dt_scheduled) labels = {"job_name": job_name, "type": "scheduled_cli", "cmd": cmd} diff --git a/tests/test_job_manager.py b/tests/test_job_manager.py index 624ae37..75ff533 100644 --- a/tests/test_job_manager.py +++ b/tests/test_job_manager.py @@ -78,16 +78,17 @@ def _add(a, b): return result -def test_instant_func_job(jobman): - job_name = jobman.create_instant_job(func=_add, a=3, b=5) +def test_instant_python_job(jobman): + job_name = jobman.create_instant_python_job(func=_add, a=3, b=5) assert job_name.startswith("kjs-job-python-") assert jobman.list_jobs() == [job_name] time.sleep(10) status, _ = jobman.job_status(job_name) + print(status) - # print(jobman.job_logs(job_name)) + print(jobman.job_logs(job_name)) # assert status == "SUCCEEDED" From a9d0e38e1aa5da500713b2489d5f282b46535d7b Mon Sep 17 00:00:00 2001 From: Meir Tseitlin Date: Tue, 13 Feb 2024 13:40:52 -0600 Subject: [PATCH 06/16] temp --- src/k8s_job_scheduler/job_executor.py | 12 +++- src/k8s_job_scheduler/job_func_def.py | 81 --------------------------- src/k8s_job_scheduler/job_manager.py | 43 ++++++++------ tests/conftest.py | 2 +- tests/test_job_manager.py | 8 ++- 5 files changed, 41 insertions(+), 105 deletions(-) delete mode 100644 src/k8s_job_scheduler/job_func_def.py diff --git a/src/k8s_job_scheduler/job_executor.py b/src/k8s_job_scheduler/job_executor.py index 9a8964c..0e9ab0d 100644 --- a/src/k8s_job_scheduler/job_executor.py +++ 
b/src/k8s_job_scheduler/job_executor.py @@ -17,7 +17,7 @@ def execute(): try: func_def = os.getenv(JOB_PYTHON_FUNC_ENV_VAR) if not func_def: - print( + logger.error( f"Environment var '{JOB_PYTHON_FUNC_ENV_VAR}' is not set, nothing to execute." ) sys.exit(-1) @@ -25,7 +25,15 @@ def execute(): current_job = dill.loads( zlib.decompress(base64.urlsafe_b64decode(func_def.encode())) ) - return current_job.execute() + + logger.info( + f"=== Starting job {current_job['name']}, submitted from {current_job['host']} " + f"at {current_job['dt_scheduled'].isoformat()} ===" + ) + logger.debug(f"Job func: {current_job['func'].__name__}") + + return current_job["func"](*current_job["args"], **current_job["kwargs"]) + except Exception as e: logger.fatal(e) raise diff --git a/src/k8s_job_scheduler/job_func_def.py b/src/k8s_job_scheduler/job_func_def.py deleted file mode 100644 index be59697..0000000 --- a/src/k8s_job_scheduler/job_func_def.py +++ /dev/null @@ -1,81 +0,0 @@ -import base64 -import datetime -import logging -import zlib - -import dill - -logger = logging.getLogger(__name__) - - -class JobMeta: - """Helper class to hold job meta information""" - - def __init__(self, name: str, dt_scheduled: datetime, host: str): - self.name = name - self.dt_scheduled = dt_scheduled - self.host = host - - name: str = "[JOB-NAME]" - """Unique job name""" - - dt_scheduled: datetime = datetime.datetime.min - """Job scheduled datetime""" - - host: str = "[HOST]" - """Host responsible for spawning the job""" - - -class JobFuncDef: - """Helper class to hold the job function definition - - :param func: Pointer to the job function - :param args: Args for the job function - :param kwargs: Kwargs for the job function - :param meta: Metadata for the job - """ - - func = None - """Pointer to the job function""" - - args = None - """Args for the job function""" - - kwargs = None - """Kwargs for the job function""" - - meta: JobMeta = None - """Metadata for the job""" - - def __init__(self, func, args=None, kwargs=None, meta: JobMeta = None): - """ - Initialize the job function definition - - :param func: Pointer to the job function - :param args: Args for the job function - :param kwargs: Kwargs for the job function - :param meta: Metadata for the job - """ - self.func = func - self.args = args or [] - self.kwargs = kwargs or {} - self.meta = meta or JobMeta() - - def dump(self) -> str: - """Dump the job function definition to a base64 string""" - return base64.urlsafe_b64encode(zlib.compress(dill.dumps(self))).decode() - - def execute(self): - """Execute the job function""" - logger.info( - f"=== Starting job {self.meta.name}, submitted from {self.meta.host} " - f"at {self.meta.dt_scheduled.isoformat()} ===" - ) - logger.debug(f"Job func: {self.func.__name__}") - - return self.func(*self.args, **self.kwargs) - - @staticmethod - def load(s: str) -> "JobFuncDef": - """Load the job function definition from a base64 string""" - return dill.loads(zlib.decompress(base64.urlsafe_b64decode(s.encode()))) diff --git a/src/k8s_job_scheduler/job_manager.py b/src/k8s_job_scheduler/job_manager.py index d507bde..b325d54 100644 --- a/src/k8s_job_scheduler/job_manager.py +++ b/src/k8s_job_scheduler/job_manager.py @@ -1,15 +1,16 @@ -import datetime as dt +import base64 +import datetime import json import logging import os import socket import types +import zlib +import dill from kubernetes import client, config from kubernetes.client.rest import ApiException -from .job_func_def import JobFuncDef, JobMeta - __author__ = "Meir Tseitlin" 
__license__ = "LGPL-3.0-only" @@ -38,7 +39,7 @@ def _k8s_fqn(name): return name.replace("_", "-") -def _gen_id(prefix: str, name: str, dt: dt) -> str: +def _gen_id(prefix: str, name: str, dt: datetime) -> str: """Generate a job id from the name and the given datetime""" return f"kjs-{prefix}-{_k8s_fqn(name)}-{dt.strftime('%Y%m%d%H%M%S%f')}" @@ -118,6 +119,8 @@ def job_status(self, job_name): if api_response.status.ready: return K8S_STATUS_MAP["ready"], None + elif api_response.status.active: + return K8S_STATUS_MAP["active"], None elif api_response.status.terminating: return K8S_STATUS_MAP["terminating"], None elif api_response.status.succeeded: @@ -151,7 +154,7 @@ def job_logs(self, job_name): ) def create_instant_python_job(self, func, cmd="python", *args, **kwargs): - dt_scheduled = dt.datetime.utcnow() + dt_scheduled = datetime.datetime.utcnow() job_name = _gen_id("job", cmd, dt_scheduled) pod_name = _gen_id("pod", cmd, dt_scheduled) @@ -161,20 +164,24 @@ def create_instant_python_job(self, func, cmd="python", *args, **kwargs): labels.update(kwargs["labels"]) del kwargs["labels"] - # serialize function call - job_descriptor = JobFuncDef( - # func=func, - func=types.FunctionType(func.__code__, {}), - args=args, - kwargs=kwargs, - meta=JobMeta(job_name, dt_scheduled, socket.gethostname()), - ) + job_descriptor = { + "func": types.FunctionType(func.__code__, {}), + "args": args, + "kwargs": kwargs, + "name": job_name, + "dt_scheduled": dt_scheduled, + "host": str(socket.gethostname()), + } with open(JOB_PYTHON_EXECUTOR_SCRIPT_PATH, "r") as f: executor_str = f.read() + pcl = base64.urlsafe_b64encode( + zlib.compress(dill.dumps(job_descriptor)) + ).decode() + sysenv = { - JOB_PYTHON_FUNC_ENV_VAR: job_descriptor.dump(), + JOB_PYTHON_FUNC_ENV_VAR: pcl, JOB_PYTHON_EXECUTOR_ENV_VAR: executor_str, } @@ -182,7 +189,7 @@ def create_instant_python_job(self, func, cmd="python", *args, **kwargs): "bash", sysenv, "-c", - f"pip install dill;printenv {JOB_PYTHON_EXECUTOR_ENV_VAR} > job_executor.py; python job_executor.py", + f"pip install dill; printenv {JOB_PYTHON_EXECUTOR_ENV_VAR} > job_executor.py; {cmd} job_executor.py", ) api_response = self._batch_api.create_namespaced_job( @@ -208,7 +215,7 @@ def create_instant_python_job(self, func, cmd="python", *args, **kwargs): return api_response.metadata.name def create_instant_cli_job(self, cmd, *args, **kwargs): - dt_scheduled = dt.datetime.utcnow() + dt_scheduled = datetime.datetime.utcnow() job_name = _gen_id("cli-job", cmd, dt_scheduled) pod_name = _gen_id("pod", cmd, dt_scheduled) @@ -272,7 +279,7 @@ def scheduled_job_status(self, job_name): return api_response def create_scheduled_cli_job(self, schedule, cmd, *args, **kwargs): - dt_scheduled = dt.datetime.utcnow() + dt_scheduled = datetime.datetime.utcnow() job_name = _gen_id("cron-job", cmd, dt_scheduled) pod_name = _gen_id("pod", cmd, dt_scheduled) @@ -324,7 +331,7 @@ def delete_scheduled_job(self, job_name): return True def _gen_container_specs(self, cmd, system_env, *args, **kwargs): - dt_scheduled = dt.datetime.utcnow() + dt_scheduled = datetime.datetime.utcnow() args_arr = [f"{a}" for a in args] + [f"--{k}={v}" for k, v in kwargs.items()] container_name = _gen_id("cont", cmd, dt_scheduled) diff --git a/tests/conftest.py b/tests/conftest.py index 10de968..77fc16d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -14,7 +14,7 @@ from k8s_job_scheduler.job_manager import K8S_DEFAULT_NAMESPACE, JobManager -DOCKER_IMAGE_PYTHON = "python:3.11.7-slim-bullseye" +DOCKER_IMAGE_PYTHON = 
"python:3.11.1-slim-bullseye" DOCKER_IMAGE_POSTGRES = "postgres:15" POSTGRES_USER = "postgres" POSTGRES_PASS = "q1234567" diff --git a/tests/test_job_manager.py b/tests/test_job_manager.py index 75ff533..ec8bb82 100644 --- a/tests/test_job_manager.py +++ b/tests/test_job_manager.py @@ -72,14 +72,14 @@ def test_env(jobman): assert jobman.job_logs(job_name).startswith("hi_there") -def _add(a, b): +def _func_add(a, b): result = a + b print(result) return result def test_instant_python_job(jobman): - job_name = jobman.create_instant_python_job(func=_add, a=3, b=5) + job_name = jobman.create_instant_python_job(func=_func_add, a=3, b=5) assert job_name.startswith("kjs-job-python-") assert jobman.list_jobs() == [job_name] @@ -89,7 +89,9 @@ def test_instant_python_job(jobman): print(status) print(jobman.job_logs(job_name)) - # assert status == "SUCCEEDED" + assert status == "SUCCEEDED" + + assert jobman.job_logs(job_name).endswith("8\n") def test_scheduled_job(jobman): From b93a0e2fbf6f632a45bbb08b2a92b4ea73fba1ca Mon Sep 17 00:00:00 2001 From: Meir Tseitlin Date: Tue, 13 Feb 2024 13:46:02 -0600 Subject: [PATCH 07/16] temp --- src/k8s_job_scheduler/job_manager.py | 4 ++-- .../{job_executor.py => python_executor.py} | 2 +- tests/test_job_manager.py | 5 +---- 3 files changed, 4 insertions(+), 7 deletions(-) rename src/k8s_job_scheduler/{job_executor.py => python_executor.py} (97%) diff --git a/src/k8s_job_scheduler/job_manager.py b/src/k8s_job_scheduler/job_manager.py index b325d54..a33f5aa 100644 --- a/src/k8s_job_scheduler/job_manager.py +++ b/src/k8s_job_scheduler/job_manager.py @@ -23,7 +23,7 @@ K8S_DEFAULT_NAMESPACE = "py-k8s-job-scheduler" JOB_PYTHON_FUNC_ENV_VAR = "JOB_PYTHON_FUNC" JOB_PYTHON_EXECUTOR_ENV_VAR = "JOB_PYTHON_EXEC" -JOB_PYTHON_EXECUTOR_SCRIPT_PATH = "/".join([basedir, "job_executor.py"]) +JOB_PYTHON_EXECUTOR_SCRIPT_PATH = "/".join([basedir, "python_executor.py"]) K8S_STATUS_MAP = { "ready": "READY", @@ -189,7 +189,7 @@ def create_instant_python_job(self, func, cmd="python", *args, **kwargs): "bash", sysenv, "-c", - f"pip install dill; printenv {JOB_PYTHON_EXECUTOR_ENV_VAR} > job_executor.py; {cmd} job_executor.py", + f"pip install dill; printenv {JOB_PYTHON_EXECUTOR_ENV_VAR} > executor.py; {cmd} executor.py", ) api_response = self._batch_api.create_namespaced_job( diff --git a/src/k8s_job_scheduler/job_executor.py b/src/k8s_job_scheduler/python_executor.py similarity index 97% rename from src/k8s_job_scheduler/job_executor.py rename to src/k8s_job_scheduler/python_executor.py index 0e9ab0d..ce94253 100644 --- a/src/k8s_job_scheduler/job_executor.py +++ b/src/k8s_job_scheduler/python_executor.py @@ -40,4 +40,4 @@ def execute(): if __name__ == "__main__": - execute() + sys.exit(execute()) diff --git a/tests/test_job_manager.py b/tests/test_job_manager.py index ec8bb82..5d48b11 100644 --- a/tests/test_job_manager.py +++ b/tests/test_job_manager.py @@ -75,7 +75,7 @@ def test_env(jobman): def _func_add(a, b): result = a + b print(result) - return result + return 0 def test_instant_python_job(jobman): @@ -86,9 +86,6 @@ def test_instant_python_job(jobman): time.sleep(10) status, _ = jobman.job_status(job_name) - print(status) - - print(jobman.job_logs(job_name)) assert status == "SUCCEEDED" assert jobman.job_logs(job_name).endswith("8\n") From e82b1458a951db60daa7b3d8a1f8cfdc6248c5bd Mon Sep 17 00:00:00 2001 From: Meir Tseitlin Date: Tue, 13 Feb 2024 13:49:04 -0600 Subject: [PATCH 08/16] temp --- tests/conftest.py | 31 --------------------------- tests/test_job_manager.py | 45 
+-------------------------------------- 2 files changed, 1 insertion(+), 75 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 77fc16d..71ef016 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -9,19 +9,11 @@ import time -import psycopg2 import pytest from k8s_job_scheduler.job_manager import K8S_DEFAULT_NAMESPACE, JobManager DOCKER_IMAGE_PYTHON = "python:3.11.1-slim-bullseye" -DOCKER_IMAGE_POSTGRES = "postgres:15" -POSTGRES_USER = "postgres" -POSTGRES_PASS = "q1234567" -POSTGRES_HOST = "postgres_db" -POSTGRES_URI = ( - f"postgresql://{POSTGRES_USER}:{POSTGRES_PASS}@{POSTGRES_HOST}:5432/postgres" -) @pytest.fixture @@ -50,26 +42,3 @@ def jobman(request, docker_image=DOCKER_IMAGE_PYTHON, env=None): time.sleep(0.3) return jobman - - -@pytest.fixture -def psql(): - conn = psycopg2.connect(dsn=POSTGRES_URI) - conn.autocommit = True - - # Drop all tables - with conn.cursor() as curs: - curs.execute( - """DO $$ DECLARE - r RECORD; - BEGIN - -- if the schema you operate on is not "current", you will want to - -- replace current_schema() in query with 'schematodeletetablesfrom' - -- *and* update the generate 'DROP...' accordingly. - FOR r IN (SELECT tablename FROM pg_tables WHERE schemaname = current_schema()) LOOP - EXECUTE 'DROP TABLE IF EXISTS ' || quote_ident(r.tablename) || ' CASCADE'; - END LOOP; - END $$;""" - ) - - return conn diff --git a/tests/test_job_manager.py b/tests/test_job_manager.py index 5d48b11..556b577 100644 --- a/tests/test_job_manager.py +++ b/tests/test_job_manager.py @@ -1,7 +1,6 @@ import time import pytest -from conftest import DOCKER_IMAGE_POSTGRES, POSTGRES_HOST, POSTGRES_USER def test_success(jobman): @@ -101,7 +100,7 @@ def test_scheduled_job(jobman): time.sleep(10) # assert job_name.startswith(f'job-kjs-{cmd.replace("_", "-")}') - # assert jobman.list_jobs() == [job_name] + assert jobman.list_scheduled_jobs() == [job_name] assert jobman.delete_scheduled_job(job_name) @@ -109,45 +108,3 @@ def test_scheduled_job(jobman): assert jobman.list_scheduled_jobs() == [] assert jobman.list_pods() == [] - - -@pytest.mark.skip(reason="Not fully implemented") -@pytest.mark.parametrize( - "jobman", [dict(docker_image=DOCKER_IMAGE_POSTGRES)], indirect=True -) -def test_db_access(jobman, psql): - cmd = "psql" - - job_name = jobman.create_instant_cli_job( - cmd, username=POSTGRES_USER, host=POSTGRES_HOST - ) - assert job_name.startswith(f'job-kjs-{cmd.replace("_", "-")}') - assert jobman.list_jobs() == [job_name] - - time.sleep(10) - - print(jobman.job_status(job_name)) - - print(jobman.job_logs(job_name)) - return - - assert jobman.job_status(job_name) == "SUCCESS" - - pods = jobman.list_pods(job_name=job_name) - assert pods[0].startswith(f'job-kjs-{cmd.replace("_", "-")}') - - print(jobman.job_logs(job_name)) - - with psql.cursor() as curs: - curs.execute( - "CREATE TABLE IF NOT EXISTS test1 ( \ - id integer PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, \ - num integer NOT NULL default 0 \ - );" - ) - - curs.execute("INSERT INTO test1 (num) VALUES (5);") - - curs.execute("SELECT num from test1;") - - print(f"{curs.fetchone()}") From d404e5d3d2a57fbc76d381c82aabd040b7c98d53 Mon Sep 17 00:00:00 2001 From: Meir Tseitlin Date: Tue, 13 Feb 2024 13:52:36 -0600 Subject: [PATCH 09/16] temp --- README.md | 2 -- setup.cfg | 1 - tests/conftest.py | 9 --------- tests/test_job_manager.py | 3 ++- 4 files changed, 2 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index ff50ed4..4a34298 100644 --- a/README.md +++ b/README.md @@ -16,8 +16,6 @@ A package 
for managing Kubernetes jobs and cron jobs from Python. It allows running CLI or Python code using native Kubernetes job engine. -> This package is a continuous effort started by [Roemer Claasen](https://gitlab.com/roemer) in his [kubernetes-job](https://gitlab.com/roemer/kubernetes-job) work. - ## Installation ```python diff --git a/setup.cfg b/setup.cfg index 2f33f28..709d3e1 100644 --- a/setup.cfg +++ b/setup.cfg @@ -67,7 +67,6 @@ testing = setuptools pytest pytest-cov - psycopg2 [options.entry_points] # Add here console scripts like: diff --git a/tests/conftest.py b/tests/conftest.py index 71ef016..9db5d17 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,12 +1,3 @@ -""" - Dummy conftest.py for k8s_job_scheduler. - - If you don't know what this is for, just leave it empty. - Read more about conftest.py under: - - https://docs.pytest.org/en/stable/fixture.html - - https://docs.pytest.org/en/stable/writing_plugins.html -""" - import time import pytest diff --git a/tests/test_job_manager.py b/tests/test_job_manager.py index 556b577..28509bb 100644 --- a/tests/test_job_manager.py +++ b/tests/test_job_manager.py @@ -10,7 +10,8 @@ def test_success(jobman): assert job_name.startswith(f'kjs-cli-job-{cmd.replace("_", "-")}') assert jobman.list_jobs() == [job_name] - # assert jobman.job_status(job_name).active == 1 + status, details = jobman.job_status(job_name) + assert status == "ACTIVE" time.sleep(10) From 66eea6cfcb03726ee85632fd878bf044b1633358 Mon Sep 17 00:00:00 2001 From: Meir Tseitlin Date: Tue, 13 Feb 2024 13:56:17 -0600 Subject: [PATCH 10/16] temp --- src/k8s_job_scheduler/job_manager.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/k8s_job_scheduler/job_manager.py b/src/k8s_job_scheduler/job_manager.py index a33f5aa..55e60fb 100644 --- a/src/k8s_job_scheduler/job_manager.py +++ b/src/k8s_job_scheduler/job_manager.py @@ -153,7 +153,9 @@ def job_logs(self, job_name): else None ) - def create_instant_python_job(self, func, cmd="python", *args, **kwargs): + def create_instant_python_job( + self, func, cmd="python", dynamic_dill_install=True, *args, **kwargs + ): dt_scheduled = datetime.datetime.utcnow() job_name = _gen_id("job", cmd, dt_scheduled) @@ -189,7 +191,8 @@ def create_instant_python_job(self, func, cmd="python", *args, **kwargs): "bash", sysenv, "-c", - f"pip install dill; printenv {JOB_PYTHON_EXECUTOR_ENV_VAR} > executor.py; {cmd} executor.py", + f"{'pip install dill; ' if dynamic_dill_install else ''}" + f"printenv {JOB_PYTHON_EXECUTOR_ENV_VAR} > executor.py; {cmd} executor.py", ) api_response = self._batch_api.create_namespaced_job( From fdbe6b1532275e21ecafbdbe23d46887a24f5f38 Mon Sep 17 00:00:00 2001 From: Meir Tseitlin Date: Thu, 29 Feb 2024 09:59:07 -0600 Subject: [PATCH 11/16] temp --- README.md | 25 ++++++++++++++++++++++++- src/k8s_job_scheduler/job_manager.py | 26 ++++++++++++++++++++++++-- 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 4a34298..20a956c 100644 --- a/README.md +++ b/README.md @@ -14,10 +14,33 @@ # k8s-job-scheduler -A package for managing Kubernetes jobs and cron jobs from Python. It allows running CLI or Python code using native Kubernetes job engine. +A package for managing Kubernetes jobs and cron jobs from Python. It allows running CLI scripts and Python function within native Kubernetes job engine. 
## Installation ```python pip install k8s-job-scheduler ``` + +## Getting Started + +```commandline +from k8s_job_scheduler import JobManager + +def add(a, b): + return a + b + +manager = JobManager(docker_image="python:3.11.1-slim-bullseye") +job = manager.create_job(add, 1, 2) + +``` + +This example will create a Kubernetes job and run the function `add` with arguments 1 and 2 inside Python Docker container. + + +## Other Prerequisites + +### Executing Python functions withing Kubernetes containers + +* Docker images should include Python interpreter and all the dependencies required to execute the function. +* `dill` package is used to send the execution function and it's arguments when Docker container is created. If you wish to use standard Python Docker images or custom images which does not have `dill` package preinstalled, it is possible to specify `dynamic_dill_install=True` when calling `create_instant_python_job`. In this case `dill` will be dynamically installed before running the code. diff --git a/src/k8s_job_scheduler/job_manager.py b/src/k8s_job_scheduler/job_manager.py index 55e60fb..269e848 100644 --- a/src/k8s_job_scheduler/job_manager.py +++ b/src/k8s_job_scheduler/job_manager.py @@ -107,11 +107,33 @@ def delete_pod(self, pod_name): return api_response - def list_jobs(self): + def list_jobs(self, include_details=False): ret = self._batch_api.list_namespaced_job(namespace=self._namespace) + if include_details: + return ret.items + return [i.metadata.name for i in ret.items] + @staticmethod + def parse_status(api_response): + if api_response.status.ready: + return K8S_STATUS_MAP["ready"] + elif api_response.status.active: + return K8S_STATUS_MAP["active"] + elif api_response.status.terminating: + return K8S_STATUS_MAP["terminating"] + elif api_response.status.succeeded: + return K8S_STATUS_MAP["succeeded"] + elif api_response.status.failed: + return K8S_STATUS_MAP["failed"] + else: + print(api_response.status) + return K8S_STATUS_MAP["missing"] + + def read_job_status(self, job_name): + return self._batch_api.read_namespaced_job_status(job_name, self._namespace) + def job_status(self, job_name): api_response = self._batch_api.read_namespaced_job_status( job_name, self._namespace @@ -160,7 +182,7 @@ def create_instant_python_job( job_name = _gen_id("job", cmd, dt_scheduled) pod_name = _gen_id("pod", cmd, dt_scheduled) - labels = {"job_name": job_name, "type": "scheduled_func", "cmd": cmd} + labels = {"job_name": job_name, "type": "instant_func", "cmd": cmd} if "labels" in kwargs: labels.update(kwargs["labels"]) From 8cadbb00a65242129bb509ed659f780a808f1507 Mon Sep 17 00:00:00 2001 From: Meir Tseitlin Date: Wed, 8 May 2024 16:13:35 -0500 Subject: [PATCH 12/16] change job naming convention --- src/k8s_job_scheduler/job_manager.py | 4 ++-- tests/test_job_manager.py | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/k8s_job_scheduler/job_manager.py b/src/k8s_job_scheduler/job_manager.py index 269e848..1a82d58 100644 --- a/src/k8s_job_scheduler/job_manager.py +++ b/src/k8s_job_scheduler/job_manager.py @@ -180,7 +180,7 @@ def create_instant_python_job( ): dt_scheduled = datetime.datetime.utcnow() - job_name = _gen_id("job", cmd, dt_scheduled) + job_name = _gen_id("inst-python-job", cmd, dt_scheduled) pod_name = _gen_id("pod", cmd, dt_scheduled) labels = {"job_name": job_name, "type": "instant_func", "cmd": cmd} @@ -242,7 +242,7 @@ def create_instant_python_job( def create_instant_cli_job(self, cmd, *args, **kwargs): dt_scheduled = 
datetime.datetime.utcnow() - job_name = _gen_id("cli-job", cmd, dt_scheduled) + job_name = _gen_id("inst-cli-job", cmd, dt_scheduled) pod_name = _gen_id("pod", cmd, dt_scheduled) labels = {"job_name": job_name, "type": "scheduled_cli", "cmd": cmd} diff --git a/tests/test_job_manager.py b/tests/test_job_manager.py index 28509bb..ac62318 100644 --- a/tests/test_job_manager.py +++ b/tests/test_job_manager.py @@ -7,7 +7,7 @@ def test_success(jobman): cmd = "python" job_name = jobman.create_instant_cli_job(cmd, "--help") - assert job_name.startswith(f'kjs-cli-job-{cmd.replace("_", "-")}') + assert job_name.startswith(f'kjs-inst-cli-job-{cmd.replace("_", "-")}') assert jobman.list_jobs() == [job_name] status, details = jobman.job_status(job_name) @@ -16,7 +16,7 @@ def test_success(jobman): time.sleep(10) pods = jobman.list_pods(job_name=job_name) - assert pods[0].startswith(f'kjs-cli-job-{cmd.replace("_", "-")}') + assert pods[0].startswith(f'kjs-inst-cli-job-{cmd.replace("_", "-")}') assert len(jobman.job_logs(job_name)) > 10 @@ -35,7 +35,7 @@ def test_failed(jobman): cmd = "python" job_name = jobman.create_instant_cli_job(cmd, "---h") - assert job_name.startswith(f'kjs-cli-job-{cmd.replace("_", "-")}') + assert job_name.startswith(f'kjs-inst-cli-job-{cmd.replace("_", "-")}') assert jobman.list_jobs() == [job_name] time.sleep(10) @@ -44,7 +44,7 @@ def test_failed(jobman): assert details["reason"] == "BackoffLimitExceeded" pods = jobman.list_pods(job_name=job_name) - assert pods[0].startswith(f'kjs-cli-job-{cmd.replace("_", "-")}') + assert pods[0].startswith(f'kjs-inst-cli-job-{cmd.replace("_", "-")}') assert len(jobman.job_logs(job_name)) > 10 @@ -61,7 +61,7 @@ def test_env(jobman): cmd = "printenv" job_name = jobman.create_instant_cli_job(cmd, "TEST_VAR") - assert job_name.startswith("kjs-cli-job-printenv-") + assert job_name.startswith("kjs-inst-cli-job-printenv-") assert jobman.list_jobs() == [job_name] time.sleep(10) @@ -80,7 +80,7 @@ def _func_add(a, b): def test_instant_python_job(jobman): job_name = jobman.create_instant_python_job(func=_func_add, a=3, b=5) - assert job_name.startswith("kjs-job-python-") + assert job_name.startswith("kjs-inst-python-job-") assert jobman.list_jobs() == [job_name] time.sleep(10) From c48e358ce0c83552be40fec8e853201e0d44e90d Mon Sep 17 00:00:00 2001 From: Meir Tseitlin Date: Wed, 8 May 2024 16:41:09 -0500 Subject: [PATCH 13/16] change job naming convention --- src/k8s_job_scheduler/job_manager.py | 4 ++-- tests/test_job_manager.py | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/k8s_job_scheduler/job_manager.py b/src/k8s_job_scheduler/job_manager.py index 1a82d58..2f169b5 100644 --- a/src/k8s_job_scheduler/job_manager.py +++ b/src/k8s_job_scheduler/job_manager.py @@ -180,7 +180,7 @@ def create_instant_python_job( ): dt_scheduled = datetime.datetime.utcnow() - job_name = _gen_id("inst-python-job", cmd, dt_scheduled) + job_name = _gen_id("inst-job", cmd, dt_scheduled) pod_name = _gen_id("pod", cmd, dt_scheduled) labels = {"job_name": job_name, "type": "instant_func", "cmd": cmd} @@ -242,7 +242,7 @@ def create_instant_python_job( def create_instant_cli_job(self, cmd, *args, **kwargs): dt_scheduled = datetime.datetime.utcnow() - job_name = _gen_id("inst-cli-job", cmd, dt_scheduled) + job_name = _gen_id("inst-job-cli", cmd, dt_scheduled) pod_name = _gen_id("pod", cmd, dt_scheduled) labels = {"job_name": job_name, "type": "scheduled_cli", "cmd": cmd} diff --git a/tests/test_job_manager.py b/tests/test_job_manager.py index 
ac62318..c8a9e37 100644 --- a/tests/test_job_manager.py +++ b/tests/test_job_manager.py @@ -7,7 +7,7 @@ def test_success(jobman): cmd = "python" job_name = jobman.create_instant_cli_job(cmd, "--help") - assert job_name.startswith(f'kjs-inst-cli-job-{cmd.replace("_", "-")}') + assert job_name.startswith(f'kjs-inst-job-cli-{cmd.replace("_", "-")}') assert jobman.list_jobs() == [job_name] status, details = jobman.job_status(job_name) @@ -16,7 +16,7 @@ def test_success(jobman): time.sleep(10) pods = jobman.list_pods(job_name=job_name) - assert pods[0].startswith(f'kjs-inst-cli-job-{cmd.replace("_", "-")}') + assert pods[0].startswith(f'kjs-inst-job-cli-{cmd.replace("_", "-")}') assert len(jobman.job_logs(job_name)) > 10 @@ -35,7 +35,7 @@ def test_failed(jobman): cmd = "python" job_name = jobman.create_instant_cli_job(cmd, "---h") - assert job_name.startswith(f'kjs-inst-cli-job-{cmd.replace("_", "-")}') + assert job_name.startswith(f'kjs-inst-job-cli-{cmd.replace("_", "-")}') assert jobman.list_jobs() == [job_name] time.sleep(10) @@ -44,7 +44,7 @@ def test_failed(jobman): assert details["reason"] == "BackoffLimitExceeded" pods = jobman.list_pods(job_name=job_name) - assert pods[0].startswith(f'kjs-inst-cli-job-{cmd.replace("_", "-")}') + assert pods[0].startswith(f'kjs-inst-job-cli-{cmd.replace("_", "-")}') assert len(jobman.job_logs(job_name)) > 10 @@ -61,7 +61,7 @@ def test_env(jobman): cmd = "printenv" job_name = jobman.create_instant_cli_job(cmd, "TEST_VAR") - assert job_name.startswith("kjs-inst-cli-job-printenv-") + assert job_name.startswith("kjs-inst-job-cli-printenv-") assert jobman.list_jobs() == [job_name] time.sleep(10) @@ -80,7 +80,7 @@ def _func_add(a, b): def test_instant_python_job(jobman): job_name = jobman.create_instant_python_job(func=_func_add, a=3, b=5) - assert job_name.startswith("kjs-inst-python-job-") + assert job_name.startswith("kjs-inst-job-") assert jobman.list_jobs() == [job_name] time.sleep(10) From d18092a41b3ec62e2742c007c583a657cc576401 Mon Sep 17 00:00:00 2001 From: Meir Tseitlin Date: Wed, 8 May 2024 17:28:53 -0500 Subject: [PATCH 14/16] change job naming convention --- src/k8s_job_scheduler/job_manager.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/src/k8s_job_scheduler/job_manager.py b/src/k8s_job_scheduler/job_manager.py index 2f169b5..0dff1df 100644 --- a/src/k8s_job_scheduler/job_manager.py +++ b/src/k8s_job_scheduler/job_manager.py @@ -229,9 +229,7 @@ def create_instant_python_job( spec=client.V1PodSpec( restart_policy="Never", containers=[container] ), - metadata=client.V1ObjectMeta( - name=pod_name, labels={"pod_name": pod_name} - ), + metadata=client.V1ObjectMeta(name=pod_name, labels=labels), ), ), ), @@ -244,7 +242,7 @@ def create_instant_cli_job(self, cmd, *args, **kwargs): job_name = _gen_id("inst-job-cli", cmd, dt_scheduled) pod_name = _gen_id("pod", cmd, dt_scheduled) - labels = {"job_name": job_name, "type": "scheduled_cli", "cmd": cmd} + labels = {"job_name": job_name, "type": "instant_cli", "cmd": cmd} if "labels" in kwargs: labels.update(kwargs["labels"]) @@ -264,9 +262,7 @@ def create_instant_cli_job(self, cmd, *args, **kwargs): spec=client.V1PodSpec( restart_policy="Never", containers=[container] ), - metadata=client.V1ObjectMeta( - name=pod_name, labels={"pod_name": pod_name} - ), + metadata=client.V1ObjectMeta(name=pod_name, labels=labels), ), ), ), @@ -333,9 +329,7 @@ def create_scheduled_cli_job(self, schedule, cmd, *args, **kwargs): ), ), ), - metadata=client.V1ObjectMeta( - name=pod_name, 
labels={"pod_name": pod_name} - ), + metadata=client.V1ObjectMeta(name=pod_name, labels=labels), ), ), ), From d20ee414823cd90df1786e14da10ed7b78500a63 Mon Sep 17 00:00:00 2001 From: Meir Tseitlin Date: Mon, 20 May 2024 16:39:24 -0500 Subject: [PATCH 15/16] temp --- src/k8s_job_scheduler/job_manager.py | 34 +++++++++++++++++++++++++--- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/src/k8s_job_scheduler/job_manager.py b/src/k8s_job_scheduler/job_manager.py index 0dff1df..980885d 100644 --- a/src/k8s_job_scheduler/job_manager.py +++ b/src/k8s_job_scheduler/job_manager.py @@ -163,12 +163,40 @@ def job_logs(self, job_name): # Get pods pods = self.list_pods(job_name=job_name) - all_logs = [ - self._core_api.read_namespaced_pod_log(pod, self._namespace) for pod in pods + all_status = [ + self._core_api.read_namespaced_pod_status(pod, self._namespace) + for pod in pods ] + # Check status before pulling logs + active_pods = [ + pod.metadata.name for pod in all_status if pod.status.phase == "active" + ] + inactive_pods = { + pod.metadata.name: pod for pod in all_status if pod.status.phase != "active" + } + + inactive_statuses = { + pod: ( + f"Pod {pod} is inactive.
" + f"Phase: {inactive_pods[pod].status.phase}
" + f'Container state: {inactive_pods[pod].status.container_statuses[0].state if inactive_pods[pod].status.container_statuses else "N/A"}
\n'  # noqa: E501
+                f"Conditions: {inactive_pods[pod].status.conditions}"
+            )
+            for pod in inactive_pods
+        }
+
+        all_logs = {
+            pod: self._core_api.read_namespaced_pod_log_with_http_info(
+                pod, self._namespace
+            )
+            for pod in active_pods
+        }
+
+        all_logs.update(inactive_statuses)
+
         return (
-            all_logs[0]
+            next(iter(all_logs.values()))
             if len(all_logs) == 1
             else all_logs
             if len(all_logs) > 1

From 895750f86d680d2be57dbeafe230aa6b35d15a76 Mon Sep 17 00:00:00 2001
From: Meir Tseitlin
Date: Tue, 21 May 2024 16:17:51 -0500
Subject: [PATCH 16/16] temp

---
 src/k8s_job_scheduler/job_manager.py | 26 +++++++++++++++-----------
 1 file changed, 15 insertions(+), 11 deletions(-)

diff --git a/src/k8s_job_scheduler/job_manager.py b/src/k8s_job_scheduler/job_manager.py
index 980885d..189b5de 100644
--- a/src/k8s_job_scheduler/job_manager.py
+++ b/src/k8s_job_scheduler/job_manager.py
@@ -169,28 +169,32 @@ def job_logs(self, job_name):
         ]
 
         # Check status before pulling logs
-        active_pods = [
-            pod.metadata.name for pod in all_status if pod.status.phase == "active"
+        pods_with_logs = [
+            pod.metadata.name
+            for pod in all_status
+            if pod.status.phase in ["active", "Succeeded", "Running"]
         ]
-        inactive_pods = {
-            pod.metadata.name: pod for pod in all_status if pod.status.phase != "active"
+        pods_with_no_log = {
+            pod.metadata.name: pod
+            for pod in all_status
+            if pod.status.phase not in ["active", "Succeeded", "Running"]
         }
 
         inactive_statuses = {
             pod: (
-                f"Pod {pod} is inactive.\n"
-                f"Phase: {inactive_pods[pod].status.phase}\n"
-                f'Container state: {inactive_pods[pod].status.container_statuses[0].state if inactive_pods[pod].status.container_statuses else "N/A"}\n'  # noqa: E501
-                f"Conditions: {inactive_pods[pod].status.conditions}"
+                f"Pod {pod} has no logs.\n"
+                f"Phase: {pods_with_no_log[pod].status.phase}\n"
+                f'Container state: {pods_with_no_log[pod].status.container_statuses[0].state if pods_with_no_log[pod].status.container_statuses else "N/A"}\n'  # noqa: E501
+                f"Conditions: {pods_with_no_log[pod].status.conditions}"
             )
-            for pod in inactive_pods
+            for pod in pods_with_no_log
         }
 
         all_logs = {
             pod: self._core_api.read_namespaced_pod_log_with_http_info(
                 pod, self._namespace
-            )
+            )[0].replace("\\n", "\n")
             for pod in pods_with_logs
         }
 
         all_logs.update(inactive_statuses)