diff --git a/kannon/master.py b/kannon/master.py index 1d17cf8..ca32dd9 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -64,8 +64,10 @@ def build(self, root_task: gokart.TaskOnKart) -> None: running_task_ids.remove(task.make_unique_id()) continue if task.make_unique_id() in running_task_ids: + # check if task is still running on child job + self._check_child_task_status(task) logger.info(f"Task {self._gen_task_info(task)} is still running on child job.") - task_queue.append(task) + task_queue.append(task) # re-enqueue task to check if it is done continue # TODO: enable user to specify duration to sleep for each task @@ -136,8 +138,7 @@ def _exec_bullet_task(self, task: TaskOnBullet) -> None: ) create_job(self.api_instance, job, self.namespace) logger.info(f"Created child job {job_name} with task {self._gen_task_info(task)}") - task_unique_id = task.make_unique_id() - self.task_id_to_job_name[task_unique_id] = job_name + self.task_id_to_job_name[task.make_unique_id()] = job_name def _create_child_job_object(self, job_name: str, task_pkl_path: str) -> client.V1Job: # TODO: use python -c to avoid dependency to execute_task.py @@ -174,6 +175,18 @@ def _gen_task_info(task: gokart.TaskOnKart) -> str: def _gen_pkl_path(task: gokart.TaskOnKart) -> str: return os.path.join(task.workspace_directory, 'kannon', f'task_obj_{task.make_unique_id()}.pkl') + def _check_child_task_status(self, task: TaskOnBullet) -> None: + if task.make_unique_id() not in self.task_id_to_job_name: + raise ValueError(f"Task {self._gen_task_info(task)} is not found in `task_id_to_job_name`") + job_name = self.task_id_to_job_name[task.make_unique_id()] + job_status = get_job_status( + self.api_instance, + job_name, + self.namespace, + ) + if job_status == JobStatus.FAILED: + raise RuntimeError(f"Task {self._gen_task_info(task)} on job {job_name} has failed.") + def _is_executable(self, task: gokart.TaskOnKart) -> bool: children = flatten(task.requires()) diff --git a/test/integration_test/test_master_build.py b/test/integration_test/test_master_build.py index fa36a4d..128ea0b 100644 --- a/test/integration_test/test_master_build.py +++ b/test/integration_test/test_master_build.py @@ -7,6 +7,7 @@ import gokart import luigi from kubernetes import client +from luigi.task import flatten from kannon import Kannon, TaskOnBullet @@ -55,8 +56,19 @@ def _exec_gokart_task(self, task: MockTaskOnKart) -> None: task.run() def _exec_bullet_task(self, task: MockTaskOnBullet) -> None: + self.task_id_to_job_name[task.make_unique_id()] = "dummy_job_name" task.run() + def _check_child_task_status(self, task: MockTaskOnBullet) -> None: + return None + + def _is_executable(self, task: MockTaskOnKart) -> bool: + children = flatten(task.requires()) + for child in children: + if not child.complete(): + return False + return True + class TestConsumeTaskQueue(unittest.TestCase):