From 73c0294eaa3edab0f50cb8d1cd9a88309c79cb2c Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Wed, 8 Jan 2025 11:49:55 -0800 Subject: [PATCH] add status running to update object Signed-off-by: Yee Hing Tong --- flytekit/core/worker_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/core/worker_queue.py b/flytekit/core/worker_queue.py index 635f83d3ec..b335d96850 100644 --- a/flytekit/core/worker_queue.py +++ b/flytekit/core/worker_queue.py @@ -202,6 +202,7 @@ def reconcile_one(self, update: Update): update.status = ItemStatus.RUNNING else: if not item.wf_exec.is_done: + update.status = ItemStatus.RUNNING # Technically a mutating operation, but let's pretend it's not update.wf_exec = self.remote.sync_execution(item.wf_exec) if update.wf_exec.closure.phase == WorkflowExecutionPhase.SUCCEEDED: @@ -356,7 +357,6 @@ async def add(self, entity: RunnableEntity, input_kwargs: dict[str, typing.Any]) """ # need to also check to see if the entity has already been registered, and if not, register it. i = WorkItem(entity=entity, input_kwargs=input_kwargs) - assert i.status == ItemStatus.PENDING with self.entries_lock: if entity.name not in self.entries: