Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove calls to GetRequirements where possible #2608

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/armada/server/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL

// Group gangs.
for _, job := range jobs {
gangId, _, isGangJob, err := scheduler.GangIdAndCardinalityFromLegacySchedulerJob(job, q.schedulingConfig.Preemption.PriorityClasses)
gangId, _, isGangJob, err := scheduler.GangIdAndCardinalityFromLegacySchedulerJob(job)
if err != nil {
return nil, err
}
Expand Down
39 changes: 6 additions & 33 deletions internal/scheduler/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,7 @@ func JobsSummary(jobs []interfaces.LegacySchedulerJob) string {
func(jobs []interfaces.LegacySchedulerJob) schedulerobjects.ResourceList {
rv := schedulerobjects.NewResourceListWithDefaultSize()
for _, job := range jobs {
req := PodRequirementFromLegacySchedulerJob(job, nil)
if req == nil {
continue
}
rv.AddV1ResourceList(req.ResourceRequirements.Requests)
rv.AddV1ResourceList(job.GetResourceRequirements().Requests)
}
return rv
},
Expand Down Expand Up @@ -141,29 +137,14 @@ func isEvictedJob(job interfaces.LegacySchedulerJob) bool {
return job.GetAnnotations()[schedulerconfig.IsEvictedAnnotation] == "true"
}

func targetNodeIdFromLegacySchedulerJob(job interfaces.LegacySchedulerJob) (string, bool) {
req := PodRequirementFromLegacySchedulerJob(job, nil)
if req == nil {
return "", false
}
nodeId, ok := req.NodeSelector[schedulerconfig.NodeIdLabel]
func targetNodeIdFromNodeSelector(nodeSelector map[string]string) (string, bool) {
nodeId, ok := nodeSelector[schedulerconfig.NodeIdLabel]
return nodeId, ok
}

// GangIdAndCardinalityFromLegacySchedulerJob returns a tuple (gangId, gangCardinality, isGangJob, error).
func GangIdAndCardinalityFromLegacySchedulerJob(job interfaces.LegacySchedulerJob, priorityClasses map[string]configuration.PriorityClass) (string, int, bool, error) {
reqs := job.GetRequirements(priorityClasses)
if reqs == nil {
return "", 0, false, nil
}
if len(reqs.ObjectRequirements) != 1 {
return "", 0, false, errors.Errorf("expected exactly one object requirement in %v", reqs)
}
podReqs := reqs.ObjectRequirements[0].GetPodRequirements()
if podReqs == nil {
return "", 0, false, nil
}
return GangIdAndCardinalityFromAnnotations(podReqs.Annotations)
func GangIdAndCardinalityFromLegacySchedulerJob(job interfaces.LegacySchedulerJob) (string, int, bool, error) {
return GangIdAndCardinalityFromAnnotations(job.GetAnnotations())
}

// GangIdAndCardinalityFromAnnotations returns a tuple (gangId, gangCardinality, isGangJob, error).
Expand Down Expand Up @@ -222,20 +203,12 @@ func PodRequirementFromLegacySchedulerJob[E interfaces.LegacySchedulerJob](job E
}
annotations[schedulerconfig.JobIdAnnotation] = job.GetId()
annotations[schedulerconfig.QueueAnnotation] = job.GetQueue()
info := job.GetRequirements(priorityClasses)
info := job.GetJobSchedulingInfo(priorityClasses)
req := PodRequirementFromJobSchedulingInfo(info)
req.Annotations = annotations
return req
}

func PodRequirementsFromJobSchedulingInfos(infos []*schedulerobjects.JobSchedulingInfo) []*schedulerobjects.PodRequirements {
rv := make([]*schedulerobjects.PodRequirements, 0, len(infos))
for _, info := range infos {
rv = append(rv, PodRequirementFromJobSchedulingInfo(info))
}
return rv
}

func PodRequirementFromJobSchedulingInfo(info *schedulerobjects.JobSchedulingInfo) *schedulerobjects.PodRequirements {
for _, oreq := range info.ObjectRequirements {
if preq := oreq.GetPodRequirements(); preq != nil {
Expand Down
16 changes: 1 addition & 15 deletions internal/scheduler/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/pkg/errors"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
v1 "k8s.io/api/core/v1"

"github.com/armadaproject/armada/internal/armada/configuration"
"github.com/armadaproject/armada/internal/common/armadaerrors"
Expand Down Expand Up @@ -461,13 +460,13 @@ func (qctx *QueueSchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingC

func (qctx *QueueSchedulingContext) EvictJob(job interfaces.LegacySchedulerJob) (bool, error) {
jobId := job.GetId()
_, rl := priorityAndRequestsFromLegacySchedulerJob(job, qctx.SchedulingContext.PriorityClasses)
if _, ok := qctx.UnsuccessfulJobSchedulingContexts[jobId]; ok {
return false, errors.Errorf("failed evicting job %s from queue: job already marked unsuccessful", jobId)
}
if _, ok := qctx.EvictedJobsById[jobId]; ok {
return false, errors.Errorf("failed evicting job %s from queue: job already marked evicted", jobId)
}
rl := job.GetResourceRequirements().Requests
_, scheduledInThisRound := qctx.SuccessfulJobSchedulingContexts[jobId]
if scheduledInThisRound {
qctx.ScheduledResourcesByPriorityClass.SubV1ResourceList(job.GetPriorityClassName(), rl)
Expand All @@ -481,19 +480,6 @@ func (qctx *QueueSchedulingContext) EvictJob(job interfaces.LegacySchedulerJob)
return scheduledInThisRound, nil
}

// TODO: Remove?
func priorityAndRequestsFromLegacySchedulerJob(job interfaces.LegacySchedulerJob, priorityClasses map[string]configuration.PriorityClass) (int32, v1.ResourceList) {
req := job.GetRequirements(priorityClasses)
for _, r := range req.ObjectRequirements {
podReqs := r.GetPodRequirements()
if podReqs == nil {
continue
}
return podReqs.Priority, podReqs.ResourceRequirements.Requests
}
return 0, nil
}

// ClearJobSpecs zeroes out job specs to reduce memory usage.
func (qctx *QueueSchedulingContext) ClearJobSpecs() {
for _, jctx := range qctx.SuccessfulJobSchedulingContexts {
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/context/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,6 @@ func testSmallCpuJobSchedulingContext(queue, priorityClassName string) *JobSched
NumNodes: 1,
JobId: job.GetId(),
Job: job,
Req: job.GetRequirements(nil).ObjectRequirements[0].GetPodRequirements(),
Req: job.GetJobSchedulingInfo(nil).ObjectRequirements[0].GetPodRequirements(),
}
}
19 changes: 5 additions & 14 deletions internal/scheduler/interfaces/interfaces.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package interfaces

import (
"time"

v1 "k8s.io/api/core/v1"

"github.com/armadaproject/armada/internal/armada/configuration"
Expand All @@ -12,24 +14,13 @@ type LegacySchedulerJob interface {
GetId() string
GetQueue() string
GetJobSet() string
GetPerQueuePriority() uint32
GetSubmitTime() time.Time
GetAnnotations() map[string]string
GetRequirements(map[string]configuration.PriorityClass) *schedulerobjects.JobSchedulingInfo
GetJobSchedulingInfo(map[string]configuration.PriorityClass) *schedulerobjects.JobSchedulingInfo
GetPriorityClassName() string
GetNodeSelector() map[string]string
GetAffinity() *v1.Affinity
GetTolerations() []v1.Toleration
GetResourceRequirements() v1.ResourceRequirements
}

func PodRequirementFromLegacySchedulerJob(job LegacySchedulerJob, priorityClasses map[string]configuration.PriorityClass) *schedulerobjects.PodRequirements {
schedulingInfo := job.GetRequirements(priorityClasses)
if schedulingInfo == nil {
return nil
}
for _, objectReq := range schedulingInfo.ObjectRequirements {
if req := objectReq.GetPodRequirements(); req != nil {
return req
}
}
return nil
}
15 changes: 14 additions & 1 deletion internal/scheduler/jobdb/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,19 @@ func (job *Job) Priority() uint32 {
return job.priority
}

// GetPerQueuePriority exists for compatibility with the LegacyJob interface.
func (job *Job) GetPerQueuePriority() uint32 {
return job.priority
}

// GetSubmitTime exists for compatibility with the LegacyJob interface.
func (job *Job) GetSubmitTime() time.Time {
if job.jobSchedulingInfo == nil {
return time.Time{}
}
return job.jobSchedulingInfo.SubmitTime
}

// RequestedPriority returns the requested priority of the job.
func (job *Job) RequestedPriority() uint32 {
return job.requestedPriority
Expand Down Expand Up @@ -161,7 +174,7 @@ func (job *Job) GetAnnotations() map[string]string {

// GetRequirements returns the scheduling requirements associated with the job.
// Needed for compatibility with interfaces.LegacySchedulerJob
func (job *Job) GetRequirements(_ map[string]configuration.PriorityClass) *schedulerobjects.JobSchedulingInfo {
func (job *Job) GetJobSchedulingInfo(_ map[string]configuration.PriorityClass) *schedulerobjects.JobSchedulingInfo {
return job.JobSchedulingInfo()
}

Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/jobdb/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestJob_TestGetter(t *testing.T) {
assert.Equal(t, baseJob.queue, baseJob.Queue())
assert.Equal(t, baseJob.queue, baseJob.GetQueue())
assert.Equal(t, baseJob.created, baseJob.Created())
assert.Equal(t, schedulingInfo, baseJob.GetRequirements(nil))
assert.Equal(t, schedulingInfo, baseJob.GetJobSchedulingInfo(nil))
assert.Equal(t, schedulingInfo, baseJob.JobSchedulingInfo())
assert.Equal(t, baseJob.GetAnnotations(), map[string]string{
"foo": "bar",
Expand Down
14 changes: 7 additions & 7 deletions internal/scheduler/jobiteration.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,23 +88,23 @@ func (repo *InMemoryJobRepository) Enqueue(job interfaces.LegacySchedulerJob) {
// finally by submit time, with earlier submit times first.
func (repo *InMemoryJobRepository) sortQueue(queue string) {
slices.SortFunc(repo.jobsByQueue[queue], func(a, b interfaces.LegacySchedulerJob) bool {
infoa := a.GetRequirements(repo.priorityClasses)
infob := b.GetRequirements(repo.priorityClasses)
if repo.sortByPriorityClass {
pca := repo.priorityClasses[infoa.PriorityClassName]
pcb := repo.priorityClasses[infob.PriorityClassName]
pca := repo.priorityClasses[a.GetPriorityClassName()]
pcb := repo.priorityClasses[b.GetPriorityClassName()]
if pca.Priority > pcb.Priority {
return true
} else if pca.Priority < pcb.Priority {
return false
}
}
if infoa.GetPriority() < infob.GetPriority() {
pa := a.GetPerQueuePriority()
pb := b.GetPerQueuePriority()
if pa < pb {
return true
} else if infoa.GetPriority() > infob.GetPriority() {
} else if pa > pb {
return false
}
return infoa.GetSubmitTime().Before(infob.GetSubmitTime())
return a.GetSubmitTime().Before(b.GetSubmitTime())
})
}

Expand Down
29 changes: 13 additions & 16 deletions internal/scheduler/preempting_queue_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,27 +432,24 @@ func (sch *PreemptingQueueScheduler) evictionAssertions(evictedJobsById map[stri
}
evictedJobIdsByGangId := make(map[string]map[string]bool)
for _, job := range evictedJobsById {
if gangId, ok := sch.gangIdByJobId[job.GetId()]; ok {
jobId := job.GetId()
if gangId, ok := sch.gangIdByJobId[jobId]; ok {
if m := evictedJobIdsByGangId[gangId]; m != nil {
m[job.GetId()] = true
m[jobId] = true
} else {
evictedJobIdsByGangId[gangId] = map[string]bool{job.GetId(): true}
evictedJobIdsByGangId[gangId] = map[string]bool{jobId: true}
}
}
if !isEvictedJob(job) {
return errors.Errorf("evicted job %s is not marked as such: job annotations %v", job.GetId(), job.GetAnnotations())
return errors.Errorf("evicted job %s is not marked as such: job annotations %v", jobId, job.GetAnnotations())
}
if nodeId, ok := targetNodeIdFromLegacySchedulerJob(job); ok {
nodeSelector := job.GetNodeSelector()
if nodeId, ok := targetNodeIdFromNodeSelector(nodeSelector); ok {
if _, ok := affectedNodesById[nodeId]; !ok {
return errors.Errorf("node id %s targeted by job %s is not marked as affected", nodeId, job.GetId())
return errors.Errorf("node id %s targeted by job %s is not marked as affected", nodeId, jobId)
}
} else {
req := PodRequirementFromLegacySchedulerJob(job, nil)
if req != nil {
return errors.Errorf("evicted job %s is missing target node id selector: job nodeSelector %v", job.GetId(), req.NodeSelector)
} else {
return errors.Errorf("evicted job %s is missing target node id selector: req is nil", job.GetId())
}
return errors.Errorf("evicted job %s is missing target node id selector: job nodeSelector %v", jobId, nodeSelector)
}
}
for gangId, evictedGangJobIds := range evictedJobIdsByGangId {
Expand Down Expand Up @@ -552,7 +549,7 @@ func (sch *PreemptingQueueScheduler) updateGangAccounting(preemptedJobs, schedul
}
}
for _, job := range scheduledJobs {
gangId, _, isGangJob, err := GangIdAndCardinalityFromLegacySchedulerJob(job, sch.schedulingContext.PriorityClasses)
gangId, _, isGangJob, err := GangIdAndCardinalityFromLegacySchedulerJob(job)
if err != nil {
return err
}
Expand Down Expand Up @@ -701,7 +698,7 @@ func NewPreemptibleEvictor(
log.Warnf("can't evict job %s: annotations not initialised", job.GetId())
return false
}
priorityClassName := job.GetRequirements(priorityClasses).PriorityClassName
priorityClassName := job.GetPriorityClassName()
priorityClass, ok := priorityClasses[priorityClassName]
if !ok {
priorityClass = priorityClasses[defaultPriorityClass]
Expand Down Expand Up @@ -786,7 +783,7 @@ func NewOversubscribedEvictor(
log.Warnf("can't evict job %s: annotations not initialised", job.GetId())
return false
}
priorityClassName := job.GetRequirements(priorityClasses).PriorityClassName
priorityClassName := job.GetPriorityClassName()
priorityClass, ok := priorityClasses[priorityClassName]
if !ok {
priorityClass = priorityClasses[defaultPriorityClass]
Expand Down Expand Up @@ -874,7 +871,7 @@ func defaultPostEvictFunc(ctx context.Context, job interfaces.LegacySchedulerJob

// Add a toleration to allow the job to be re-scheduled even if node is unschedulable.
//
// TODO: Because req is created with a new tolerations slice above, this toleration doesn't persist.
// TODO: Because req is allocated by GetJobSchedulingInfo() if job is an api.Job, this toleration may not persist.
// In practice, this isn't an issue now since we don't check static requirements for evicted jobs.
if node.Unschedulable {
req.Tolerations = append(req.Tolerations, nodedb.UnschedulableToleration())
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/queue_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ func TestQueueScheduler(t *testing.T) {
continue
}
assert.Equal(t, nodeDb.NumNodes(), pctx.NumNodes)
_, _, isGangJob, err := GangIdAndCardinalityFromLegacySchedulerJob(jctx.Job, nil)
_, _, isGangJob, err := GangIdAndCardinalityFromLegacySchedulerJob(jctx.Job)
require.NoError(t, err)
if !isGangJob {
numExcludedNodes := 0
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/scheduling_algo.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx context.Context, t
}
jobsByExecutorId[executorId] = append(jobsByExecutorId[executorId], job)
nodeIdByJobId[job.Id()] = nodeId
gangId, _, isGangJob, err := GangIdAndCardinalityFromLegacySchedulerJob(job, l.config.Preemption.PriorityClasses)
gangId, _, isGangJob, err := GangIdAndCardinalityFromLegacySchedulerJob(job)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/scheduler/testfixtures/testfixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func WithRequestsPodReqs(rl schedulerobjects.ResourceList, reqs []*schedulerobje

func WithNodeSelectorJobs(selector map[string]string, jobs []*jobdb.Job) []*jobdb.Job {
for _, job := range jobs {
for _, req := range job.GetRequirements(nil).GetObjectRequirements() {
for _, req := range job.GetJobSchedulingInfo(nil).GetObjectRequirements() {
req.GetPodRequirements().NodeSelector = maps.Clone(selector)
}
}
Expand All @@ -284,7 +284,7 @@ func WithGangAnnotationsJobs(jobs []*jobdb.Job) []*jobdb.Job {

func WithAnnotationsJobs(annotations map[string]string, jobs []*jobdb.Job) []*jobdb.Job {
for _, job := range jobs {
for _, req := range job.GetRequirements(nil).GetObjectRequirements() {
for _, req := range job.GetJobSchedulingInfo(nil).GetObjectRequirements() {
if req.GetPodRequirements().Annotations == nil {
req.GetPodRequirements().Annotations = make(map[string]string)
}
Expand Down
Loading