Skip to content

Commit

Permalink
Prevent scheduling from mutating the jobDb (#2838)
Browse files Browse the repository at this point in the history
* Make scheduling not mutate the jobDb

Preempted/evicted jobs get mutated as part of the scheduling loop

This causes a bug where preemptible jobs are always retried on the same node:
 - As the nodeSelector is mutated as part of scheduling to point at a specific node

To fix this, we now deepcopy the evicted jobs as part of the scheduling round. Meaning the jobDb objects don't get mutated and prevents these mutations being persisted between scheduling rounds

* Remove comment

* Remove comment

---------

Co-authored-by: owenthomas17 <2owen.thomas@gmail.com>
  • Loading branch information
JamesMurkin and owenthomas17 authored Aug 11, 2023
1 parent b13cdd8 commit b85fe50
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 0 deletions.
16 changes: 16 additions & 0 deletions internal/scheduler/jobdb/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package jobdb
import (
"time"

"github.com/gogo/protobuf/proto"
"github.com/google/uuid"
"golang.org/x/exp/maps"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -431,6 +432,21 @@ func (job *Job) WithJobSchedulingInfo(jobSchedulingInfo *schedulerobjects.JobSch
return j
}

func (job *Job) DeepCopy() *Job {
copiedSchedulingInfo := proto.Clone(job.JobSchedulingInfo()).(*schedulerobjects.JobSchedulingInfo)
j := job.WithJobSchedulingInfo(copiedSchedulingInfo)

j.runsById = maps.Clone(j.runsById)
for key, run := range j.runsById {
j.runsById[key] = run.DeepCopy()
}
if j.activeRun != nil {
j.activeRun = job.activeRun.DeepCopy()
}

return j
}

// copyJob makes a copy of the job
func copyJob(j Job) *Job {
return &j
Expand Down
20 changes: 20 additions & 0 deletions internal/scheduler/jobdb/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,26 @@ func TestJob_TestWithCreated(t *testing.T) {
assert.Equal(t, int64(456), newJob.Created())
}

func TestJob_DeepCopy(t *testing.T) {
original := NewJob("test-job", "test-jobset", "test-queue", 2, schedulingInfo, true, 0, false, false, false, 3)
original = original.WithUpdatedRun(baseJobRun.DeepCopy())
expected := NewJob("test-job", "test-jobset", "test-queue", 2, schedulingInfo, true, 0, false, false, false, 3)
expected = expected.WithUpdatedRun(baseJobRun.DeepCopy())

result := original.DeepCopy()
assert.Equal(t, expected, result)
assert.Equal(t, expected, original)

// Modify and confirm original hasn't changed
result.activeRun.nodeName = "test"
result.runsById[baseJobRun.id].nodeName = "test"
result.queue = "test"
result.jobSchedulingInfo.Priority = 1

assert.NotEqual(t, expected, result)
assert.Equal(t, expected, original)
}

func TestJob_TestWithJobSchedulingInfo(t *testing.T) {
newSchedInfo := &schedulerobjects.JobSchedulingInfo{
ObjectRequirements: []*schedulerobjects.ObjectRequirements{
Expand Down
8 changes: 8 additions & 0 deletions internal/scheduler/preempting_queue_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/constraints"
schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
"github.com/armadaproject/armada/internal/scheduler/interfaces"
"github.com/armadaproject/armada/internal/scheduler/jobdb"
"github.com/armadaproject/armada/internal/scheduler/nodedb"
)

Expand Down Expand Up @@ -803,6 +804,13 @@ func (evi *Evictor) Evict(ctx context.Context, it nodedb.NodeIterator) (*Evictor
if err != nil {
return nil, err
}

for i, evictedJob := range evictedJobs {
if dbJob, ok := evictedJob.(*jobdb.Job); ok {
evictedJobs[i] = dbJob.DeepCopy()
}
}

for _, job := range evictedJobs {
evictedJobsById[job.GetId()] = job
nodeIdByJobId[job.GetId()] = node.Id
Expand Down

0 comments on commit b85fe50

Please sign in to comment.