diff --git a/flytekit/core/worker_queue.py b/flytekit/core/worker_queue.py index 635f83d3ec..b335d96850 100644 --- a/flytekit/core/worker_queue.py +++ b/flytekit/core/worker_queue.py @@ -202,6 +202,7 @@ def reconcile_one(self, update: Update): update.status = ItemStatus.RUNNING else: if not item.wf_exec.is_done: + update.status = ItemStatus.RUNNING # Technically a mutating operation, but let's pretend it's not update.wf_exec = self.remote.sync_execution(item.wf_exec) if update.wf_exec.closure.phase == WorkflowExecutionPhase.SUCCEEDED: @@ -356,7 +357,6 @@ async def add(self, entity: RunnableEntity, input_kwargs: dict[str, typing.Any]) """ # need to also check to see if the entity has already been registered, and if not, register it. i = WorkItem(entity=entity, input_kwargs=input_kwargs) - assert i.status == ItemStatus.PENDING with self.entries_lock: if entity.name not in self.entries: