Skip to content

Commit

Permalink
Use run ID as a key field
Browse files Browse the repository at this point in the history
  • Loading branch information
t3eHawk committed Dec 12, 2022
1 parent 7af8ba5 commit 212274c
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 22 deletions.
22 changes: 13 additions & 9 deletions pydin/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1274,7 +1274,7 @@ def pipeline(self):
key_field = getattr(variables, key_name) if key_name else None

chunk_size = record['chunk_size']
cleanup = True if record['cleanup'] else False
cleanup = to_boolean(record['cleanup'])

constructor = getattr(models, node_type)
node = constructor(model_name=node_name,
Expand Down Expand Up @@ -1788,9 +1788,9 @@ def pipeline(self, value):

@property
def job(self):
"""Get process job ID."""
"""Get process job."""
if self.pipeline and self.pipeline.job:
return self.pipeline.job
return self.pipeline.job

@property
def process_name(self):
Expand All @@ -1816,13 +1816,13 @@ def error_handler(self):
if exc_info:
self.errors.add(exc_info)
if isinstance(self, Task):
if self.pipeline.job:
self.pipeline.job.errors.add(exc_info)
if self.job:
self.job.errors.add(exc_info)
if isinstance(self, Step):
if self.pipeline:
self.pipeline.task.errors.add(exc_info)
if self.pipeline.job:
self.pipeline.job.errors.add(exc_info)
if self.job:
self.job.errors.add(exc_info)

pass

Expand Down Expand Up @@ -2088,11 +2088,15 @@ def revoke(self):
th = db.tables.task_history
for record in self.history:
record_id = record['id']
process_id = record['run_id']
status = record['status']
if status == 'D':
for step in self.pipeline.walk():
if step.last.recyclable:
if step.last.key_field.unit == 'Task':
if step.last.key_field.associated(self.job):
if process_id:
step.last.recycle(process_id)
elif step.last.key_field.associated(self):
step.last.recycle(record_id)
update = th.update().values(status='C').\
where(th.c.id == record_id)
Expand Down Expand Up @@ -2521,7 +2525,7 @@ def revoke(self):
status = record['status']
if status == 'D':
if self.last.recyclable:
if self.last.key_field.unit == 'Step':
if self.last.key_field.associated(self):
self.last.recycle(record_id)
update = sh.update().values(status='C').\
where(sh.c.id == record_id)
Expand Down
31 changes: 18 additions & 13 deletions pydin/vars.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ def name(self):
value += elem.lower()
return value

@property
def unit(self):
return 'Task' if self.name.startswith(('task', 'process')) else 'Step'

@property
def label(self):
return f'pd_{self.name}'
Expand All @@ -34,6 +30,18 @@ def label(self):
def column(self):
return sa.literal(self.value).label(self.label)

def associated(self, obj):
"""Check if the key field associated with the given instance."""
class_name = obj.__class__.__name__
if self.name.startswith(class_name.lower()):
return True
elif self.name.startswith('run') and class_name == 'Job':
return True
elif self.name.startswith('process') and class_name == 'Job':
return True
else:
return False


class RunId(KeyField):
"""Represents Run ID as a key field."""
Expand All @@ -44,6 +52,12 @@ def value(self):
return self.model.job.record_id


class ProcessId(RunId):
"""Represents Process ID as a key field."""

pass


class TaskId(KeyField):
"""Represents Task ID as a key field."""

Expand All @@ -62,15 +76,6 @@ def value(self):
raise NotImplementedError


class ProcessId(KeyField):
"""Represents Process ID as a key field."""

@property
def value(self):
if self.model.pipeline:
return self.model.pipeline.task.id


run_id = ProcessId()
task_id = TaskId()
step_id = StepId()
Expand Down

0 comments on commit 212274c

Please sign in to comment.