From ce84cd3b552327a46cb2ee3c5a7e49918eadb025 Mon Sep 17 00:00:00 2001 From: yokomotod Date: Tue, 28 Nov 2023 15:46:01 +0900 Subject: [PATCH] add max_child_jobs option (#24) * add max_child_jobs option * integration test * fix * fix tox * fix lint * validate max_child_jobs --- kannon/master.py | 29 ++++++--- test/integration_test/test_master_build.py | 68 +++++++++++++++++++++- tox.ini | 2 +- 3 files changed, 88 insertions(+), 11 deletions(-) diff --git a/kannon/master.py b/kannon/master.py index 5b71618..1d17cf8 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -1,9 +1,10 @@ +from __future__ import annotations + import logging import os from collections import deque from copy import deepcopy from time import sleep -from typing import Deque, Dict, List, Optional, Set import gokart from gokart.target import make_target @@ -26,7 +27,8 @@ def __init__( # kannon resources job_prefix: str, path_child_script: str = "./run_child.py", - env_to_inherit: Optional[List[str]] = None, + env_to_inherit: list[str] | None = None, + max_child_jobs: int | None = None, ) -> None: # validation if not os.path.exists(path_child_script): @@ -40,8 +42,11 @@ def __init__( if env_to_inherit is None: env_to_inherit = ["TASK_WORKSPACE_DIRECTORY"] self.env_to_inherit = env_to_inherit + if max_child_jobs is not None and max_child_jobs <= 0: + raise ValueError(f"max_child_jobs must be positive integer, but got {max_child_jobs}") + self.max_child_jobs = max_child_jobs - self.task_id_to_job_name: Dict[str, str] = dict() + self.task_id_to_job_name: dict[str, str] = dict() def build(self, root_task: gokart.TaskOnKart) -> None: # push tasks into queue @@ -49,14 +54,16 @@ def build(self, root_task: gokart.TaskOnKart) -> None: task_queue = self._create_task_queue(root_task) # consume task queue - launched_task_ids: Set[str] = set() + running_task_ids: set[str] = set() logger.info("Consuming task queue...") while task_queue: task = task_queue.popleft() if task.complete(): logger.info(f"Task {self._gen_task_info(task)} is already completed.") + if task.make_unique_id() in running_task_ids: + running_task_ids.remove(task.make_unique_id()) continue - if task.make_unique_id() in launched_task_ids: + if task.make_unique_id() in running_task_ids: logger.info(f"Task {self._gen_task_info(task)} is still running on child job.") task_queue.append(task) continue @@ -70,9 +77,13 @@ def build(self, root_task: gokart.TaskOnKart) -> None: continue # execute task if isinstance(task, TaskOnBullet): + if self.max_child_jobs is not None and len(running_task_ids) >= self.max_child_jobs: + task_queue.append(task) # re-enqueue task to check later + logger.info(f"Reach max_child_jobs, waiting to run task {self._gen_task_info(task)} on child job...") + continue logger.info(f"Trying to run task {self._gen_task_info(task)} on child job...") self._exec_bullet_task(task) - launched_task_ids.add(task.make_unique_id()) # mark as already launched task + running_task_ids.add(task.make_unique_id()) # mark as already launched task task_queue.append(task) # re-enqueue task to check if it is done elif isinstance(task, gokart.TaskOnKart): logger.info(f"Executing task {self._gen_task_info(task)} on master job...") @@ -83,9 +94,9 @@ def build(self, root_task: gokart.TaskOnKart) -> None: logger.info("All tasks completed!") - def _create_task_queue(self, root_task: gokart.TaskOnKart) -> Deque[gokart.TaskOnKart]: - task_queue: Deque[gokart.TaskOnKart] = deque() - visited_task_ids: Set[str] = set() + def _create_task_queue(self, root_task: gokart.TaskOnKart) -> deque[gokart.TaskOnKart]: + task_queue: deque[gokart.TaskOnKart] = deque() + visited_task_ids: set[str] = set() def _rec_enqueue_task(task: gokart.TaskOnKart) -> None: """Traversal task tree in post-order to push tasks into task queue.""" diff --git a/test/integration_test/test_master_build.py b/test/integration_test/test_master_build.py index b824a56..fa36a4d 100644 --- a/test/integration_test/test_master_build.py +++ b/test/integration_test/test_master_build.py @@ -41,13 +41,14 @@ def complete(self) -> bool: class MockKannon(Kannon): - def __init__(self) -> None: + def __init__(self, *, max_child_jobs: int | None = None) -> None: super().__init__( api_instance=None, template_job=client.V1Job(metadata=client.V1ObjectMeta()), job_prefix="", path_child_script=__file__, # just pass any existing file as dummy env_to_inherit=None, + max_child_jobs=max_child_jobs, ) def _exec_gokart_task(self, task: MockTaskOnKart) -> None: @@ -165,6 +166,71 @@ def requires(self) -> list[Child]: 'INFO:kannon.master:All tasks completed!', ]) + def test_three_task_on_bullet_with_max_child_jobs(self) -> None: + self.maxDiff = None + + class Child(MockTaskOnBullet): + param = luigi.IntParameter() + + c1 = Child(param=1) + c1.wait_sec = 4 + c2 = Child(param=2) + c2.wait_sec = 3 + c3 = Child(param=3) + c3.wait_sec = 2 + + class Parent(MockTaskOnKart): + + def requires(self) -> list[Child]: + return [c1, c2, c3] + + root_task = Parent() + + master = MockKannon(max_child_jobs=2) + with self.assertLogs() as cm: + master.build(root_task) + + c1_task_info = master._gen_task_info(c1) + c2_task_info = master._gen_task_info(c2) + c3_task_info = master._gen_task_info(c3) + root_task_info = master._gen_task_info(root_task) + self.assertEqual( + cm.output, + [ + 'INFO:kannon.master:Creating task queue...', + f'INFO:kannon.master:Task {c1_task_info} is pushed to task queue', + f'INFO:kannon.master:Task {c2_task_info} is pushed to task queue', + f'INFO:kannon.master:Task {c3_task_info} is pushed to task queue', + f'INFO:kannon.master:Task {root_task_info} is pushed to task queue', + 'INFO:kannon.master:Total tasks in task queue: 4', + 'INFO:kannon.master:Consuming task queue...', + f'INFO:kannon.master:Checking if task {c1_task_info} is executable...', + f'INFO:kannon.master:Trying to run task {c1_task_info} on child job...', + f'INFO:kannon.master:Checking if task {c2_task_info} is executable...', + f'INFO:kannon.master:Trying to run task {c2_task_info} on child job...', + # c3 has to wait + f'INFO:kannon.master:Checking if task {c3_task_info} is executable...', + f'INFO:kannon.master:Reach max_child_jobs, waiting to run task {c3_task_info} on child job...', + f'INFO:kannon.master:Checking if task {root_task_info} is executable...', + f'INFO:kannon.master:Task {c1_task_info} is still running on child job.', + f'INFO:kannon.master:Task {c2_task_info} is still running on child job.', + f'INFO:kannon.master:Checking if task {c3_task_info} is executable...', + f'INFO:kannon.master:Reach max_child_jobs, waiting to run task {c3_task_info} on child job...', + f'INFO:kannon.master:Checking if task {root_task_info} is executable...', + f'INFO:kannon.master:Task {c1_task_info} is already completed.', + f'INFO:kannon.master:Task {c2_task_info} is already completed.', + # now c3 can start + f'INFO:kannon.master:Checking if task {c3_task_info} is executable...', + f'INFO:kannon.master:Trying to run task {c3_task_info} on child job...', + f'INFO:kannon.master:Checking if task {root_task_info} is executable...', + f'INFO:kannon.master:Task {c3_task_info} is still running on child job.', + f'INFO:kannon.master:Checking if task {root_task_info} is executable...', + f'INFO:kannon.master:Executing task {root_task_info} on master job...', + f'INFO:kannon.master:Completed task {root_task_info} on master job.', + f'INFO:kannon.master:Task {c3_task_info} is already completed.', + 'INFO:kannon.master:All tasks completed!', + ]) + if __name__ == '__main__': unittest.main() diff --git a/tox.ini b/tox.ini index 9fc4803..6bafbd0 100644 --- a/tox.ini +++ b/tox.ini @@ -5,7 +5,7 @@ isolated_build = true [testenv] allowlist_externals = coverage skip_install = true -commands = coverage run -m unittest discover -s test +commands = coverage run -m unittest discover -v [testenv:yapf] allowlist_externals = yapf