Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into feature/additional_job_check
Browse files Browse the repository at this point in the history
  • Loading branch information
yutaro-oguri committed Dec 1, 2023
2 parents eb30b12 + ce84cd3 commit 0ab310b
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 9 deletions.
27 changes: 20 additions & 7 deletions kannon/master.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import annotations

import logging
import os
from collections import deque
from copy import deepcopy
from time import sleep
from typing import Deque, Dict, List, Optional, Set

import gokart
from gokart.target import make_target
Expand All @@ -26,7 +27,8 @@ def __init__(
# kannon resources
job_prefix: str,
path_child_script: str = "./run_child.py",
env_to_inherit: Optional[List[str]] = None,
env_to_inherit: list[str] | None = None,
max_child_jobs: int | None = None,
) -> None:
# validation
if not os.path.exists(path_child_script):
Expand All @@ -40,22 +42,28 @@ def __init__(
if env_to_inherit is None:
env_to_inherit = ["TASK_WORKSPACE_DIRECTORY"]
self.env_to_inherit = env_to_inherit
if max_child_jobs is not None and max_child_jobs <= 0:
raise ValueError(f"max_child_jobs must be positive integer, but got {max_child_jobs}")
self.max_child_jobs = max_child_jobs

self.task_id_to_job_name: Dict[str, str] = dict()
self.task_id_to_job_name: dict[str, str] = dict()

def build(self, root_task: gokart.TaskOnKart) -> None:
# push tasks into queue
logger.info("Creating task queue...")
task_queue = self._create_task_queue(root_task)

# consume task queue
running_task_ids: set[str] = set()
logger.info("Consuming task queue...")
while task_queue:
task = task_queue.popleft()
if task.complete():
logger.info(f"Task {self._gen_task_info(task)} is already completed.")
if task.make_unique_id() in running_task_ids:
running_task_ids.remove(task.make_unique_id())
continue
if task.make_unique_id() in self.task_id_to_job_name:
if task.make_unique_id() in running_task_ids:
# check if task is still running on child job
assert self._check_child_task_status(task), f"Child task {self._gen_task_info(task)} failed."
logger.info(f"Task {self._gen_task_info(task)} is still running on child job.")
Expand All @@ -71,8 +79,13 @@ def build(self, root_task: gokart.TaskOnKart) -> None:
continue
# execute task
if isinstance(task, TaskOnBullet):
if self.max_child_jobs is not None and len(running_task_ids) >= self.max_child_jobs:
task_queue.append(task) # re-enqueue task to check later
logger.info(f"Reach max_child_jobs, waiting to run task {self._gen_task_info(task)} on child job...")
continue
logger.info(f"Trying to run task {self._gen_task_info(task)} on child job...")
self._exec_bullet_task(task)
running_task_ids.add(task.make_unique_id()) # mark as already launched task
task_queue.append(task) # re-enqueue task to check if it is done
elif isinstance(task, gokart.TaskOnKart):
logger.info(f"Executing task {self._gen_task_info(task)} on master job...")
Expand All @@ -83,9 +96,9 @@ def build(self, root_task: gokart.TaskOnKart) -> None:

logger.info("All tasks completed!")

def _create_task_queue(self, root_task: gokart.TaskOnKart) -> Deque[gokart.TaskOnKart]:
task_queue: Deque[gokart.TaskOnKart] = deque()
visited_task_ids: Set[str] = set()
def _create_task_queue(self, root_task: gokart.TaskOnKart) -> deque[gokart.TaskOnKart]:
task_queue: deque[gokart.TaskOnKart] = deque()
visited_task_ids: set[str] = set()

def _rec_enqueue_task(task: gokart.TaskOnKart) -> None:
"""Traversal task tree in post-order to push tasks into task queue."""
Expand Down
68 changes: 67 additions & 1 deletion test/integration_test/test_master_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ def complete(self) -> bool:

class MockKannon(Kannon):

def __init__(self) -> None:
def __init__(self, *, max_child_jobs: int | None = None) -> None:
super().__init__(
api_instance=None,
template_job=client.V1Job(metadata=client.V1ObjectMeta()),
job_prefix="",
path_child_script=__file__, # just pass any existing file as dummy
env_to_inherit=None,
max_child_jobs=max_child_jobs,
)

def _exec_gokart_task(self, task: MockTaskOnKart) -> None:
Expand Down Expand Up @@ -177,6 +178,71 @@ def requires(self) -> list[Child]:
'INFO:kannon.master:All tasks completed!',
])

