Skip to content
This repository has been archived by the owner on Jan 2, 2024. It is now read-only.

Commit

Permalink
Fixed inconsistent status updates when abandoning jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
Toan Quach authored and Toan Quach committed Nov 17, 2023
1 parent eaa7df5 commit e445355
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 17 deletions.
30 changes: 20 additions & 10 deletions src/taipy/core/_orchestrator/_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ def submit(

submission.jobs = jobs # type: ignore

for job in jobs:
cls._orchestrate_job_to_run_or_block(job)
cls._orchestrate_job_to_run_or_block(jobs)

if Config.job_config.is_development:
cls._check_and_execute_jobs_if_development_mode()
Expand Down Expand Up @@ -130,9 +129,10 @@ def submit_task(
force,
)

submission.jobs = [job] # type: ignore
jobs = [job]
submission.jobs = jobs # type: ignore

cls._orchestrate_job_to_run_or_block(job)
cls._orchestrate_job_to_run_or_block(jobs)

if Config.job_config.is_development:
cls._check_and_execute_jobs_if_development_mode()
Expand Down Expand Up @@ -160,12 +160,20 @@ def _lock_dn_output_and_create_job(
return job

@classmethod
def _orchestrate_job_to_run_or_block(cls, job: Job):
if cls._is_blocked(job):
job.blocked()
cls.blocked_jobs.append(job)
else:
job.pending()
def _orchestrate_job_to_run_or_block(cls, jobs: List[Job]):
blocked_jobs = []
pending_jobs = []

for job in jobs:
if cls._is_blocked(job):
job.blocked()
blocked_jobs.append(job)
else:
job.pending()
pending_jobs.append(job)

cls.blocked_jobs.extend(blocked_jobs)
for job in pending_jobs:
cls.jobs_to_run.put(job)

@classmethod
Expand Down Expand Up @@ -214,6 +222,7 @@ def _on_status_change(cls, job: Job):
if job.is_completed() or job.is_skipped():
cls.__unblock_jobs()
elif job.is_failed():
print(f"\nJob {job.id} failed, abandoning subsequent jobs.\n")
cls._fail_subsequent_jobs(job)

@classmethod
Expand Down Expand Up @@ -286,6 +295,7 @@ def _fail_subsequent_jobs(cls, failed_job: Job):
cls.__find_subsequent_jobs(failed_job.submit_id, set(failed_job.task.output.keys()))
)
for job in to_fail_or_abandon_jobs:
print(f"Abandoning job: {job.id}")
job.abandoned()
to_fail_or_abandon_jobs.update([failed_job])
cls.__remove_blocked_jobs(to_fail_or_abandon_jobs)
Expand Down
2 changes: 1 addition & 1 deletion src/taipy/core/_repository/db/_sql_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ def init_db(cls):
from ...data._data_model import _DataNodeModel
from ...job._job_model import _JobModel
from ...scenario._scenario_model import _ScenarioModel
from ...task._task_model import _TaskModel
from ...submission._submission_model import _SubmissionModel
from ...task._task_model import _TaskModel

cls._connection.execute(
str(CreateTable(_CycleModel.__table__, if_not_exists=True).compile(dialect=sqlite.dialect()))
Expand Down
12 changes: 6 additions & 6 deletions tests/core/job/test_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,15 +361,15 @@ def test_cancel_subsequent_jobs():
job_1 = orchestrator._lock_dn_output_and_create_job(
task_1, submit_id=submission_1.id, submit_entity_id=submission_1.entity_id
)
orchestrator._orchestrate_job_to_run_or_block(job_1)
orchestrator._orchestrate_job_to_run_or_block([job_1])
job_2 = orchestrator._lock_dn_output_and_create_job(
task_2, submit_id=submission_1.id, submit_entity_id=submission_1.entity_id
)
orchestrator._orchestrate_job_to_run_or_block(job_2)
orchestrator._orchestrate_job_to_run_or_block([job_2])
job_3 = orchestrator._lock_dn_output_and_create_job(
task_3, submit_id=submission_1.id, submit_entity_id=submission_1.entity_id
)
orchestrator._orchestrate_job_to_run_or_block(job_3)
orchestrator._orchestrate_job_to_run_or_block([job_3])

submission_1.jobs = [job_1, job_2, job_3]

Expand All @@ -382,15 +382,15 @@ def test_cancel_subsequent_jobs():
job_4 = _OrchestratorFactory._orchestrator._lock_dn_output_and_create_job(
task_1, submit_id=submission_2.id, submit_entity_id=submission_2.entity_id
)
orchestrator._orchestrate_job_to_run_or_block(job_4)
orchestrator._orchestrate_job_to_run_or_block([job_4])
job_5 = _OrchestratorFactory._orchestrator._lock_dn_output_and_create_job(
task_2, submit_id=submission_2.id, submit_entity_id=submission_2.entity_id
)
orchestrator._orchestrate_job_to_run_or_block(job_5)
orchestrator._orchestrate_job_to_run_or_block([job_5])
job_6 = _OrchestratorFactory._orchestrator._lock_dn_output_and_create_job(
task_3, submit_id=submission_2.id, submit_entity_id=submission_2.entity_id
)
orchestrator._orchestrate_job_to_run_or_block(job_6)
orchestrator._orchestrate_job_to_run_or_block([job_6])

submission_2.jobs = [job_4, job_5, job_6]

Expand Down

0 comments on commit e445355

Please sign in to comment.