From 212274cb2a5f7c050261150fa18d54a802134de0 Mon Sep 17 00:00:00 2001 From: t3eHawk Date: Mon, 12 Dec 2022 04:55:55 +0300 Subject: [PATCH] Use run ID as a key field --- pydin/core.py | 22 +++++++++++++--------- pydin/vars.py | 31 ++++++++++++++++++------------- 2 files changed, 31 insertions(+), 22 deletions(-) diff --git a/pydin/core.py b/pydin/core.py index 8583ae4..c21fb64 100644 --- a/pydin/core.py +++ b/pydin/core.py @@ -1274,7 +1274,7 @@ def pipeline(self): key_field = getattr(variables, key_name) if key_name else None chunk_size = record['chunk_size'] - cleanup = True if record['cleanup'] else False + cleanup = to_boolean(record['cleanup']) constructor = getattr(models, node_type) node = constructor(model_name=node_name, @@ -1788,9 +1788,9 @@ def pipeline(self, value): @property def job(self): - """Get process job ID.""" + """Get process job.""" if self.pipeline and self.pipeline.job: - return self.pipeline.job + return self.pipeline.job @property def process_name(self): @@ -1816,13 +1816,13 @@ def error_handler(self): if exc_info: self.errors.add(exc_info) if isinstance(self, Task): - if self.pipeline.job: - self.pipeline.job.errors.add(exc_info) + if self.job: + self.job.errors.add(exc_info) if isinstance(self, Step): if self.pipeline: self.pipeline.task.errors.add(exc_info) - if self.pipeline.job: - self.pipeline.job.errors.add(exc_info) + if self.job: + self.job.errors.add(exc_info) pass @@ -2088,11 +2088,15 @@ def revoke(self): th = db.tables.task_history for record in self.history: record_id = record['id'] + process_id = record['run_id'] status = record['status'] if status == 'D': for step in self.pipeline.walk(): if step.last.recyclable: - if step.last.key_field.unit == 'Task': + if step.last.key_field.associated(self.job): + if process_id: + step.last.recycle(process_id) + elif step.last.key_field.associated(self): step.last.recycle(record_id) update = th.update().values(status='C').\ where(th.c.id == record_id) @@ -2521,7 +2525,7 @@ def revoke(self): status = record['status'] if status == 'D': if self.last.recyclable: - if self.last.key_field.unit == 'Step': + if self.last.key_field.associated(self): self.last.recycle(record_id) update = sh.update().values(status='C').\ where(sh.c.id == record_id) diff --git a/pydin/vars.py b/pydin/vars.py index 64f8eb8..71c0e0b 100644 --- a/pydin/vars.py +++ b/pydin/vars.py @@ -22,10 +22,6 @@ def name(self): value += elem.lower() return value - @property - def unit(self): - return 'Task' if self.name.startswith(('task', 'process')) else 'Step' - @property def label(self): return f'pd_{self.name}' @@ -34,6 +30,18 @@ def label(self): def column(self): return sa.literal(self.value).label(self.label) + def associated(self, obj): + """Check if the key field associated with the given instance.""" + class_name = obj.__class__.__name__ + if self.name.startswith(class_name.lower()): + return True + elif self.name.startswith('run') and class_name == 'Job': + return True + elif self.name.startswith('process') and class_name == 'Job': + return True + else: + return False + class RunId(KeyField): """Represents Run ID as a key field.""" @@ -44,6 +52,12 @@ def value(self): return self.model.job.record_id +class ProcessId(RunId): + """Represents Process ID as a key field.""" + + pass + + class TaskId(KeyField): """Represents Task ID as a key field.""" @@ -62,15 +76,6 @@ def value(self): raise NotImplementedError -class ProcessId(KeyField): - """Represents Process ID as a key field.""" - - @property - def value(self): - if self.model.pipeline: - return self.model.pipeline.task.id - - run_id = ProcessId() task_id = TaskId() step_id = StepId()