def test_three_task_on_bullet_with_max_child_jobs(self) -> None:
# Integration test: a parent task requiring three bullet (child-job) tasks,
# built by a master that is limited to max_child_jobs=2.
# The expected log shows c1 and c2 being launched first, c3 being re-enqueued
# with "Reach max_child_jobs" until c1 and c2 are reported completed, and only
# then c3 being launched. The whole log sequence is compared exactly.
# Do not truncate the list diff when the assertion below fails.
self.maxDiff = None

class Child(MockTaskOnBullet):
param = luigi.IntParameter()

# Three children distinguished by `param`, with decreasing wait_sec values.
# NOTE(review): wait_sec is consumed by MockTaskOnBullet, which is not visible
# here; presumably it controls when complete() starts returning True - confirm.
c1 = Child(param=1)
c1.wait_sec = 4
c2 = Child(param=2)
c2.wait_sec = 3
c3 = Child(param=3)
c3.wait_sec = 2

class Parent(MockTaskOnKart):

def requires(self) -> list[Child]:
return [c1, c2, c3]

root_task = Parent()

# Cap the number of child jobs at 2, so the third child cannot start right away.
master = MockKannon(max_child_jobs=2)
# Capture every log record emitted while the build runs.
with self.assertLogs() as cm:
master.build(root_task)

# Render each task the same way the master does in its log messages.
c1_task_info = master._gen_task_info(c1)
c2_task_info = master._gen_task_info(c2)
c3_task_info = master._gen_task_info(c3)
root_task_info = master._gen_task_info(root_task)
self.assertEqual(
cm.output,
[
'INFO:kannon.master:Creating task queue...',
f'INFO:kannon.master:Task {c1_task_info} is pushed to task queue',
f'INFO:kannon.master:Task {c2_task_info} is pushed to task queue',
f'INFO:kannon.master:Task {c3_task_info} is pushed to task queue',
f'INFO:kannon.master:Task {root_task_info} is pushed to task queue',
'INFO:kannon.master:Total tasks in task queue: 4',
'INFO:kannon.master:Consuming task queue...',
f'INFO:kannon.master:Checking if task {c1_task_info} is executable...',
f'INFO:kannon.master:Trying to run task {c1_task_info} on child job...',
f'INFO:kannon.master:Checking if task {c2_task_info} is executable...',
f'INFO:kannon.master:Trying to run task {c2_task_info} on child job...',
# c3 has to wait
f'INFO:kannon.master:Checking if task {c3_task_info} is executable...',
f'INFO:kannon.master:Reach max_child_jobs, waiting to run task {c3_task_info} on child job...',
f'INFO:kannon.master:Checking if task {root_task_info} is executable...',
f'INFO:kannon.master:Task {c1_task_info} is still running on child job.',
f'INFO:kannon.master:Task {c2_task_info} is still running on child job.',
f'INFO:kannon.master:Checking if task {c3_task_info} is executable...',
f'INFO:kannon.master:Reach max_child_jobs, waiting to run task {c3_task_info} on child job...',
f'INFO:kannon.master:Checking if task {root_task_info} is executable...',
f'INFO:kannon.master:Task {c1_task_info} is already completed.',
f'INFO:kannon.master:Task {c2_task_info} is already completed.',
# now c3 can start
f'INFO:kannon.master:Checking if task {c3_task_info} is executable...',
f'INFO:kannon.master:Trying to run task {c3_task_info} on child job...',
f'INFO:kannon.master:Checking if task {root_task_info} is executable...',
f'INFO:kannon.master:Task {c3_task_info} is still running on child job.',
f'INFO:kannon.master:Checking if task {root_task_info} is executable...',
f'INFO:kannon.master:Executing task {root_task_info} on master job...',
f'INFO:kannon.master:Completed task {root_task_info} on master job.',
f'INFO:kannon.master:Task {c3_task_info} is already completed.',
'INFO:kannon.master:All tasks completed!',
])


# Run the tests in this module when the file is executed directly as a script.
if __name__ == '__main__':
unittest.main()
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ isolated_build = true
[testenv]
allowlist_externals = coverage
skip_install = true
commands = coverage run -m unittest discover -s test
commands = coverage run -m unittest discover -v

[testenv:yapf]
allowlist_externals = yapf
Expand Down

0 comments on commit 0ab310b

Please sign in to comment.