From 370090789fc9eba1ee609dee76ef55bf8938e627 Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Fri, 30 Jun 2023 13:52:31 +0900 Subject: [PATCH 01/30] Add a validation of job status --- kannon/master.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/kannon/master.py b/kannon/master.py index a210786..b70ccc7 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -49,16 +49,25 @@ def build(self, root_task: gokart.TaskOnKart) -> None: task_queue = self._create_task_queue(root_task) # consume task queue - launched_task_ids: Set[str] = set() + running_task_ids: Set[str] = set() logger.info("Consuming task queue...") while task_queue: task = task_queue.popleft() if task.complete(): logger.info(f"Task {self._gen_task_info(task)} is already completed.") continue - if task.make_unique_id() in launched_task_ids: + if task.make_unique_id() in running_task_ids: + # check if task is still running on child job + job_name = self.task_id_to_job_name[task.make_unique_id()] + job_status = get_job_status( + self.api_instance, + job_name, + self.namespace, + ) + if job_status == JobStatus.FAILED: + raise RuntimeError(f"Task {self._gen_task_info(task)} on job {job_name} has failed.") logger.info(f"Task {self._gen_task_info(task)} is still running on child job.") - task_queue.append(task) + task_queue.append(task) # re-enqueue task to check if it is done continue # TODO: enable user to specify duration to sleep for each task @@ -72,7 +81,7 @@ def build(self, root_task: gokart.TaskOnKart) -> None: if isinstance(task, TaskOnBullet): logger.info(f"Trying to run task {self._gen_task_info(task)} on child job...") self._exec_bullet_task(task) - launched_task_ids.add(task.make_unique_id()) # mark as already launched task + running_task_ids.add(task.make_unique_id()) # mark as already launched task task_queue.append(task) # re-enqueue task to check if it is done elif isinstance(task, gokart.TaskOnKart): logger.info(f"Executing task {self._gen_task_info(task)} on master job...") From 593a3e30853567ae5053e7c8f2191c36e4e81c8b Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Fri, 30 Jun 2023 15:14:26 +0900 Subject: [PATCH 02/30] Add job status check --- kannon/master.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/kannon/master.py b/kannon/master.py index b70ccc7..d0da706 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -49,14 +49,13 @@ def build(self, root_task: gokart.TaskOnKart) -> None: task_queue = self._create_task_queue(root_task) # consume task queue - running_task_ids: Set[str] = set() logger.info("Consuming task queue...") while task_queue: task = task_queue.popleft() if task.complete(): logger.info(f"Task {self._gen_task_info(task)} is already completed.") continue - if task.make_unique_id() in running_task_ids: + if task.make_unique_id() in self.task_id_to_job_name: # check if task is still running on child job job_name = self.task_id_to_job_name[task.make_unique_id()] job_status = get_job_status( @@ -81,7 +80,6 @@ def build(self, root_task: gokart.TaskOnKart) -> None: if isinstance(task, TaskOnBullet): logger.info(f"Trying to run task {self._gen_task_info(task)} on child job...") self._exec_bullet_task(task) - running_task_ids.add(task.make_unique_id()) # mark as already launched task task_queue.append(task) # re-enqueue task to check if it is done elif isinstance(task, gokart.TaskOnKart): logger.info(f"Executing task {self._gen_task_info(task)} on master job...") @@ -134,8 +132,7 @@ def _exec_bullet_task(self, task: TaskOnBullet) -> None: ) create_job(self.api_instance, job, self.namespace) logger.info(f"Created child job {job_name} with task {self._gen_task_info(task)}") - task_unique_id = task.make_unique_id() - self.task_id_to_job_name[task_unique_id] = job_name + self.task_id_to_job_name[task.make_unique_id()] = job_name def _create_child_job_object(self, job_name: str, task_pkl_path: str) -> client.V1Job: # TODO: use python -c to avoid dependency to execute_task.py From a4994f9a3bccc7d703c6da04ae09e30561cc7ddf Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Fri, 25 Aug 2023 15:00:28 +0900 Subject: [PATCH 03/30] Add owner reference --- kannon/master.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/kannon/master.py b/kannon/master.py index 8b68f88..49e277f 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -27,6 +27,8 @@ def __init__( job_prefix: str, path_child_script: str = "./run_child.py", env_to_inherit: Optional[List[str]] = None, + master_pod_name: Optional[str] = None, + master_pod_uid: Optional[str] = None, ) -> None: # validation if not os.path.exists(path_child_script): @@ -40,6 +42,8 @@ def __init__( if env_to_inherit is None: env_to_inherit = ["TASK_WORKSPACE_DIRECTORY"] self.env_to_inherit = env_to_inherit + self.master_pod_name = master_pod_name + self.master_pod_uid = master_pod_uid self.task_id_to_job_name: Dict[str, str] = dict() @@ -158,6 +162,18 @@ def _create_child_job_object(self, job_name: str, task_pkl_path: str) -> client. job.spec.template.spec.containers[0].env = child_envs # replace job name job.metadata.name = job_name + # add owner reference from child to parent if master pod info is available + if self.master_pod_name and self.master_pod_uid: + owner_reference = client.V1OwnerReference( + api_version="batch/v1", + kind="Pod", + name=self.master_pod_name, # owner pod name + uid=self.master_pod_uid, # owner pod uid + ) + if job.metadata.owner_references: + job.metadata.owner_references.append(owner_reference) + else: + job.metadata.owner_references = [owner_reference] return job From f0a875c967417d16c1e1ed2c92ccabc0d90ea560 Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Fri, 25 Aug 2023 16:41:50 +0900 Subject: [PATCH 04/30] Add warning when owner reference is not set --- kannon/master.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kannon/master.py b/kannon/master.py index 49e277f..400cc00 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -174,6 +174,8 @@ def _create_child_job_object(self, job_name: str, task_pkl_path: str) -> client. job.metadata.owner_references.append(owner_reference) else: job.metadata.owner_references = [owner_reference] + else: + logger.warning("Owner reference is not set because master pod info is not provided.") return job From 8108568bee724585cf75f727f8bae7483ca877f5 Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Thu, 31 Aug 2023 11:52:37 +0900 Subject: [PATCH 05/30] Inject config param --- kannon/config_params.py | 47 +++++++++++++++++++++++++++++++++++++++++ kannon/master.py | 5 +++++ 2 files changed, 52 insertions(+) create mode 100644 kannon/config_params.py diff --git a/kannon/config_params.py b/kannon/config_params.py new file mode 100644 index 0000000..6ce0004 --- /dev/null +++ b/kannon/config_params.py @@ -0,0 +1,47 @@ +from typing import Any, Dict, Optional, Type + +import gokart.config_params +import luigi + + +class inherits_config_params: + + def __init__(self, config_class: luigi.Config, parameter_alias: Optional[Dict[str, str]] = None): + """ + Decorates task to inherit parameter value of `config_class`. + + * config_class: Inherit parameter value of this task to decorated task. Only parameter values exist in both tasks are inherited. + * parameter_alias: Dictionary to map paramter names between config_class task and decorated task. + key: config_class's parameter name. value: decorated task's parameter name. + """ + + self._config_class: luigi.Config = config_class + self._parameter_alias: Dict[str, str] = parameter_alias if parameter_alias is not None else {} + + def __call__(self, task_class: Type[gokart.TaskOnKart]): # type: ignore + # wrap task to prevent task name from being changed + @luigi.task._task_wraps(task_class) + class Wrapped(task_class): + __is_decorated_inherits_config_params = True + __config_params: Dict[str, Any] = dict() + + @classmethod + def inject_config_params(cls) -> None: + for param_key, param_value in self._config_class().param_kwargs.items(): + task_param_key = self._parameter_alias.get(param_key, param_key) + + if hasattr(cls, task_param_key): + cls.__config_params[task_param_key] = param_value + + @classmethod + def get_param_values(cls, params, args, kwargs): # type: ignore + # fill from config params that are also included in kwargs + for param_key, param_value in cls.__config_params.items(): + task_param_key = self._parameter_alias.get(param_key, param_key) + + if task_param_key not in kwargs: + kwargs[task_param_key] = param_value + + return super(Wrapped, cls).get_param_values(params, args, kwargs) + + return Wrapped diff --git a/kannon/master.py b/kannon/master.py index 400cc00..b01ef96 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -125,6 +125,11 @@ def _exec_gokart_task(self, task: gokart.TaskOnKart) -> None: raise RuntimeError(f"Task {self._gen_task_info(task)} on job master has failed.") def _exec_bullet_task(self, task: TaskOnBullet) -> None: + # If task is decorated with inherits_config_params, then unwrap it. + if hasattr(task, "__is_decorated_inherits_config_params"): + assert task.__is_decorated_inherits_config_params + logger.info(f"Task {self._gen_task_info(task)} is decorated with inherits_config_params") + task.inject_config_params() # inject config param on master job # Save task instance as pickle object pkl_path = self._gen_pkl_path(task) make_target(pkl_path).dump(task) From 2ff41e9b66c462c03299dc6d6acf81ef1cd3c54c Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Thu, 31 Aug 2023 12:36:44 +0900 Subject: [PATCH 06/30] Fix config params --- kannon/config_params.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kannon/config_params.py b/kannon/config_params.py index 6ce0004..be161c8 100644 --- a/kannon/config_params.py +++ b/kannon/config_params.py @@ -30,8 +30,8 @@ def inject_config_params(cls) -> None: for param_key, param_value in self._config_class().param_kwargs.items(): task_param_key = self._parameter_alias.get(param_key, param_key) - if hasattr(cls, task_param_key): - cls.__config_params[task_param_key] = param_value + # if hasattr(cls, task_param_key): + cls.__config_params[task_param_key] = param_value @classmethod def get_param_values(cls, params, args, kwargs): # type: ignore From 0546b93a0693c4d27c2482dfd6631935628de68c Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Thu, 31 Aug 2023 15:07:03 +0900 Subject: [PATCH 07/30] Fix --- kannon/config_params.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/kannon/config_params.py b/kannon/config_params.py index be161c8..7be6197 100644 --- a/kannon/config_params.py +++ b/kannon/config_params.py @@ -1,6 +1,6 @@ from typing import Any, Dict, Optional, Type -import gokart.config_params +import gokart import luigi @@ -30,18 +30,12 @@ def inject_config_params(cls) -> None: for param_key, param_value in self._config_class().param_kwargs.items(): task_param_key = self._parameter_alias.get(param_key, param_key) - # if hasattr(cls, task_param_key): - cls.__config_params[task_param_key] = param_value + if hasattr(cls, task_param_key) and task_param_key not in cls.__config_params: + cls.__config_params[task_param_key] = param_value @classmethod def get_param_values(cls, params, args, kwargs): # type: ignore - # fill from config params that are also included in kwargs - for param_key, param_value in cls.__config_params.items(): - task_param_key = self._parameter_alias.get(param_key, param_key) - - if task_param_key not in kwargs: - kwargs[task_param_key] = param_value - - return super(Wrapped, cls).get_param_values(params, args, kwargs) + cls.__config_params.update(kwargs) + return super(Wrapped, cls).get_param_values(params, args, cls.__config_params) return Wrapped From f52dda63f2abc45282d3e9aa3b18caf563ef25ad Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Thu, 31 Aug 2023 15:50:43 +0900 Subject: [PATCH 08/30] Fix where to check inheritance --- kannon/config_params.py | 9 +++++---- kannon/master.py | 11 ++++++----- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/kannon/config_params.py b/kannon/config_params.py index 7be6197..3daa966 100644 --- a/kannon/config_params.py +++ b/kannon/config_params.py @@ -1,3 +1,4 @@ +from copy import deepcopy from typing import Any, Dict, Optional, Type import gokart @@ -29,13 +30,13 @@ class Wrapped(task_class): def inject_config_params(cls) -> None: for param_key, param_value in self._config_class().param_kwargs.items(): task_param_key = self._parameter_alias.get(param_key, param_key) - - if hasattr(cls, task_param_key) and task_param_key not in cls.__config_params: + if hasattr(cls, task_param_key): cls.__config_params[task_param_key] = param_value @classmethod def get_param_values(cls, params, args, kwargs): # type: ignore - cls.__config_params.update(kwargs) - return super(Wrapped, cls).get_param_values(params, args, cls.__config_params) + kwargs_dict = deepcopy(cls.__config_params) + kwargs_dict.update(kwargs) + return super(Wrapped, cls).get_param_values(params, args, kwargs_dict) return Wrapped diff --git a/kannon/master.py b/kannon/master.py index b01ef96..875e8c5 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -51,6 +51,12 @@ def build(self, root_task: gokart.TaskOnKart) -> None: # push tasks into queue logger.info("Creating task queue...") task_queue = self._create_task_queue(root_task) + for task in task_queue: + # If task is decorated with inherits_config_params, then unwrap it. + if hasattr(task, "__is_decorated_inherits_config_params"): + assert task.__is_decorated_inherits_config_params + logger.info(f"Task {self._gen_task_info(task)} is decorated with inherits_config_params") + task.inject_config_params() # inject config param on master job # consume task queue logger.info("Consuming task queue...") @@ -125,11 +131,6 @@ def _exec_gokart_task(self, task: gokart.TaskOnKart) -> None: raise RuntimeError(f"Task {self._gen_task_info(task)} on job master has failed.") def _exec_bullet_task(self, task: TaskOnBullet) -> None: - # If task is decorated with inherits_config_params, then unwrap it. - if hasattr(task, "__is_decorated_inherits_config_params"): - assert task.__is_decorated_inherits_config_params - logger.info(f"Task {self._gen_task_info(task)} is decorated with inherits_config_params") - task.inject_config_params() # inject config param on master job # Save task instance as pickle object pkl_path = self._gen_pkl_path(task) make_target(pkl_path).dump(task) From 5985372d719b4920f8a15815502d590aac098805 Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Thu, 31 Aug 2023 16:22:16 +0900 Subject: [PATCH 09/30] Inject in get_param_values --- kannon/config_params.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/kannon/config_params.py b/kannon/config_params.py index 3daa966..757c597 100644 --- a/kannon/config_params.py +++ b/kannon/config_params.py @@ -35,6 +35,11 @@ def inject_config_params(cls) -> None: @classmethod def get_param_values(cls, params, args, kwargs): # type: ignore + try: + # if config params can be injected, then inject + cls.inject_config_params() + except Exception: + pass kwargs_dict = deepcopy(cls.__config_params) kwargs_dict.update(kwargs) return super(Wrapped, cls).get_param_values(params, args, kwargs_dict) From 6aa926322c0860372ba56428ecaa4e8997a5312d Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Thu, 31 Aug 2023 17:05:58 +0900 Subject: [PATCH 10/30] update --- kannon/config_params.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kannon/config_params.py b/kannon/config_params.py index 757c597..6ba0219 100644 --- a/kannon/config_params.py +++ b/kannon/config_params.py @@ -41,7 +41,7 @@ def get_param_values(cls, params, args, kwargs): # type: ignore except Exception: pass kwargs_dict = deepcopy(cls.__config_params) - kwargs_dict.update(kwargs) - return super(Wrapped, cls).get_param_values(params, args, kwargs_dict) + kwargs.update(kwargs_dict) + return super(Wrapped, cls).get_param_values(params, args, kwargs) return Wrapped From 3d2cb23d3fa55e861c23858041edc739f4a798d8 Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Thu, 31 Aug 2023 18:03:48 +0900 Subject: [PATCH 11/30] Set injection flag --- kannon/config_params.py | 17 ++++++++++------- kannon/master.py | 11 +++++------ 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/kannon/config_params.py b/kannon/config_params.py index 6ba0219..2a1134b 100644 --- a/kannon/config_params.py +++ b/kannon/config_params.py @@ -1,4 +1,3 @@ -from copy import deepcopy from typing import Any, Dict, Optional, Type import gokart @@ -25,9 +24,11 @@ def __call__(self, task_class: Type[gokart.TaskOnKart]): # type: ignore class Wrapped(task_class): __is_decorated_inherits_config_params = True __config_params: Dict[str, Any] = dict() + __do_injection = True @classmethod def inject_config_params(cls) -> None: + cls.__config_params.clear() # clear config param cache for param_key, param_value in self._config_class().param_kwargs.items(): task_param_key = self._parameter_alias.get(param_key, param_key) if hasattr(cls, task_param_key): @@ -35,13 +36,15 @@ def inject_config_params(cls) -> None: @classmethod def get_param_values(cls, params, args, kwargs): # type: ignore - try: - # if config params can be injected, then inject + if cls.__do_injection: cls.inject_config_params() - except Exception: - pass - kwargs_dict = deepcopy(cls.__config_params) - kwargs.update(kwargs_dict) + for param_key, param_value in cls.__config_params.items(): + if hasattr(cls, param_key) and param_key not in kwargs: + kwargs[param_key] = param_value return super(Wrapped, cls).get_param_values(params, args, kwargs) + @classmethod + def set_injection_flag(cls, flag: bool): # type: ignore + cls.__do_injection = flag + return Wrapped diff --git a/kannon/master.py b/kannon/master.py index 875e8c5..ff6ca75 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -51,12 +51,6 @@ def build(self, root_task: gokart.TaskOnKart) -> None: # push tasks into queue logger.info("Creating task queue...") task_queue = self._create_task_queue(root_task) - for task in task_queue: - # If task is decorated with inherits_config_params, then unwrap it. - if hasattr(task, "__is_decorated_inherits_config_params"): - assert task.__is_decorated_inherits_config_params - logger.info(f"Task {self._gen_task_info(task)} is decorated with inherits_config_params") - task.inject_config_params() # inject config param on master job # consume task queue logger.info("Consuming task queue...") @@ -131,6 +125,11 @@ def _exec_gokart_task(self, task: gokart.TaskOnKart) -> None: raise RuntimeError(f"Task {self._gen_task_info(task)} on job master has failed.") def _exec_bullet_task(self, task: TaskOnBullet) -> None: + # If task is decorated with inherits_config_params, then unwrap it. + if hasattr(task, "__is_decorated_inherits_config_params"): + assert task.__is_decorated_inherits_config_params + logger.info(f"Task {self._gen_task_info(task)} is decorated with inherits_config_params") + task.set_injection_flag(False) # Save task instance as pickle object pkl_path = self._gen_pkl_path(task) make_target(pkl_path).dump(task) From d85fe81b5c72954a8dd470e6f77503376b231a8d Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Fri, 1 Sep 2023 00:09:43 +0900 Subject: [PATCH 12/30] Update kannon --- kannon/config_params.py | 2 +- kannon/master.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/kannon/config_params.py b/kannon/config_params.py index 2a1134b..1dca766 100644 --- a/kannon/config_params.py +++ b/kannon/config_params.py @@ -22,7 +22,7 @@ def __call__(self, task_class: Type[gokart.TaskOnKart]): # type: ignore # wrap task to prevent task name from being changed @luigi.task._task_wraps(task_class) class Wrapped(task_class): - __is_decorated_inherits_config_params = True + is_decorated_inherits_config_params = True __config_params: Dict[str, Any] = dict() __do_injection = True diff --git a/kannon/master.py b/kannon/master.py index ff6ca75..3a18fc1 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -126,8 +126,8 @@ def _exec_gokart_task(self, task: gokart.TaskOnKart) -> None: def _exec_bullet_task(self, task: TaskOnBullet) -> None: # If task is decorated with inherits_config_params, then unwrap it. - if hasattr(task, "__is_decorated_inherits_config_params"): - assert task.__is_decorated_inherits_config_params + if hasattr(task, "is_decorated_inherits_config_params"): + assert task.is_decorated_inherits_config_params logger.info(f"Task {self._gen_task_info(task)} is decorated with inherits_config_params") task.set_injection_flag(False) # Save task instance as pickle object From 3542d459e4011d3b60cf7a19bee12fc86fe6af04 Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Thu, 7 Sep 2023 11:08:21 +0900 Subject: [PATCH 13/30] inject before submitting --- kannon/master.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kannon/master.py b/kannon/master.py index 3a18fc1..babec4a 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -129,6 +129,7 @@ def _exec_bullet_task(self, task: TaskOnBullet) -> None: if hasattr(task, "is_decorated_inherits_config_params"): assert task.is_decorated_inherits_config_params logger.info(f"Task {self._gen_task_info(task)} is decorated with inherits_config_params") + task.inject_config_params() task.set_injection_flag(False) # Save task instance as pickle object pkl_path = self._gen_pkl_path(task) From 12c850b3a9861196d77114c11bcde72abdbffc79 Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Thu, 7 Sep 2023 14:01:41 +0900 Subject: [PATCH 14/30] debug msg --- kannon/master.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kannon/master.py b/kannon/master.py index babec4a..ac6949d 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -126,6 +126,7 @@ def _exec_gokart_task(self, task: gokart.TaskOnKart) -> None: def _exec_bullet_task(self, task: TaskOnBullet) -> None: # If task is decorated with inherits_config_params, then unwrap it. + logger.debug(f"Task.type = {type(task)}") if hasattr(task, "is_decorated_inherits_config_params"): assert task.is_decorated_inherits_config_params logger.info(f"Task {self._gen_task_info(task)} is decorated with inherits_config_params") From df8be769be9c4a18ebd20a72de71b65a9ef20f3a Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Thu, 7 Sep 2023 15:45:48 +0900 Subject: [PATCH 15/30] debug --- kannon/master.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kannon/master.py b/kannon/master.py index ac6949d..9800d82 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -126,7 +126,7 @@ def _exec_gokart_task(self, task: gokart.TaskOnKart) -> None: def _exec_bullet_task(self, task: TaskOnBullet) -> None: # If task is decorated with inherits_config_params, then unwrap it. - logger.debug(f"Task.type = {type(task)}") + logger.info(f"Task on bullet type = {type(task)}") if hasattr(task, "is_decorated_inherits_config_params"): assert task.is_decorated_inherits_config_params logger.info(f"Task {self._gen_task_info(task)} is decorated with inherits_config_params") From 56fc57eeb9b4665463289f7e41a7c2113c605915 Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Thu, 7 Sep 2023 17:52:49 +0900 Subject: [PATCH 16/30] Change injection flag --- kannon/master.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/kannon/master.py b/kannon/master.py index 9800d82..b0913f5 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -51,6 +51,15 @@ def build(self, root_task: gokart.TaskOnKart) -> None: # push tasks into queue logger.info("Creating task queue...") task_queue = self._create_task_queue(root_task) + + # flip flag + for task in task_queue: + try: + task.set_injection_flag(False) + logger.info("Task is decorated with inherits_config_params.") + except: + pass + # logger.info("Task is not decorated with inherits_config_params.") # consume task queue logger.info("Consuming task queue...") @@ -127,11 +136,12 @@ def _exec_gokart_task(self, task: gokart.TaskOnKart) -> None: def _exec_bullet_task(self, task: TaskOnBullet) -> None: # If task is decorated with inherits_config_params, then unwrap it. logger.info(f"Task on bullet type = {type(task)}") - if hasattr(task, "is_decorated_inherits_config_params"): - assert task.is_decorated_inherits_config_params - logger.info(f"Task {self._gen_task_info(task)} is decorated with inherits_config_params") - task.inject_config_params() - task.set_injection_flag(False) + + # if hasattr(task, "is_decorated_inherits_config_params"): + # assert task.is_decorated_inherits_config_params + # logger.info(f"Task {self._gen_task_info(task)} is decorated with inherits_config_params") + # task.inject_config_params() + # task.set_injection_flag(False) # Save task instance as pickle object pkl_path = self._gen_pkl_path(task) make_target(pkl_path).dump(task) From 24dc43d4ceee515272d3f997fda8c2ac176be696 Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Thu, 7 Sep 2023 19:13:26 +0900 Subject: [PATCH 17/30] fix --- kannon/master.py | 23 +++++++---------------- 1 file changed, 7 insertions(+), 16 deletions(-) diff --git a/kannon/master.py b/kannon/master.py index b0913f5..4440437 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -51,15 +51,6 @@ def build(self, root_task: gokart.TaskOnKart) -> None: # push tasks into queue logger.info("Creating task queue...") task_queue = self._create_task_queue(root_task) - - # flip flag - for task in task_queue: - try: - task.set_injection_flag(False) - logger.info("Task is decorated with inherits_config_params.") - except: - pass - # logger.info("Task is not decorated with inherits_config_params.") # consume task queue logger.info("Consuming task queue...") @@ -92,7 +83,13 @@ def build(self, root_task: gokart.TaskOnKart) -> None: # execute task if isinstance(task, TaskOnBullet): logger.info(f"Trying to run task {self._gen_task_info(task)} on child job...") - self._exec_bullet_task(task) + if hasattr(task, "is_decorated_inherits_config_params"): + task.set_injection_flag(False) + logger.info(f"Task {self._gen_task_info(task)} is decorated with inherits_config_params.") + self._exec_bullet_task(task) + task.set_injection_flag(True) + else: + self._exec_bullet_task(task) task_queue.append(task) # re-enqueue task to check if it is done elif isinstance(task, gokart.TaskOnKart): logger.info(f"Executing task {self._gen_task_info(task)} on master job...") @@ -136,12 +133,6 @@ def _exec_gokart_task(self, task: gokart.TaskOnKart) -> None: def _exec_bullet_task(self, task: TaskOnBullet) -> None: # If task is decorated with inherits_config_params, then unwrap it. logger.info(f"Task on bullet type = {type(task)}") - - # if hasattr(task, "is_decorated_inherits_config_params"): - # assert task.is_decorated_inherits_config_params - # logger.info(f"Task {self._gen_task_info(task)} is decorated with inherits_config_params") - # task.inject_config_params() - # task.set_injection_flag(False) # Save task instance as pickle object pkl_path = self._gen_pkl_path(task) make_target(pkl_path).dump(task) From a6742199c86c27c306d04be3f36245912baca7df Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Thu, 7 Sep 2023 19:17:36 +0900 Subject: [PATCH 18/30] fix --- kannon/master.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kannon/master.py b/kannon/master.py index 4440437..f92a46d 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -84,6 +84,7 @@ def build(self, root_task: gokart.TaskOnKart) -> None: if isinstance(task, TaskOnBullet): logger.info(f"Trying to run task {self._gen_task_info(task)} on child job...") if hasattr(task, "is_decorated_inherits_config_params"): + task.inject_config_params() task.set_injection_flag(False) logger.info(f"Task {self._gen_task_info(task)} is decorated with inherits_config_params.") self._exec_bullet_task(task) From 8fda80380bc0737d3bf3dabb98fde693ee4f5508 Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Fri, 29 Sep 2023 17:03:01 +0900 Subject: [PATCH 19/30] Set inject flag false for gokart.task --- kannon/master.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/kannon/master.py b/kannon/master.py index f92a46d..36eb37f 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -85,7 +85,7 @@ def build(self, root_task: gokart.TaskOnKart) -> None: logger.info(f"Trying to run task {self._gen_task_info(task)} on child job...") if hasattr(task, "is_decorated_inherits_config_params"): task.inject_config_params() - task.set_injection_flag(False) + task.set_injection_flag(False) # prevent injection in child pod logger.info(f"Task {self._gen_task_info(task)} is decorated with inherits_config_params.") self._exec_bullet_task(task) task.set_injection_flag(True) @@ -94,7 +94,13 @@ def build(self, root_task: gokart.TaskOnKart) -> None: task_queue.append(task) # re-enqueue task to check if it is done elif isinstance(task, gokart.TaskOnKart): logger.info(f"Executing task {self._gen_task_info(task)} on master job...") - self._exec_gokart_task(task) + if hasattr(task, "is_decorated_inherits_config_params"): + logger.info(f"Task {self._gen_task_info(task)} is decorated with inherits_config_params.") + task.set_injection_flag(True) # inject in master pod + self._exec_gokart_task(task) + task.set_injection_flag(False) # prevent injection in child pod + else: + self._exec_gokart_task(task) logger.info(f"Completed task {self._gen_task_info(task)} on master job.") else: raise TypeError(f"Invalid task type: {type(task)}") From 3b7e4a3fdc1be5499515664c50040a2b55a81a6a Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Thu, 19 Oct 2023 15:50:44 +0900 Subject: [PATCH 20/30] Pass config files via remote cache --- example/run_child.py | 5 ++++- kannon/master.py | 44 ++++++++++++++++++++++++++------------------ 2 files changed, 30 insertions(+), 19 deletions(-) diff --git a/example/run_child.py b/example/run_child.py index 0757871..8730927 100644 --- a/example/run_child.py +++ b/example/run_child.py @@ -1,5 +1,6 @@ """ This script requires to be defined by user. """ import logging +from typing import List import fire import gokart @@ -9,9 +10,11 @@ logging.basicConfig(level=logging.INFO) -def main(task_pkl_path: str) -> None: +def main(task_pkl_path: str, remote_config_paths: List[str]) -> None: # Load luigi config luigi.configuration.LuigiConfigParser.add_config_path("./conf/base.ini") + for path in remote_config_paths: + luigi.configuration.LuigiConfigParser.add_config_path(path) # Parse a serialized gokart.TaskOnKart task: gokart.TaskOnKart = make_target(task_pkl_path).load() diff --git a/kannon/master.py b/kannon/master.py index 36eb37f..336d6f0 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -6,6 +6,7 @@ from typing import Deque, Dict, List, Optional, Set import gokart +import luigi from gokart.target import make_target from kubernetes import client from luigi.task import flatten @@ -48,6 +49,19 @@ def __init__( self.task_id_to_job_name: Dict[str, str] = dict() def build(self, root_task: gokart.TaskOnKart) -> None: + # check all config file paths exists + luigi_parser_instance = luigi.configuration.get_config() + config_paths = luigi_parser_instance._config_paths + for config_path in config_paths: + assert os.path.exists(config_path), f"Config file {config_path} does not exits." + # save configs to remote cache + workspace_dir = os.environ.get("TASK_WORKSPACE_DIRECTORY") + remote_config_dir = os.path.join(workspace_dir, "kannon", "conf") + remote_config_paths = [os.path.join(remote_config_dir, os.path.basename(config_path)) for config_path in config_paths] + for remote_config_path in remote_config_paths: + with open(config_path, "r") as f: + make_target(remote_config_path).dump(f) + # push tasks into queue logger.info("Creating task queue...") task_queue = self._create_task_queue(root_task) @@ -83,24 +97,11 @@ def build(self, root_task: gokart.TaskOnKart) -> None: # execute task if isinstance(task, TaskOnBullet): logger.info(f"Trying to run task {self._gen_task_info(task)} on child job...") - if hasattr(task, "is_decorated_inherits_config_params"): - task.inject_config_params() - task.set_injection_flag(False) # prevent injection in child pod - logger.info(f"Task {self._gen_task_info(task)} is decorated with inherits_config_params.") - self._exec_bullet_task(task) - task.set_injection_flag(True) - else: - self._exec_bullet_task(task) + self._exec_bullet_task(task, remote_config_paths) task_queue.append(task) # re-enqueue task to check if it is done elif isinstance(task, gokart.TaskOnKart): logger.info(f"Executing task {self._gen_task_info(task)} on master job...") - if hasattr(task, "is_decorated_inherits_config_params"): - logger.info(f"Task {self._gen_task_info(task)} is decorated with inherits_config_params.") - task.set_injection_flag(True) # inject in master pod - self._exec_gokart_task(task) - task.set_injection_flag(False) # prevent injection in child pod - else: - self._exec_gokart_task(task) + self._exec_gokart_task(task) logger.info(f"Completed task {self._gen_task_info(task)} on master job.") else: raise TypeError(f"Invalid task type: {type(task)}") @@ -137,8 +138,7 @@ def _exec_gokart_task(self, task: gokart.TaskOnKart) -> None: except Exception: raise RuntimeError(f"Task {self._gen_task_info(task)} on job master has failed.") - def _exec_bullet_task(self, task: TaskOnBullet) -> None: - # If task is decorated with inherits_config_params, then unwrap it. + def _exec_bullet_task(self, task: TaskOnBullet, remote_config_paths: list) -> None: logger.info(f"Task on bullet type = {type(task)}") # Save task instance as pickle object pkl_path = self._gen_pkl_path(task) @@ -148,18 +148,26 @@ def _exec_bullet_task(self, task: TaskOnBullet) -> None: job = self._create_child_job_object( job_name=job_name, task_pkl_path=pkl_path, + remote_config_paths=remote_config_paths, ) create_job(self.api_instance, job, self.namespace) logger.info(f"Created child job {job_name} with task {self._gen_task_info(task)}") self.task_id_to_job_name[task.make_unique_id()] = job_name - def _create_child_job_object(self, job_name: str, task_pkl_path: str) -> client.V1Job: + def _create_child_job_object( + self, + job_name: str, + task_pkl_path: str, + remote_config_paths: str, + ) -> client.V1Job: # TODO: use python -c to avoid dependency to execute_task.py cmd = [ "python", self.path_child_script, "--task-pkl-path", f"'{task_pkl_path}'", + "--remote-config-paths", + ",".join(remote_config_paths), ] job = deepcopy(self.template_job) # replace command From 5aaf51ca9660661f4b62bfe1676e2e146d42bfd6 Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Thu, 19 Oct 2023 16:55:23 +0900 Subject: [PATCH 21/30] Update gokart dep --- poetry.lock | 111 +++++++------------------------------------------ pyproject.toml | 2 +- 2 files changed, 17 insertions(+), 96 deletions(-) diff --git a/poetry.lock b/poetry.lock index c710d7f..5647e5a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,10 +1,9 @@ -# This file is automatically @generated by Poetry 1.4.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. [[package]] name = "apscheduler" version = "3.10.1" description = "In-process task scheduler with Cron-like capabilities" -category = "main" optional = false python-versions = ">=3.6" files = [ @@ -16,7 +15,7 @@ files = [ pytz = "*" setuptools = ">=0.7" six = ">=1.4.0" -tzlocal = ">=2.0,<3.0.0 || >=4.0.0" +tzlocal = ">=2.0,<3.dev0 || >=4.dev0" [package.extras] doc = ["sphinx", "sphinx-rtd-theme"] @@ -34,7 +33,6 @@ zookeeper = ["kazoo"] name = "async-timeout" version = "4.0.2" description = "Timeout context manager for asyncio programs" -category = "main" optional = false python-versions = ">=3.6" files = [ @@ -46,7 +44,6 @@ files = [ name = "backports-zoneinfo" version = "0.2.1" description = "Backport of the standard library zoneinfo module" -category = "main" optional = false python-versions = ">=3.6" files = [ @@ -75,7 +72,6 @@ tzdata = ["tzdata"] name = "boto3" version = "1.7.84" description = "The AWS SDK for Python" -category = "main" optional = false python-versions = "*" files = [ @@ -92,7 +88,6 @@ s3transfer = ">=0.1.10,<0.2.0" name = "botocore" version = "1.10.84" description = "Low-level, data-driven core of boto 3." -category = "main" optional = false python-versions = "*" files = [ @@ -109,7 +104,6 @@ python-dateutil = {version = ">=2.1,<3.0.0", markers = "python_version >= \"2.7\ name = "cachetools" version = "5.3.0" description = "Extensible memoizing collections and decorators" -category = "main" optional = false python-versions = "~=3.7" files = [ @@ -121,7 +115,6 @@ files = [ name = "certifi" version = "2023.5.7" description = "Python package for providing Mozilla's CA Bundle." -category = "main" optional = false python-versions = ">=3.6" files = [ @@ -133,7 +126,6 @@ files = [ name = "chardet" version = "5.1.0" description = "Universal encoding detector for Python 3" -category = "dev" optional = false python-versions = ">=3.7" files = [ @@ -145,7 +137,6 @@ files = [ name = "charset-normalizer" version = "3.1.0" description = "The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet." -category = "main" optional = false python-versions = ">=3.7.0" files = [ @@ -230,7 +221,6 @@ files = [ name = "colorama" version = "0.4.6" description = "Cross-platform colored terminal text." -category = "main" optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" files = [ @@ -242,7 +232,6 @@ files = [ name = "contourpy" version = "1.0.7" description = "Python library for calculating contours of 2D quadrilateral grids" -category = "main" optional = false python-versions = ">=3.8" files = [ @@ -317,7 +306,6 @@ test-no-images = ["pytest"] name = "coverage" version = "7.2.5" description = "Code coverage measurement for Python" -category = "dev" optional = false python-versions = ">=3.7" files = [ @@ -381,7 +369,6 @@ toml = ["tomli"] name = "cycler" version = "0.11.0" description = "Composable style cycles" -category = "main" optional = false python-versions = ">=3.6" files = [ @@ -393,7 +380,6 @@ files = [ name = "distlib" version = "0.3.6" description = "Distribution utilities" -category = "dev" optional = false python-versions = "*" files = [ @@ -405,7 +391,6 @@ files = [ name = "docutils" version = "0.20" description = "Docutils -- Python Documentation Utilities" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -417,7 +402,6 @@ files = [ name = "filelock" version = "3.12.0" description = "A platform independent file lock." -category = "dev" optional = false python-versions = ">=3.7" files = [ @@ -433,7 +417,6 @@ testing = ["covdefaults (>=2.3)", "coverage (>=7.2.3)", "diff-cover (>=7.5)", "p name = "flake8" version = "5.0.4" description = "the modular source code checker: pep8 pyflakes and co" -category = "dev" optional = false python-versions = ">=3.6.1" files = [ @@ -450,7 +433,6 @@ pyflakes = ">=2.5.0,<2.6.0" name = "fonttools" version = "4.39.4" description = "Tools to manipulate font files" -category = "main" optional = false python-versions = ">=3.8" files = [ @@ -474,15 +456,12 @@ woff = ["brotli (>=1.0.1)", "brotlicffi (>=0.8.0)", "zopfli (>=0.1.4)"] [[package]] name = "gokart" -version = "1.2.2" +version = "0.0.0" description = "Gokart solves reproducibility, task dependencies, constraints of good code, and ease of use for Machine Learning Pipeline. [Documentation](https://gokart.readthedocs.io/en/latest/)" -category = "main" optional = false python-versions = ">=3.8,<3.12" -files = [ - {file = "gokart-1.2.2-py3-none-any.whl", hash = "sha256:a17dfd9fe62abd2263f968c4c3d1864679ef3548df204b5736bf6766f2f9f4d5"}, - {file = "gokart-1.2.2.tar.gz", hash = "sha256:4a64646db103db60690e4570c3e9d15b2b461413462e19fa69015703c1649020"}, -] +files = [] +develop = false [package.dependencies] APScheduler = "*" @@ -495,15 +474,20 @@ numpy = "*" pandas = "*" pyarrow = "*" redis = "*" -slack-sdk = ">=3,<4" +slack-sdk = "^3" tqdm = "*" uritemplate = "*" +[package.source] +type = "git" +url = "https://github.com/maronuu/gokart.git" +reference = "feature/make_target_ini" +resolved_reference = "6dd28bf2bd2b8a5114e0b3294a281e9e361d8029" + [[package]] name = "google-api-core" version = "2.11.0" description = "Google API client core library" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -526,7 +510,6 @@ grpcio-gcp = ["grpcio-gcp (>=0.2.2,<1.0dev)"] name = "google-api-python-client" version = "2.86.0" description = "Google API Client Library for Python" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -535,7 +518,7 @@ files = [ ] [package.dependencies] -google-api-core = ">=1.31.5,<2.0.0 || >2.3.0,<3.0.0dev" +google-api-core = ">=1.31.5,<2.0.dev0 || >2.3.0,<3.0.0dev" google-auth = ">=1.19.0,<3.0.0dev" google-auth-httplib2 = ">=0.1.0" httplib2 = ">=0.15.0,<1dev" @@ -545,7 +528,6 @@ uritemplate = ">=3.0.1,<5" name = "google-auth" version = "2.17.3" description = "Google Authentication Library" -category = "main" optional = false python-versions = ">=2.7,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*" files = [ @@ -570,7 +552,6 @@ requests = ["requests (>=2.20.0,<3.0.0dev)"] name = "google-auth-httplib2" version = "0.1.0" description = "Google Authentication Library: httplib2 transport" -category = "main" optional = false python-versions = "*" files = [ @@ -587,7 +568,6 @@ six = "*" name = "googleapis-common-protos" version = "1.59.0" description = "Common protobufs used in Google APIs" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -605,7 +585,6 @@ grpc = ["grpcio (>=1.44.0,<2.0.0dev)"] name = "httplib2" version = "0.22.0" description = "A comprehensive HTTP client library." -category = "main" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" files = [ @@ -620,7 +599,6 @@ pyparsing = {version = ">=2.4.2,<3.0.0 || >3.0.0,<3.0.1 || >3.0.1,<3.0.2 || >3.0 name = "idna" version = "3.4" description = "Internationalized Domain Names in Applications (IDNA)" -category = "main" optional = false python-versions = ">=3.5" files = [ @@ -632,7 +610,6 @@ files = [ name = "importlib-resources" version = "5.12.0" description = "Read resources from Python packages" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -651,7 +628,6 @@ testing = ["flake8 (<5)", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-chec name = "isort" version = "5.12.0" description = "A Python utility / library to sort Python imports." -category = "dev" optional = false python-versions = ">=3.8.0" files = [ @@ -669,7 +645,6 @@ requirements-deprecated-finder = ["pip-api", "pipreqs"] name = "jmespath" version = "0.10.0" description = "JSON Matching Expressions" -category = "main" optional = false python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" files = [ @@ -681,7 +656,6 @@ files = [ name = "kiwisolver" version = "1.4.4" description = "A fast implementation of the Cassowary constraint solver" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -759,7 +733,6 @@ files = [ name = "kubernetes" version = "26.1.0" description = "Kubernetes python client" -category = "main" optional = false python-versions = ">=3.6" files = [ @@ -777,7 +750,7 @@ requests-oauthlib = "*" setuptools = ">=21.0.0" six = ">=1.9.0" urllib3 = ">=1.24.2" -websocket-client = ">=0.32.0,<0.40.0 || >0.40.0,<0.41.0 || >=0.43.0" +websocket-client = ">=0.32.0,<0.40.0 || >0.40.0,<0.41.dev0 || >=0.43.dev0" [package.extras] adal = ["adal (>=1.0.2)"] @@ -786,7 +759,6 @@ adal = ["adal (>=1.0.2)"] name = "lockfile" version = "0.12.2" description = "Platform-independent file locking module" -category = "main" optional = false python-versions = "*" files = [ @@ -798,7 +770,6 @@ files = [ name = "luigi" version = "3.3.0" description = "Workflow mgmgt + task scheduling + dependency resolution." -category = "main" optional = false python-versions = "*" files = [ @@ -820,7 +791,6 @@ toml = ["toml (<2.0.0)"] name = "matplotlib" version = "3.7.1" description = "Python plotting package" -category = "main" optional = false python-versions = ">=3.8" files = [ @@ -883,7 +853,6 @@ python-dateutil = ">=2.7" name = "mccabe" version = "0.7.0" description = "McCabe checker, plugin for flake8" -category = "dev" optional = false python-versions = ">=3.6" files = [ @@ -895,7 +864,6 @@ files = [ name = "mypy" version = "1.3.0" description = "Optional static typing for Python" -category = "dev" optional = false python-versions = ">=3.7" files = [ @@ -942,7 +910,6 @@ reports = ["lxml"] name = "mypy-extensions" version = "1.0.0" description = "Type system extensions for programs checked with the mypy type checker." -category = "dev" optional = false python-versions = ">=3.5" files = [ @@ -954,7 +921,6 @@ files = [ name = "numpy" version = "1.24.3" description = "Fundamental package for array computing in Python" -category = "main" optional = false python-versions = ">=3.8" files = [ @@ -992,7 +958,6 @@ files = [ name = "oauthlib" version = "3.2.2" description = "A generic, spec-compliant, thorough implementation of the OAuth request-signing logic" -category = "main" optional = false python-versions = ">=3.6" files = [ @@ -1009,7 +974,6 @@ signedtoken = ["cryptography (>=3.0.0)", "pyjwt (>=2.0.0,<3)"] name = "packaging" version = "23.1" description = "Core utilities for Python packages" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -1021,7 +985,6 @@ files = [ name = "pandas" version = "2.0.1" description = "Powerful data structures for data analysis, time series, and statistics" -category = "main" optional = false python-versions = ">=3.8" files = [ @@ -1055,8 +1018,8 @@ files = [ [package.dependencies] numpy = [ {version = ">=1.20.3", markers = "python_version < \"3.10\""}, - {version = ">=1.21.0", markers = "python_version >= \"3.10\""}, {version = ">=1.23.2", markers = "python_version >= \"3.11\""}, + {version = ">=1.21.0", markers = "python_version >= \"3.10\" and python_version < \"3.11\""}, ] python-dateutil = ">=2.8.2" pytz = ">=2020.1" @@ -1089,7 +1052,6 @@ xml = ["lxml (>=4.6.3)"] name = "pandas-stubs" version = "1.5.3.230321" description = "Type annotations for pandas" -category = "dev" optional = false python-versions = ">=3.8,<3.12" files = [ @@ -1104,7 +1066,6 @@ types-pytz = ">=2022.1.1" name = "pillow" version = "9.5.0" description = "Python Imaging Library (Fork)" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -1184,7 +1145,6 @@ tests = ["check-manifest", "coverage", "defusedxml", "markdown2", "olefile", "pa name = "platformdirs" version = "3.5.0" description = "A small Python package for determining appropriate platform-specific dirs, e.g. a \"user data dir\"." -category = "dev" optional = false python-versions = ">=3.7" files = [ @@ -1200,7 +1160,6 @@ test = ["appdirs (==1.4.4)", "covdefaults (>=2.3)", "pytest (>=7.3.1)", "pytest- name = "pluggy" version = "1.0.0" description = "plugin and hook calling mechanisms for python" -category = "dev" optional = false python-versions = ">=3.6" files = [ @@ -1216,7 +1175,6 @@ testing = ["pytest", "pytest-benchmark"] name = "protobuf" version = "4.23.0" description = "" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -1239,7 +1197,6 @@ files = [ name = "pyarrow" version = "12.0.0" description = "Python library for Apache Arrow" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -1277,7 +1234,6 @@ numpy = ">=1.16.6" name = "pyasn1" version = "0.5.0" description = "Pure-Python implementation of ASN.1 types and DER/BER/CER codecs (X.208)" -category = "main" optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,>=2.7" files = [ @@ -1289,7 +1245,6 @@ files = [ name = "pyasn1-modules" version = "0.3.0" description = "A collection of ASN.1-based protocols modules" -category = "main" optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,>=2.7" files = [ @@ -1304,7 +1259,6 @@ pyasn1 = ">=0.4.6,<0.6.0" name = "pycodestyle" version = "2.9.1" description = "Python style guide checker" -category = "dev" optional = false python-versions = ">=3.6" files = [ @@ -1316,7 +1270,6 @@ files = [ name = "pyflakes" version = "2.5.0" description = "passive checker of Python programs" -category = "dev" optional = false python-versions = ">=3.6" files = [ @@ -1328,7 +1281,6 @@ files = [ name = "pyparsing" version = "3.0.9" description = "pyparsing module - Classes and methods to define and execute parsing grammars" -category = "main" optional = false python-versions = ">=3.6.8" files = [ @@ -1343,7 +1295,6 @@ diagrams = ["jinja2", "railroad-diagrams"] name = "pyproject-api" version = "1.5.1" description = "API to interact with the python pyproject.toml based projects" -category = "dev" optional = false python-versions = ">=3.7" files = [ @@ -1363,7 +1314,6 @@ testing = ["covdefaults (>=2.2.2)", "importlib-metadata (>=6)", "pytest (>=7.2.1 name = "pyproject-flake8" version = "5.0.4" description = "pyproject-flake8 (`pflake8`), a monkey patching wrapper to connect flake8 with pyproject.toml configuration" -category = "dev" optional = false python-versions = "*" files = [ @@ -1379,7 +1329,6 @@ tomli = {version = "*", markers = "python_version < \"3.11\""} name = "python-daemon" version = "3.0.1" description = "Library to implement a well-behaved Unix daemon process." -category = "main" optional = false python-versions = ">=3" files = [ @@ -1400,7 +1349,6 @@ test = ["coverage", "docutils", "testscenarios (>=0.4)", "testtools"] name = "python-dateutil" version = "2.8.2" description = "Extensions to the standard Python datetime module" -category = "main" optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" files = [ @@ -1415,7 +1363,6 @@ six = ">=1.5" name = "pytz" version = "2023.3" description = "World timezone definitions, modern and historical" -category = "main" optional = false python-versions = "*" files = [ @@ -1427,7 +1374,6 @@ files = [ name = "pytz-deprecation-shim" version = "0.1.0.post0" description = "Shims to make deprecation of pytz easier" -category = "main" optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,>=2.7" files = [ @@ -1443,7 +1389,6 @@ tzdata = {version = "*", markers = "python_version >= \"3.6\""} name = "pyyaml" version = "6.0" description = "YAML parser and emitter for Python" -category = "main" optional = false python-versions = ">=3.6" files = [ @@ -1493,7 +1438,6 @@ files = [ name = "redis" version = "4.5.5" description = "Python client for Redis database and key-value store" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -1512,7 +1456,6 @@ ocsp = ["cryptography (>=36.0.1)", "pyopenssl (==20.0.1)", "requests (>=2.26.0)" name = "requests" version = "2.30.0" description = "Python HTTP for Humans." -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -1534,7 +1477,6 @@ use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"] name = "requests-oauthlib" version = "1.3.1" description = "OAuthlib authentication support for Requests." -category = "main" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" files = [ @@ -1553,7 +1495,6 @@ rsa = ["oauthlib[signedtoken] (>=3.0.0)"] name = "rsa" version = "4.9" description = "Pure-Python RSA implementation" -category = "main" optional = false python-versions = ">=3.6,<4" files = [ @@ -1568,7 +1509,6 @@ pyasn1 = ">=0.1.3" name = "s3transfer" version = "0.1.13" description = "An Amazon S3 Transfer Manager" -category = "main" optional = false python-versions = "*" files = [ @@ -1583,7 +1523,6 @@ botocore = ">=1.3.0,<2.0.0" name = "setuptools" version = "67.7.2" description = "Easily download, build, install, upgrade, and uninstall Python packages" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -1600,7 +1539,6 @@ testing-integration = ["build[virtualenv]", "filelock (>=3.4.0)", "jaraco.envs ( name = "six" version = "1.16.0" description = "Python 2 and 3 compatibility utilities" -category = "main" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*" files = [ @@ -1612,7 +1550,6 @@ files = [ name = "slack-sdk" version = "3.21.3" description = "The Slack API Platform SDK for Python" -category = "main" optional = false python-versions = ">=3.6.0" files = [ @@ -1628,7 +1565,6 @@ testing = ["Flask (>=1,<2)", "Flask-Sockets (>=0.2,<1)", "Jinja2 (==3.0.3)", "We name = "tenacity" version = "8.2.2" description = "Retry code until it succeeds" -category = "main" optional = false python-versions = ">=3.6" files = [ @@ -1643,7 +1579,6 @@ doc = ["reno", "sphinx", "tornado (>=4.5)"] name = "toml" version = "0.10.2" description = "Python Library for Tom's Obvious, Minimal Language" -category = "dev" optional = false python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" files = [ @@ -1655,7 +1590,6 @@ files = [ name = "tomli" version = "2.0.1" description = "A lil' TOML parser" -category = "dev" optional = false python-versions = ">=3.7" files = [ @@ -1667,7 +1601,6 @@ files = [ name = "tornado" version = "6.3.1" description = "Tornado is a Python web framework and asynchronous networking library, originally developed at FriendFeed." -category = "main" optional = false python-versions = ">= 3.8" files = [ @@ -1688,7 +1621,6 @@ files = [ name = "tox" version = "4.5.1" description = "tox is a generic virtualenv management and test command line tool" -category = "dev" optional = false python-versions = ">=3.7" files = [ @@ -1716,7 +1648,6 @@ testing = ["build[virtualenv] (>=0.10)", "covdefaults (>=2.3)", "devpi-process ( name = "tqdm" version = "4.65.0" description = "Fast, Extensible Progress Meter" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -1737,7 +1668,6 @@ telegram = ["requests"] name = "types-pytz" version = "2023.3.0.0" description = "Typing stubs for pytz" -category = "dev" optional = false python-versions = "*" files = [ @@ -1749,7 +1679,6 @@ files = [ name = "typing-extensions" version = "4.5.0" description = "Backported and Experimental Type Hints for Python 3.7+" -category = "dev" optional = false python-versions = ">=3.7" files = [ @@ -1761,7 +1690,6 @@ files = [ name = "tzdata" version = "2023.3" description = "Provider of IANA time zone data" -category = "main" optional = false python-versions = ">=2" files = [ @@ -1773,7 +1701,6 @@ files = [ name = "tzlocal" version = "4.3" description = "tzinfo object for the local timezone" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -1793,7 +1720,6 @@ devenv = ["black", "check-manifest", "flake8", "pyroma", "pytest (>=4.3)", "pyte name = "uritemplate" version = "4.1.1" description = "Implementation of RFC 6570 URI Templates" -category = "main" optional = false python-versions = ">=3.6" files = [ @@ -1805,7 +1731,6 @@ files = [ name = "urllib3" version = "2.0.2" description = "HTTP library with thread-safe connection pooling, file post, and more." -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -1823,7 +1748,6 @@ zstd = ["zstandard (>=0.18.0)"] name = "virtualenv" version = "20.23.0" description = "Virtual Python Environment builder" -category = "dev" optional = false python-versions = ">=3.7" files = [ @@ -1844,7 +1768,6 @@ test = ["covdefaults (>=2.3)", "coverage (>=7.2.3)", "coverage-enable-subprocess name = "websocket-client" version = "1.5.1" description = "WebSocket client for Python with low level API options" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -1861,7 +1784,6 @@ test = ["websockets"] name = "yapf" version = "0.33.0" description = "A formatter for Python code." -category = "dev" optional = false python-versions = "*" files = [ @@ -1876,7 +1798,6 @@ tomli = ">=2.0.1" name = "zipp" version = "3.15.0" description = "Backport of pathlib-compatible object wrapper for zip files" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -1891,4 +1812,4 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more [metadata] lock-version = "2.0" python-versions = "^3.8,<3.12" -content-hash = "ee3cb44521f7be0f62baedf6aa4dbb7f9620f6b1d9f1c377cd0e35785be9fd28" +content-hash = "c65b59d89d5ff1dd4a605e529cebc1e1fe0b382ae63ce6b676113cef72ab3cc6" diff --git a/pyproject.toml b/pyproject.toml index aace85a..7943fb2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,7 @@ pattern = "^(?P\\d+\\.\\d+\\.\\d+)" [tool.poetry.dependencies] python = "^3.8,<3.12" -gokart = "^1.2.2" +gokart = { git = "https://github.com/maronuu/gokart.git", branch = "feature/make_target_ini" } kubernetes = "^26.1.0" [tool.poetry.group.dev.dependencies] From 5164e44c8422e134d7687d67e85c016d46d66860 Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Thu, 26 Oct 2023 10:56:53 +0900 Subject: [PATCH 22/30] update gokart --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 7943fb2..aace85a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,7 @@ pattern = "^(?P\\d+\\.\\d+\\.\\d+)" [tool.poetry.dependencies] python = "^3.8,<3.12" -gokart = { git = "https://github.com/maronuu/gokart.git", branch = "feature/make_target_ini" } +gokart = "^1.2.2" kubernetes = "^26.1.0" [tool.poetry.group.dev.dependencies] From eaa7f8aa8fa29214c9e87428b43b8037b68ec8b3 Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Thu, 26 Oct 2023 15:22:52 +0900 Subject: [PATCH 23/30] fix assert --- kannon/master.py | 3 ++- poetry.lock | 18 +++++++----------- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/kannon/master.py b/kannon/master.py index 336d6f0..1c31bfe 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -53,7 +53,8 @@ def build(self, root_task: gokart.TaskOnKart) -> None: luigi_parser_instance = luigi.configuration.get_config() config_paths = luigi_parser_instance._config_paths for config_path in config_paths: - assert os.path.exists(config_path), f"Config file {config_path} does not exits." + logger.info(f"Config file {config_path} is registered") + # assert os.path.exists(config_path), f"Config file {config_path} does not exits." # save configs to remote cache workspace_dir = os.environ.get("TASK_WORKSPACE_DIRECTORY") remote_config_dir = os.path.join(workspace_dir, "kannon", "conf") diff --git a/poetry.lock b/poetry.lock index 5647e5a..730ce1c 100644 --- a/poetry.lock +++ b/poetry.lock @@ -456,12 +456,14 @@ woff = ["brotli (>=1.0.1)", "brotlicffi (>=0.8.0)", "zopfli (>=0.1.4)"] [[package]] name = "gokart" -version = "0.0.0" +version = "1.2.4" description = "Gokart solves reproducibility, task dependencies, constraints of good code, and ease of use for Machine Learning Pipeline. [Documentation](https://gokart.readthedocs.io/en/latest/)" optional = false python-versions = ">=3.8,<3.12" -files = [] -develop = false +files = [ + {file = "gokart-1.2.4-py3-none-any.whl", hash = "sha256:620f8b56a31e8a4efd353950f049d7f9ed231620aed19273b8e11f9d9e5f2ddc"}, + {file = "gokart-1.2.4.tar.gz", hash = "sha256:10adc110f2c0c697f716cf36769cd65ba4ade30e74124b0b9096ac9cc5c9d483"}, +] [package.dependencies] APScheduler = "*" @@ -474,16 +476,10 @@ numpy = "*" pandas = "*" pyarrow = "*" redis = "*" -slack-sdk = "^3" +slack-sdk = ">=3,<4" tqdm = "*" uritemplate = "*" -[package.source] -type = "git" -url = "https://github.com/maronuu/gokart.git" -reference = "feature/make_target_ini" -resolved_reference = "6dd28bf2bd2b8a5114e0b3294a281e9e361d8029" - [[package]] name = "google-api-core" version = "2.11.0" @@ -1812,4 +1808,4 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more [metadata] lock-version = "2.0" python-versions = "^3.8,<3.12" -content-hash = "c65b59d89d5ff1dd4a605e529cebc1e1fe0b382ae63ce6b676113cef72ab3cc6" +content-hash = "ee3cb44521f7be0f62baedf6aa4dbb7f9620f6b1d9f1c377cd0e35785be9fd28" From cb35b7b07766f5abe43aa95440c62a3f7253252c Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Thu, 26 Oct 2023 17:42:11 +0900 Subject: [PATCH 24/30] fix path --- kannon/master.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kannon/master.py b/kannon/master.py index 1c31bfe..e846dea 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -60,6 +60,9 @@ def build(self, root_task: gokart.TaskOnKart) -> None: remote_config_dir = os.path.join(workspace_dir, "kannon", "conf") remote_config_paths = [os.path.join(remote_config_dir, os.path.basename(config_path)) for config_path in config_paths] for remote_config_path in remote_config_paths: + if not config_path.endswith(".ini"): + logger.warning(f"Format {config_path} is not supported, so skipped") + continue with open(config_path, "r") as f: make_target(remote_config_path).dump(f) From a344a51801e346cff8288d2c4bd526a3c948e674 Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Thu, 26 Oct 2023 18:47:30 +0900 Subject: [PATCH 25/30] fix path to remote path --- kannon/master.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kannon/master.py b/kannon/master.py index e846dea..5c76ae1 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -60,7 +60,7 @@ def build(self, root_task: gokart.TaskOnKart) -> None: remote_config_dir = os.path.join(workspace_dir, "kannon", "conf") remote_config_paths = [os.path.join(remote_config_dir, os.path.basename(config_path)) for config_path in config_paths] for remote_config_path in remote_config_paths: - if not config_path.endswith(".ini"): + if not remote_config_path.endswith(".ini"): logger.warning(f"Format {config_path} is not supported, so skipped") continue with open(config_path, "r") as f: From 3034c32b002e3c8b01319558ed81617fd293e143 Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Tue, 31 Oct 2023 14:35:45 +0900 Subject: [PATCH 26/30] fix local / remote path --- example/run_child.py | 9 +++++++-- kannon/master.py | 15 ++++++++------- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/example/run_child.py b/example/run_child.py index 8730927..fa9806a 100644 --- a/example/run_child.py +++ b/example/run_child.py @@ -2,6 +2,7 @@ import logging from typing import List +import os import fire import gokart import luigi @@ -13,8 +14,12 @@ def main(task_pkl_path: str, remote_config_paths: List[str]) -> None: # Load luigi config luigi.configuration.LuigiConfigParser.add_config_path("./conf/base.ini") - for path in remote_config_paths: - luigi.configuration.LuigiConfigParser.add_config_path(path) + for remote_config_path in remote_config_paths: + conf = make_target(remote_config_path).load() + # copy to pod local + local_path = os.path.join("./conf", os.path.basename(remote_config_path)) + make_target(local_path).dump(conf) + luigi.configuration.LuigiConfigParser.add_config_path(local_path) # Parse a serialized gokart.TaskOnKart task: gokart.TaskOnKart = make_target(task_pkl_path).load() diff --git a/kannon/master.py b/kannon/master.py index 5c76ae1..cf6ac67 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -54,17 +54,18 @@ def build(self, root_task: gokart.TaskOnKart) -> None: config_paths = luigi_parser_instance._config_paths for config_path in config_paths: logger.info(f"Config file {config_path} is registered") - # assert os.path.exists(config_path), f"Config file {config_path} does not exits." # save configs to remote cache workspace_dir = os.environ.get("TASK_WORKSPACE_DIRECTORY") remote_config_dir = os.path.join(workspace_dir, "kannon", "conf") - remote_config_paths = [os.path.join(remote_config_dir, os.path.basename(config_path)) for config_path in config_paths] - for remote_config_path in remote_config_paths: - if not remote_config_path.endswith(".ini"): - logger.warning(f"Format {config_path} is not supported, so skipped") + added_remote_config_paths: List[str] = [] + for local_config_path in config_paths: + if not local_config_path.endswith(".ini"): + logger.warning(f"Format {local_config_path} is not supported, so skipped") continue - with open(config_path, "r") as f: + remote_config_path = os.path.join(remote_config_dir, local_config_path) + with open(local_config_path, "r") as f: make_target(remote_config_path).dump(f) + added_remote_config_paths.append(remote_config_path) # push tasks into queue logger.info("Creating task queue...") @@ -101,7 +102,7 @@ def build(self, root_task: gokart.TaskOnKart) -> None: # execute task if isinstance(task, TaskOnBullet): logger.info(f"Trying to run task {self._gen_task_info(task)} on child job...") - self._exec_bullet_task(task, remote_config_paths) + self._exec_bullet_task(task, added_remote_config_paths) task_queue.append(task) # re-enqueue task to check if it is done elif isinstance(task, gokart.TaskOnKart): logger.info(f"Executing task {self._gen_task_info(task)} on master job...") From 2e7c32c500e333017da82f70bdbb0cc1903fbcfd Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Tue, 31 Oct 2023 18:24:31 +0900 Subject: [PATCH 27/30] add logging --- kannon/master.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kannon/master.py b/kannon/master.py index cf6ac67..79093ce 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -65,6 +65,7 @@ def build(self, root_task: gokart.TaskOnKart) -> None: remote_config_path = os.path.join(remote_config_dir, local_config_path) with open(local_config_path, "r") as f: make_target(remote_config_path).dump(f) + logger.info(f"local config file {local_config_path} is saved at remote {remote_config_path}.") added_remote_config_paths.append(remote_config_path) # push tasks into queue From 32b033f2bf264269097fa930123b34f2b3053bc8 Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Tue, 31 Oct 2023 20:09:14 +0900 Subject: [PATCH 28/30] fix path bug --- kannon/master.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kannon/master.py b/kannon/master.py index 79093ce..f43e29a 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -62,9 +62,10 @@ def build(self, root_task: gokart.TaskOnKart) -> None: if not local_config_path.endswith(".ini"): logger.warning(f"Format {local_config_path} is not supported, so skipped") continue - remote_config_path = os.path.join(remote_config_dir, local_config_path) + remote_config_path = os.path.join(remote_config_dir, os.path.basename(local_config_path)) with open(local_config_path, "r") as f: - make_target(remote_config_path).dump(f) + content = "\n".join(f.readlines()) + make_target(remote_config_path).dump(content) logger.info(f"local config file {local_config_path} is saved at remote {remote_config_path}.") added_remote_config_paths.append(remote_config_path) From a036bec29a0d3b05c1ea56156faeb352dec52932 Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Mon, 13 Nov 2023 19:28:43 +0900 Subject: [PATCH 29/30] Add dynamic conf arg --- kannon/master.py | 41 +++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/kannon/master.py b/kannon/master.py index f43e29a..84967bd 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -30,6 +30,7 @@ def __init__( env_to_inherit: Optional[List[str]] = None, master_pod_name: Optional[str] = None, master_pod_uid: Optional[str] = None, + dynamic_config_paths: Optional[List[str]] = None, ) -> None: # validation if not os.path.exists(path_child_script): @@ -45,29 +46,33 @@ def __init__( self.env_to_inherit = env_to_inherit self.master_pod_name = master_pod_name self.master_pod_uid = master_pod_uid + self.dynamic_config_paths = dynamic_config_paths self.task_id_to_job_name: Dict[str, str] = dict() def build(self, root_task: gokart.TaskOnKart) -> None: # check all config file paths exists - luigi_parser_instance = luigi.configuration.get_config() - config_paths = luigi_parser_instance._config_paths - for config_path in config_paths: - logger.info(f"Config file {config_path} is registered") - # save configs to remote cache - workspace_dir = os.environ.get("TASK_WORKSPACE_DIRECTORY") - remote_config_dir = os.path.join(workspace_dir, "kannon", "conf") - added_remote_config_paths: List[str] = [] - for local_config_path in config_paths: - if not local_config_path.endswith(".ini"): - logger.warning(f"Format {local_config_path} is not supported, so skipped") - continue - remote_config_path = os.path.join(remote_config_dir, os.path.basename(local_config_path)) - with open(local_config_path, "r") as f: - content = "\n".join(f.readlines()) - make_target(remote_config_path).dump(content) - logger.info(f"local config file {local_config_path} is saved at remote {remote_config_path}.") - added_remote_config_paths.append(remote_config_path) + # luigi_parser_instance = luigi.configuration.get_config() + # config_paths = luigi_parser_instance._config_paths + if self.dynamic_config_paths: + logger.info("Handling dynamic config files...") + for config_path in self.dynamic_config_paths: + logger.info(f"Config file {config_path} is registered") + # save configs to remote cache + remote_config_dir = os.path.join(os.environ.get("TASK_WORKSPACE_DIRECTORY"), "kannon", "conf") + added_remote_config_paths: List[str] = [] + for local_config_path in self.dynamic_config_paths: + if not local_config_path.endswith(".ini"): + logger.warning(f"Format {local_config_path} is not supported, so skipped") + continue + # load local config and save it to remote cache + local_conf_content = make_target(local_config_path).load() + remote_config_path = os.path.join(remote_config_dir, os.path.basename(local_config_path)) + make_target(remote_config_path).dump(local_conf_content) + logger.info(f"local config file {local_config_path} is saved at remote {remote_config_path}.") + added_remote_config_paths.append(remote_config_path) + else: + logger.info("No dynamic config files are given.") # push tasks into queue logger.info("Creating task queue...") From 841d3df13c010f65f05a38c469c5c4aad1cab75a Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Tue, 14 Nov 2023 16:21:00 +0900 Subject: [PATCH 30/30] fix remote config arg --- kannon/master.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kannon/master.py b/kannon/master.py index 84967bd..8ed8eac 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -178,8 +178,8 @@ def _create_child_job_object( self.path_child_script, "--task-pkl-path", f"'{task_pkl_path}'", - "--remote-config-paths", - ",".join(remote_config_paths), + "--remote-config-path", + remote_config_paths[0], # TODO: support multiple paths ] job = deepcopy(self.template_job) # replace command