From ce84cd3b552327a46cb2ee3c5a7e49918eadb025 Mon Sep 17 00:00:00 2001 From: yokomotod Date: Tue, 28 Nov 2023 15:46:01 +0900 Subject: [PATCH 1/2] add max_child_jobs option (#24) * add max_child_jobs option * integration test * fix * fix tox * fix lint * validate max_child_jobs --- kannon/master.py | 29 ++++++--- test/integration_test/test_master_build.py | 68 +++++++++++++++++++++- tox.ini | 2 +- 3 files changed, 88 insertions(+), 11 deletions(-) diff --git a/kannon/master.py b/kannon/master.py index 5b71618..1d17cf8 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -1,9 +1,10 @@ +from __future__ import annotations + import logging import os from collections import deque from copy import deepcopy from time import sleep -from typing import Deque, Dict, List, Optional, Set import gokart from gokart.target import make_target @@ -26,7 +27,8 @@ def __init__( # kannon resources job_prefix: str, path_child_script: str = "./run_child.py", - env_to_inherit: Optional[List[str]] = None, + env_to_inherit: list[str] | None = None, + max_child_jobs: int | None = None, ) -> None: # validation if not os.path.exists(path_child_script): @@ -40,8 +42,11 @@ def __init__( if env_to_inherit is None: env_to_inherit = ["TASK_WORKSPACE_DIRECTORY"] self.env_to_inherit = env_to_inherit + if max_child_jobs is not None and max_child_jobs <= 0: + raise ValueError(f"max_child_jobs must be positive integer, but got {max_child_jobs}") + self.max_child_jobs = max_child_jobs - self.task_id_to_job_name: Dict[str, str] = dict() + self.task_id_to_job_name: dict[str, str] = dict() def build(self, root_task: gokart.TaskOnKart) -> None: # push tasks into queue @@ -49,14 +54,16 @@ def build(self, root_task: gokart.TaskOnKart) -> None: task_queue = self._create_task_queue(root_task) # consume task queue - launched_task_ids: Set[str] = set() + running_task_ids: set[str] = set() logger.info("Consuming task queue...") while task_queue: task = task_queue.popleft() if task.complete(): logger.info(f"Task {self._gen_task_info(task)} is already completed.") + if task.make_unique_id() in running_task_ids: + running_task_ids.remove(task.make_unique_id()) continue - if task.make_unique_id() in launched_task_ids: + if task.make_unique_id() in running_task_ids: logger.info(f"Task {self._gen_task_info(task)} is still running on child job.") task_queue.append(task) continue @@ -70,9 +77,13 @@ def build(self, root_task: gokart.TaskOnKart) -> None: continue # execute task if isinstance(task, TaskOnBullet): + if self.max_child_jobs is not None and len(running_task_ids) >= self.max_child_jobs: + task_queue.append(task) # re-enqueue task to check later + logger.info(f"Reach max_child_jobs, waiting to run task {self._gen_task_info(task)} on child job...") + continue logger.info(f"Trying to run task {self._gen_task_info(task)} on child job...") self._exec_bullet_task(task) - launched_task_ids.add(task.make_unique_id()) # mark as already launched task + running_task_ids.add(task.make_unique_id()) # mark as already launched task task_queue.append(task) # re-enqueue task to check if it is done elif isinstance(task, gokart.TaskOnKart): logger.info(f"Executing task {self._gen_task_info(task)} on master job...") @@ -83,9 +94,9 @@ def build(self, root_task: gokart.TaskOnKart) -> None: logger.info("All tasks completed!") - def _create_task_queue(self, root_task: gokart.TaskOnKart) -> Deque[gokart.TaskOnKart]: - task_queue: Deque[gokart.TaskOnKart] = deque() - visited_task_ids: Set[str] = set() + def _create_task_queue(self, root_task: gokart.TaskOnKart) -> deque[gokart.TaskOnKart]: + task_queue: deque[gokart.TaskOnKart] = deque() + visited_task_ids: set[str] = set() def _rec_enqueue_task(task: gokart.TaskOnKart) -> None: """Traversal task tree in post-order to push tasks into task queue.""" diff --git a/test/integration_test/test_master_build.py b/test/integration_test/test_master_build.py index b824a56..fa36a4d 100644 --- a/test/integration_test/test_master_build.py +++ b/test/integration_test/test_master_build.py @@ -41,13 +41,14 @@ def complete(self) -> bool: class MockKannon(Kannon): - def __init__(self) -> None: + def __init__(self, *, max_child_jobs: int | None = None) -> None: super().__init__( api_instance=None, template_job=client.V1Job(metadata=client.V1ObjectMeta()), job_prefix="", path_child_script=__file__, # just pass any existing file as dummy env_to_inherit=None, + max_child_jobs=max_child_jobs, ) def _exec_gokart_task(self, task: MockTaskOnKart) -> None: @@ -165,6 +166,71 @@ def requires(self) -> list[Child]: 'INFO:kannon.master:All tasks completed!', ]) + def test_three_task_on_bullet_with_max_child_jobs(self) -> None: + self.maxDiff = None + + class Child(MockTaskOnBullet): + param = luigi.IntParameter() + + c1 = Child(param=1) + c1.wait_sec = 4 + c2 = Child(param=2) + c2.wait_sec = 3 + c3 = Child(param=3) + c3.wait_sec = 2 + + class Parent(MockTaskOnKart): + + def requires(self) -> list[Child]: + return [c1, c2, c3] + + root_task = Parent() + + master = MockKannon(max_child_jobs=2) + with self.assertLogs() as cm: + master.build(root_task) + + c1_task_info = master._gen_task_info(c1) + c2_task_info = master._gen_task_info(c2) + c3_task_info = master._gen_task_info(c3) + root_task_info = master._gen_task_info(root_task) + self.assertEqual( + cm.output, + [ + 'INFO:kannon.master:Creating task queue...', + f'INFO:kannon.master:Task {c1_task_info} is pushed to task queue', + f'INFO:kannon.master:Task {c2_task_info} is pushed to task queue', + f'INFO:kannon.master:Task {c3_task_info} is pushed to task queue', + f'INFO:kannon.master:Task {root_task_info} is pushed to task queue', + 'INFO:kannon.master:Total tasks in task queue: 4', + 'INFO:kannon.master:Consuming task queue...', + f'INFO:kannon.master:Checking if task {c1_task_info} is executable...', + f'INFO:kannon.master:Trying to run task {c1_task_info} on child job...', + f'INFO:kannon.master:Checking if task {c2_task_info} is executable...', + f'INFO:kannon.master:Trying to run task {c2_task_info} on child job...', + # c3 has to wait + f'INFO:kannon.master:Checking if task {c3_task_info} is executable...', + f'INFO:kannon.master:Reach max_child_jobs, waiting to run task {c3_task_info} on child job...', + f'INFO:kannon.master:Checking if task {root_task_info} is executable...', + f'INFO:kannon.master:Task {c1_task_info} is still running on child job.', + f'INFO:kannon.master:Task {c2_task_info} is still running on child job.', + f'INFO:kannon.master:Checking if task {c3_task_info} is executable...', + f'INFO:kannon.master:Reach max_child_jobs, waiting to run task {c3_task_info} on child job...', + f'INFO:kannon.master:Checking if task {root_task_info} is executable...', + f'INFO:kannon.master:Task {c1_task_info} is already completed.', + f'INFO:kannon.master:Task {c2_task_info} is already completed.', + # now c3 can start + f'INFO:kannon.master:Checking if task {c3_task_info} is executable...', + f'INFO:kannon.master:Trying to run task {c3_task_info} on child job...', + f'INFO:kannon.master:Checking if task {root_task_info} is executable...', + f'INFO:kannon.master:Task {c3_task_info} is still running on child job.', + f'INFO:kannon.master:Checking if task {root_task_info} is executable...', + f'INFO:kannon.master:Executing task {root_task_info} on master job...', + f'INFO:kannon.master:Completed task {root_task_info} on master job.', + f'INFO:kannon.master:Task {c3_task_info} is already completed.', + 'INFO:kannon.master:All tasks completed!', + ]) + if __name__ == '__main__': unittest.main() diff --git a/tox.ini b/tox.ini index 9fc4803..6bafbd0 100644 --- a/tox.ini +++ b/tox.ini @@ -5,7 +5,7 @@ isolated_build = true [testenv] allowlist_externals = coverage skip_install = true -commands = coverage run -m unittest discover -s test +commands = coverage run -m unittest discover -v [testenv:yapf] allowlist_externals = yapf From 588e87fe97b5e061d2e207c894ff503ac29e81d6 Mon Sep 17 00:00:00 2001 From: Yutaro Oguri <63549742+maronuu@users.noreply.github.com> Date: Tue, 5 Dec 2023 18:50:28 +0900 Subject: [PATCH 2/2] Additional job check (#37) * Add additional waiting * fix mock of kube api * add util method to check child task * fix exception * Remove uncalled assert * Add type annotation * fix --------- Co-authored-by: yutaro-oguri --- kannon/master.py | 19 ++++++++++++++++--- test/integration_test/test_master_build.py | 12 ++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/kannon/master.py b/kannon/master.py index 1d17cf8..ca32dd9 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -64,8 +64,10 @@ def build(self, root_task: gokart.TaskOnKart) -> None: running_task_ids.remove(task.make_unique_id()) continue if task.make_unique_id() in running_task_ids: + # check if task is still running on child job + self._check_child_task_status(task) logger.info(f"Task {self._gen_task_info(task)} is still running on child job.") - task_queue.append(task) + task_queue.append(task) # re-enqueue task to check if it is done continue # TODO: enable user to specify duration to sleep for each task @@ -136,8 +138,7 @@ def _exec_bullet_task(self, task: TaskOnBullet) -> None: ) create_job(self.api_instance, job, self.namespace) logger.info(f"Created child job {job_name} with task {self._gen_task_info(task)}") - task_unique_id = task.make_unique_id() - self.task_id_to_job_name[task_unique_id] = job_name + self.task_id_to_job_name[task.make_unique_id()] = job_name def _create_child_job_object(self, job_name: str, task_pkl_path: str) -> client.V1Job: # TODO: use python -c to avoid dependency to execute_task.py @@ -174,6 +175,18 @@ def _gen_task_info(task: gokart.TaskOnKart) -> str: def _gen_pkl_path(task: gokart.TaskOnKart) -> str: return os.path.join(task.workspace_directory, 'kannon', f'task_obj_{task.make_unique_id()}.pkl') + def _check_child_task_status(self, task: TaskOnBullet) -> None: + if task.make_unique_id() not in self.task_id_to_job_name: + raise ValueError(f"Task {self._gen_task_info(task)} is not found in `task_id_to_job_name`") + job_name = self.task_id_to_job_name[task.make_unique_id()] + job_status = get_job_status( + self.api_instance, + job_name, + self.namespace, + ) + if job_status == JobStatus.FAILED: + raise RuntimeError(f"Task {self._gen_task_info(task)} on job {job_name} has failed.") + def _is_executable(self, task: gokart.TaskOnKart) -> bool: children = flatten(task.requires()) diff --git a/test/integration_test/test_master_build.py b/test/integration_test/test_master_build.py index fa36a4d..128ea0b 100644 --- a/test/integration_test/test_master_build.py +++ b/test/integration_test/test_master_build.py @@ -7,6 +7,7 @@ import gokart import luigi from kubernetes import client +from luigi.task import flatten from kannon import Kannon, TaskOnBullet @@ -55,8 +56,19 @@ def _exec_gokart_task(self, task: MockTaskOnKart) -> None: task.run() def _exec_bullet_task(self, task: MockTaskOnBullet) -> None: + self.task_id_to_job_name[task.make_unique_id()] = "dummy_job_name" task.run() + def _check_child_task_status(self, task: MockTaskOnBullet) -> None: + return None + + def _is_executable(self, task: MockTaskOnKart) -> bool: + children = flatten(task.requires()) + for child in children: + if not child.complete(): + return False + return True + class TestConsumeTaskQueue(unittest.TestCase):