Skip to content

Commit

Permalink
[armada-scheduler] Fix a bug in syncState causing follower state to b…
Browse files Browse the repository at this point in the history
…e wrong (#2757)

The bug causes the follower armada-scheduler to accumulate non-terminal jobs, whereas the leader correctly marks these jobs as terminated

The bug would happen if we receive an update that marks the job as terminal at the same time we receive a job_run update for that job.
 - We would then delete the job (as it is terminal) and then immediately add the job back, as it had a run update
 - This wouldn't cause an issue in the leader, as it already considered the job terminal so would go down a different code path

This PR is the minimal fix for the above issue - which is to upsert job updates before we try to delete jobs
  • Loading branch information
JamesMurkin authored Jul 26, 2023
1 parent a43d4e7 commit d7e5a05
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 9 deletions.
4 changes: 2 additions & 2 deletions internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,11 +306,11 @@ func (s *Scheduler) syncState(ctx context.Context) ([]*jobdb.Job, error) {
}

jobsToUpdate := maps.Values(jobsToUpdateById)
err = s.jobDb.BatchDelete(txn, jobsToDelete)
err = s.jobDb.Upsert(txn, jobsToUpdate)
if err != nil {
return nil, err
}
err = s.jobDb.Upsert(txn, jobsToUpdate)
err = s.jobDb.BatchDelete(txn, jobsToDelete)
if err != nil {
return nil, err
}
Expand Down
22 changes: 15 additions & 7 deletions internal/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,20 +734,28 @@ func TestScheduler_TestSyncState(t *testing.T) {
expectedJobDbIds: []string{queuedJob.Id()},
},
"job succeeded": {
initialJobs: []*jobdb.Job{queuedJob},
initialJobs: []*jobdb.Job{leasedJob},
jobUpdates: []database.Job{
{
JobID: queuedJob.Id(),
JobSet: queuedJob.Jobset(),
Queue: queuedJob.Queue(),
Submitted: queuedJob.Created(),
Priority: int64(queuedJob.Priority()),
JobID: leasedJob.Id(),
JobSet: leasedJob.Jobset(),
Queue: leasedJob.Queue(),
Submitted: leasedJob.Created(),
Priority: int64(leasedJob.Priority()),
SchedulingInfo: schedulingInfoBytes,
Succeeded: true,
Serial: 1,
},
},
expectedUpdatedJobs: []*jobdb.Job{},
runUpdates: []database.Run{
{
RunID: leasedJob.LatestRun().Id(),
JobID: leasedJob.LatestRun().JobId(),
JobSet: leasedJob.GetJobSet(),
Succeeded: true,
},
},
expectedUpdatedJobs: []*jobdb.Job{leasedJob.WithUpdatedRun(leasedJob.LatestRun().WithSucceeded(true))},
expectedJobDbIds: []string{},
},
"job requeued": {
Expand Down

0 comments on commit d7e5a05

Please sign in to comment.