diff --git a/kannon/master.py b/kannon/master.py index 6d675cf..fc42067 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -1,9 +1,11 @@ +from __future__ import annotations + import logging import os from collections import deque from copy import deepcopy from time import sleep -from typing import Deque, Dict, List, Optional, Set +from typing import Optional, List import gokart from gokart.target import make_target @@ -29,6 +31,7 @@ def __init__( env_to_inherit: Optional[List[str]] = None, master_pod_name: Optional[str] = None, master_pod_uid: Optional[str] = None, + max_child_jobs: int | None = None, ) -> None: # validation if not os.path.exists(path_child_script): @@ -42,10 +45,15 @@ def __init__( if env_to_inherit is None: env_to_inherit = ["TASK_WORKSPACE_DIRECTORY"] self.env_to_inherit = env_to_inherit + self.master_pod_name = master_pod_name self.master_pod_uid = master_pod_uid - self.task_id_to_job_name: Dict[str, str] = dict() + if max_child_jobs is not None and max_child_jobs <= 0: + raise ValueError(f"max_child_jobs must be positive integer, but got {max_child_jobs}") + self.max_child_jobs = max_child_jobs + + self.task_id_to_job_name: dict[str, str] = dict() def build(self, root_task: gokart.TaskOnKart) -> None: # push tasks into queue @@ -53,15 +61,18 @@ def build(self, root_task: gokart.TaskOnKart) -> None: task_queue = self._create_task_queue(root_task) # consume task queue + running_task_ids: set[str] = set() logger.info("Consuming task queue...") while task_queue: task = task_queue.popleft() if task.complete(): logger.info(f"Task {self._gen_task_info(task)} is already completed.") + if task.make_unique_id() in running_task_ids: + running_task_ids.remove(task.make_unique_id()) continue - if task.make_unique_id() in self.task_id_to_job_name: + if task.make_unique_id() in running_task_ids: # check if task is still running on child job - assert self._check_child_task_status(task), f"Child task {self._gen_task_info(task)} failed." + self._check_child_task_status(task) logger.info(f"Task {self._gen_task_info(task)} is still running on child job.") task_queue.append(task) # re-enqueue task to check if it is done continue @@ -75,8 +86,13 @@ def build(self, root_task: gokart.TaskOnKart) -> None: continue # execute task if isinstance(task, TaskOnBullet): + if self.max_child_jobs is not None and len(running_task_ids) >= self.max_child_jobs: + task_queue.append(task) # re-enqueue task to check later + logger.info(f"Reach max_child_jobs, waiting to run task {self._gen_task_info(task)} on child job...") + continue logger.info(f"Trying to run task {self._gen_task_info(task)} on child job...") self._exec_bullet_task(task) + running_task_ids.add(task.make_unique_id()) # mark as already launched task task_queue.append(task) # re-enqueue task to check if it is done elif isinstance(task, gokart.TaskOnKart): logger.info(f"Executing task {self._gen_task_info(task)} on master job...") @@ -87,9 +103,9 @@ def build(self, root_task: gokart.TaskOnKart) -> None: logger.info("All tasks completed!") - def _create_task_queue(self, root_task: gokart.TaskOnKart) -> Deque[gokart.TaskOnKart]: - task_queue: Deque[gokart.TaskOnKart] = deque() - visited_task_ids: Set[str] = set() + def _create_task_queue(self, root_task: gokart.TaskOnKart) -> deque[gokart.TaskOnKart]: + task_queue: deque[gokart.TaskOnKart] = deque() + visited_task_ids: set[str] = set() def _rec_enqueue_task(task: gokart.TaskOnKart) -> None: """Traversal task tree in post-order to push tasks into task queue.""" @@ -180,7 +196,7 @@ def _gen_task_info(task: gokart.TaskOnKart) -> str: def _gen_pkl_path(task: gokart.TaskOnKart) -> str: return os.path.join(task.workspace_directory, 'kannon', f'task_obj_{task.make_unique_id()}.pkl') - def _check_child_task_status(self, task: TaskOnBullet) -> bool: + def _check_child_task_status(self, task: TaskOnBullet) -> None: if task.make_unique_id() not in self.task_id_to_job_name: raise ValueError(f"Task {self._gen_task_info(task)} is not found in `task_id_to_job_name`") job_name = self.task_id_to_job_name[task.make_unique_id()] @@ -191,7 +207,6 @@ def _check_child_task_status(self, task: TaskOnBullet) -> bool: ) if job_status == JobStatus.FAILED: raise RuntimeError(f"Task {self._gen_task_info(task)} on job {job_name} has failed.") - return True def _is_executable(self, task: gokart.TaskOnKart) -> bool: children = flatten(task.requires()) diff --git a/test/integration_test/test_master_build.py b/test/integration_test/test_master_build.py index 0534f04..128ea0b 100644 --- a/test/integration_test/test_master_build.py +++ b/test/integration_test/test_master_build.py @@ -42,13 +42,14 @@ def complete(self) -> bool: class MockKannon(Kannon): - def __init__(self) -> None: + def __init__(self, *, max_child_jobs: int | None = None) -> None: super().__init__( api_instance=None, template_job=client.V1Job(metadata=client.V1ObjectMeta()), job_prefix="", path_child_script=__file__, # just pass any existing file as dummy env_to_inherit=None, + max_child_jobs=max_child_jobs, ) def _exec_gokart_task(self, task: MockTaskOnKart) -> None: @@ -58,8 +59,8 @@ def _exec_bullet_task(self, task: MockTaskOnBullet) -> None: self.task_id_to_job_name[task.make_unique_id()] = "dummy_job_name" task.run() - def _check_child_task_status(self, task: MockTaskOnBullet) -> bool: - return True + def _check_child_task_status(self, task: MockTaskOnBullet) -> None: + return None def _is_executable(self, task: MockTaskOnKart) -> bool: children = flatten(task.requires()) @@ -177,6 +178,71 @@ def requires(self) -> list[Child]: 'INFO:kannon.master:All tasks completed!', ]) + def test_three_task_on_bullet_with_max_child_jobs(self) -> None: + self.maxDiff = None + + class Child(MockTaskOnBullet): + param = luigi.IntParameter() + + c1 = Child(param=1) + c1.wait_sec = 4 + c2 = Child(param=2) + c2.wait_sec = 3 + c3 = Child(param=3) + c3.wait_sec = 2 + + class Parent(MockTaskOnKart): + + def requires(self) -> list[Child]: + return [c1, c2, c3] + + root_task = Parent() + + master = MockKannon(max_child_jobs=2) + with self.assertLogs() as cm: + master.build(root_task) + + c1_task_info = master._gen_task_info(c1) + c2_task_info = master._gen_task_info(c2) + c3_task_info = master._gen_task_info(c3) + root_task_info = master._gen_task_info(root_task) + self.assertEqual( + cm.output, + [ + 'INFO:kannon.master:Creating task queue...', + f'INFO:kannon.master:Task {c1_task_info} is pushed to task queue', + f'INFO:kannon.master:Task {c2_task_info} is pushed to task queue', + f'INFO:kannon.master:Task {c3_task_info} is pushed to task queue', + f'INFO:kannon.master:Task {root_task_info} is pushed to task queue', + 'INFO:kannon.master:Total tasks in task queue: 4', + 'INFO:kannon.master:Consuming task queue...', + f'INFO:kannon.master:Checking if task {c1_task_info} is executable...', + f'INFO:kannon.master:Trying to run task {c1_task_info} on child job...', + f'INFO:kannon.master:Checking if task {c2_task_info} is executable...', + f'INFO:kannon.master:Trying to run task {c2_task_info} on child job...', + # c3 has to wait + f'INFO:kannon.master:Checking if task {c3_task_info} is executable...', + f'INFO:kannon.master:Reach max_child_jobs, waiting to run task {c3_task_info} on child job...', + f'INFO:kannon.master:Checking if task {root_task_info} is executable...', + f'INFO:kannon.master:Task {c1_task_info} is still running on child job.', + f'INFO:kannon.master:Task {c2_task_info} is still running on child job.', + f'INFO:kannon.master:Checking if task {c3_task_info} is executable...', + f'INFO:kannon.master:Reach max_child_jobs, waiting to run task {c3_task_info} on child job...', + f'INFO:kannon.master:Checking if task {root_task_info} is executable...', + f'INFO:kannon.master:Task {c1_task_info} is already completed.', + f'INFO:kannon.master:Task {c2_task_info} is already completed.', + # now c3 can start + f'INFO:kannon.master:Checking if task {c3_task_info} is executable...', + f'INFO:kannon.master:Trying to run task {c3_task_info} on child job...', + f'INFO:kannon.master:Checking if task {root_task_info} is executable...', + f'INFO:kannon.master:Task {c3_task_info} is still running on child job.', + f'INFO:kannon.master:Checking if task {root_task_info} is executable...', + f'INFO:kannon.master:Executing task {root_task_info} on master job...', + f'INFO:kannon.master:Completed task {root_task_info} on master job.', + f'INFO:kannon.master:Task {c3_task_info} is already completed.', + 'INFO:kannon.master:All tasks completed!', + ]) + if __name__ == '__main__': unittest.main() diff --git a/tox.ini b/tox.ini index 9fc4803..6bafbd0 100644 --- a/tox.ini +++ b/tox.ini @@ -5,7 +5,7 @@ isolated_build = true [testenv] allowlist_externals = coverage skip_install = true -commands = coverage run -m unittest discover -s test +commands = coverage run -m unittest discover -v [testenv:yapf] allowlist_externals = yapf