Skip to content

Commit

Permalink
Merge branch 'master' into workflow-version-upgrades
Browse files Browse the repository at this point in the history
  • Loading branch information
kannon92 authored Aug 11, 2023
2 parents 0392060 + bbcb3eb commit 098a92a
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 24 deletions.
16 changes: 16 additions & 0 deletions internal/scheduler/jobdb/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package jobdb
import (
"time"

"github.com/gogo/protobuf/proto"
"github.com/google/uuid"
"golang.org/x/exp/maps"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -431,6 +432,21 @@ func (job *Job) WithJobSchedulingInfo(jobSchedulingInfo *schedulerobjects.JobSch
return j
}

func (job *Job) DeepCopy() *Job {
copiedSchedulingInfo := proto.Clone(job.JobSchedulingInfo()).(*schedulerobjects.JobSchedulingInfo)
j := job.WithJobSchedulingInfo(copiedSchedulingInfo)

j.runsById = maps.Clone(j.runsById)
for key, run := range j.runsById {
j.runsById[key] = run.DeepCopy()
}
if j.activeRun != nil {
j.activeRun = job.activeRun.DeepCopy()
}

return j
}

// copyJob makes a copy of the job
func copyJob(j Job) *Job {
return &j
Expand Down
20 changes: 20 additions & 0 deletions internal/scheduler/jobdb/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,26 @@ func TestJob_TestWithCreated(t *testing.T) {
assert.Equal(t, int64(456), newJob.Created())
}

func TestJob_DeepCopy(t *testing.T) {
original := NewJob("test-job", "test-jobset", "test-queue", 2, schedulingInfo, true, 0, false, false, false, 3)
original = original.WithUpdatedRun(baseJobRun.DeepCopy())
expected := NewJob("test-job", "test-jobset", "test-queue", 2, schedulingInfo, true, 0, false, false, false, 3)
expected = expected.WithUpdatedRun(baseJobRun.DeepCopy())

result := original.DeepCopy()
assert.Equal(t, expected, result)
assert.Equal(t, expected, original)

// Modify and confirm original hasn't changed
result.activeRun.nodeName = "test"
result.runsById[baseJobRun.id].nodeName = "test"
result.queue = "test"
result.jobSchedulingInfo.Priority = 1

assert.NotEqual(t, expected, result)
assert.Equal(t, expected, original)
}

func TestJob_TestWithJobSchedulingInfo(t *testing.T) {
newSchedInfo := &schedulerobjects.JobSchedulingInfo{
ObjectRequirements: []*schedulerobjects.ObjectRequirements{
Expand Down
8 changes: 8 additions & 0 deletions internal/scheduler/preempting_queue_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/constraints"
schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
"github.com/armadaproject/armada/internal/scheduler/interfaces"
"github.com/armadaproject/armada/internal/scheduler/jobdb"
"github.com/armadaproject/armada/internal/scheduler/nodedb"
)

Expand Down Expand Up @@ -803,6 +804,13 @@ func (evi *Evictor) Evict(ctx context.Context, it nodedb.NodeIterator) (*Evictor
if err != nil {
return nil, err
}

for i, evictedJob := range evictedJobs {
if dbJob, ok := evictedJob.(*jobdb.Job); ok {
evictedJobs[i] = dbJob.DeepCopy()
}
}

for _, job := range evictedJobs {
evictedJobsById[job.GetId()] = job
nodeIdByJobId[job.GetId()] = node.Id
Expand Down
11 changes: 10 additions & 1 deletion internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,11 @@ func (s *Scheduler) cycle(ctx context.Context, updateAll bool, leaderToken Leade
isLeader := func() bool {
return s.leaderController.ValidateToken(leaderToken)
}
start := s.clock.Now()
if err := s.publisher.PublishMessages(ctx, events, isLeader); err != nil {
return err
}
log.Infof("published %d events to pulsar in %s", len(events), s.clock.Since(start))
txn.Commit()
return nil
}
Expand All @@ -244,11 +246,12 @@ func (s *Scheduler) syncState(ctx context.Context) ([]*jobdb.Job, error) {
log := ctxlogrus.Extract(ctx)
log = log.WithField("function", "syncState")

start := s.clock.Now()
updatedJobs, updatedRuns, err := s.jobRepository.FetchJobUpdates(ctx, s.jobsSerial, s.runsSerial)
if err != nil {
return nil, err
}
log.Infof("received %d updated jobs and %d updated job runs", len(updatedJobs), len(updatedRuns))
log.Infof("received %d updated jobs and %d updated job runs in %s", len(updatedJobs), len(updatedRuns), s.clock.Since(start))

txn := s.jobDb.WriteTxn()
defer txn.Abort()
Expand Down Expand Up @@ -601,6 +604,12 @@ func (s *Scheduler) generateUpdateMessagesFromJob(job *jobdb.Job, jobRunErrors m
},
}
}
if runError == nil {
panic(
fmt.Sprintf("No run error found for run %s (job id = %s), this must mean we're out of sync with the database",
lastRun.Id().String(), job.Id()),
)
}
jobErrors := &armadaevents.EventSequence_Event{
Created: s.now(),
Event: &armadaevents.EventSequence_Event_JobErrors{
Expand Down
60 changes: 37 additions & 23 deletions internal/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,15 @@ var leasedJob = jobdb.NewJob(
false,
1).WithQueued(false).WithNewRun("testExecutor", "test-node", "node")

var defaultJobRunError = &armadaevents.Error{
Terminal: true,
Reason: &armadaevents.Error_PodError{
PodError: &armadaevents.PodError{
Message: "generic pod error",
},
},
}

var (
requeuedJobId = util.NewULID()
requeuedJob = jobdb.NewJob(
Expand Down Expand Up @@ -122,29 +131,30 @@ var (
// Test a single scheduler cycle
func TestScheduler_TestCycle(t *testing.T) {
tests := map[string]struct {
initialJobs []*jobdb.Job // jobs in the jobdb at the start of the cycle
jobUpdates []database.Job // job updates from the database
runUpdates []database.Run // run updates from the database
staleExecutor bool // if true then the executorRepository will report the executor as stale
fetchError bool // if true then the jobRepository will throw an error
scheduleError bool // if true then the schedulingalgo will throw an error
publishError bool // if true the publisher will throw an error
submitCheckerFailure bool // if true the submit checker will say the job is unschedulable
expectedJobRunLeased []string // ids of jobs we expect to have produced leased messages
expectedJobRunErrors []string // ids of jobs we expect to have produced jobRunErrors messages
expectedJobErrors []string // ids of jobs we expect to have produced jobErrors messages
expectedJobRunPreempted []string // ids of jobs we expect to have produced jobRunPreempted messages
expectedJobCancelled []string // ids of jobs we expect to have produced cancelled messages
expectedJobReprioritised []string // ids of jobs we expect to have produced reprioritised messages
expectedQueued []string // ids of jobs we expect to have produced requeued messages
expectedJobSucceeded []string // ids of jobs we expect to have produced succeeeded messages
expectedLeased []string // ids of jobs we expected to be leased in jobdb at the end of the cycle
expectedRequeued []string // ids of jobs we expected to be requeued in jobdb at the end of the cycle
expectedTerminal []string // ids of jobs we expected to be terminal in jobdb at the end of the cycle
expectedJobPriority map[string]uint32 // expected priority of jobs at the end of the cycle
expectedNodeAntiAffinities []string // list of nodes there is expected to be anti affinities for on job scheduling info
expectedJobSchedulingInfoVersion int // expected scheduling info version of jobs at the end of the cycle
expectedQueuedVersion int32 // expected queued version of jobs atthe end of the cycle
initialJobs []*jobdb.Job // jobs in the jobdb at the start of the cycle
jobUpdates []database.Job // job updates from the database
runUpdates []database.Run // run updates from the database
jobRunErrors map[uuid.UUID]*armadaevents.Error // job run errors in the database
staleExecutor bool // if true then the executorRepository will report the executor as stale
fetchError bool // if true then the jobRepository will throw an error
scheduleError bool // if true then the scheduling algo will throw an error
publishError bool // if true the publisher will throw an error
submitCheckerFailure bool // if true the submit checker will say the job is unschedulable
expectedJobRunLeased []string // ids of jobs we expect to have produced leased messages
expectedJobRunErrors []string // ids of jobs we expect to have produced jobRunErrors messages
expectedJobErrors []string // ids of jobs we expect to have produced jobErrors messages
expectedJobRunPreempted []string // ids of jobs we expect to have produced jobRunPreempted messages
expectedJobCancelled []string // ids of jobs we expect to have produced cancelled messages
expectedJobReprioritised []string // ids of jobs we expect to have produced reprioritised messages
expectedQueued []string // ids of jobs we expect to have produced requeued messages
expectedJobSucceeded []string // ids of jobs we expect to have produced succeeeded messages
expectedLeased []string // ids of jobs we expected to be leased in jobdb at the end of the cycle
expectedRequeued []string // ids of jobs we expected to be requeued in jobdb at the end of the cycle
expectedTerminal []string // ids of jobs we expected to be terminal in jobdb at the end of the cycle
expectedJobPriority map[string]uint32 // expected priority of jobs at the end of the cycle
expectedNodeAntiAffinities []string // list of nodes there is expected to be anti affinities for on job scheduling info
expectedJobSchedulingInfoVersion int // expected scheduling info version of jobs at the end of the cycle
expectedQueuedVersion int32 // expected queued version of jobs at the end of the cycle
}{
"Lease a single job already in the db": {
initialJobs: []*jobdb.Job{queuedJob},
Expand Down Expand Up @@ -352,6 +362,9 @@ func TestScheduler_TestCycle(t *testing.T) {
Serial: 1,
},
},
jobRunErrors: map[uuid.UUID]*armadaevents.Error{
leasedJob.LatestRun().Id(): defaultJobRunError,
},
expectedJobErrors: []string{leasedJob.Id()},
expectedTerminal: []string{leasedJob.Id()},
expectedQueuedVersion: leasedJob.QueuedVersion(),
Expand Down Expand Up @@ -407,6 +420,7 @@ func TestScheduler_TestCycle(t *testing.T) {
jobRepo := &testJobRepository{
updatedJobs: tc.jobUpdates,
updatedRuns: tc.runUpdates,
errors: tc.jobRunErrors,
shouldError: tc.fetchError,
}
testClock := clock.NewFakeClock(time.Now())
Expand Down

0 comments on commit 098a92a

Please sign in to comment.