Skip to content

Commit

Permalink
Remove calls to GetRequirements where possible
Browse files Browse the repository at this point in the history
  • Loading branch information
zuqq committed Jun 26, 2023
1 parent e810638 commit 1c3ae70
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 70 deletions.
2 changes: 1 addition & 1 deletion internal/armada/server/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL

// Group gangs.
for _, job := range jobs {
gangId, _, isGangJob, err := scheduler.GangIdAndCardinalityFromLegacySchedulerJob(job, q.schedulingConfig.Preemption.PriorityClasses)
gangId, _, isGangJob, err := scheduler.GangIdAndCardinalityFromLegacySchedulerJob(job)
if err != nil {
return nil, err
}
Expand Down
35 changes: 6 additions & 29 deletions internal/scheduler/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,7 @@ func JobsSummary(jobs []interfaces.LegacySchedulerJob) string {
func(jobs []interfaces.LegacySchedulerJob) schedulerobjects.ResourceList {
rv := schedulerobjects.NewResourceListWithDefaultSize()
for _, job := range jobs {
req := PodRequirementFromLegacySchedulerJob(job, nil)
if req == nil {
continue
}
rv.AddV1ResourceList(req.ResourceRequirements.Requests)
rv.AddV1ResourceList(job.GetResourceRequirements().Requests)
}
return rv
},
Expand Down Expand Up @@ -142,28 +138,17 @@ func isEvictedJob(job interfaces.LegacySchedulerJob) bool {
}

func targetNodeIdFromLegacySchedulerJob(job interfaces.LegacySchedulerJob) (string, bool) {
req := PodRequirementFromLegacySchedulerJob(job, nil)
if req == nil {
nodeSelector := job.GetNodeSelector()
if nodeSelector == nil {
return "", false
}
nodeId, ok := req.NodeSelector[schedulerconfig.NodeIdLabel]
nodeId, ok := nodeSelector[schedulerconfig.NodeIdLabel]
return nodeId, ok
}

// GangIdAndCardinalityFromLegacySchedulerJob returns a tuple (gangId, gangCardinality, isGangJob, error).
func GangIdAndCardinalityFromLegacySchedulerJob(job interfaces.LegacySchedulerJob, priorityClasses map[string]configuration.PriorityClass) (string, int, bool, error) {
reqs := job.GetRequirements(priorityClasses)
if reqs == nil {
return "", 0, false, nil
}
if len(reqs.ObjectRequirements) != 1 {
return "", 0, false, errors.Errorf("expected exactly one object requirement in %v", reqs)
}
podReqs := reqs.ObjectRequirements[0].GetPodRequirements()
if podReqs == nil {
return "", 0, false, nil
}
return GangIdAndCardinalityFromAnnotations(podReqs.Annotations)
func GangIdAndCardinalityFromLegacySchedulerJob(job interfaces.LegacySchedulerJob) (string, int, bool, error) {
return GangIdAndCardinalityFromAnnotations(job.GetAnnotations())
}

// GangIdAndCardinalityFromAnnotations returns a tuple (gangId, gangCardinality, isGangJob, error).
Expand Down Expand Up @@ -228,14 +213,6 @@ func PodRequirementFromLegacySchedulerJob[E interfaces.LegacySchedulerJob](job E
return req
}

func PodRequirementsFromJobSchedulingInfos(infos []*schedulerobjects.JobSchedulingInfo) []*schedulerobjects.PodRequirements {
rv := make([]*schedulerobjects.PodRequirements, 0, len(infos))
for _, info := range infos {
rv = append(rv, PodRequirementFromJobSchedulingInfo(info))
}
return rv
}

func PodRequirementFromJobSchedulingInfo(info *schedulerobjects.JobSchedulingInfo) *schedulerobjects.PodRequirements {
for _, oreq := range info.ObjectRequirements {
if preq := oreq.GetPodRequirements(); preq != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,13 +460,13 @@ func (qctx *QueueSchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingC

func (qctx *QueueSchedulingContext) EvictJob(job interfaces.LegacySchedulerJob) (bool, error) {
jobId := job.GetId()
rl := job.GetResourceRequirements().Requests
if _, ok := qctx.UnsuccessfulJobSchedulingContexts[jobId]; ok {
return false, errors.Errorf("failed evicting job %s from queue: job already marked unsuccessful", jobId)
}
if _, ok := qctx.EvictedJobsById[jobId]; ok {
return false, errors.Errorf("failed evicting job %s from queue: job already marked evicted", jobId)
}
rl := job.GetResourceRequirements().Requests
_, scheduledInThisRound := qctx.SuccessfulJobSchedulingContexts[jobId]
if scheduledInThisRound {
qctx.ScheduledResourcesByPriorityClass.SubV1ResourceList(job.GetPriorityClassName(), rl)
Expand Down
4 changes: 4 additions & 0 deletions internal/scheduler/interfaces/interfaces.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package interfaces

import (
"time"

v1 "k8s.io/api/core/v1"

"github.com/armadaproject/armada/internal/armada/configuration"
Expand All @@ -12,6 +14,8 @@ type LegacySchedulerJob interface {
GetId() string
GetQueue() string
GetJobSet() string
GetPerQueuePriority() uint32
GetSubmitTime() time.Time
GetAnnotations() map[string]string
GetRequirements(map[string]configuration.PriorityClass) *schedulerobjects.JobSchedulingInfo
GetPriorityClassName() string
Expand Down
13 changes: 13 additions & 0 deletions internal/scheduler/jobdb/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,19 @@ func (job *Job) Priority() uint32 {
return job.priority
}

// GetPerQueuePriority exists for compatibility with the LegacyJob interface.
func (job *Job) GetPerQueuePriority() uint32 {
return job.priority
}

// GetSubmitTime exists for compatibility with the LegacyJob interface.
func (job *Job) GetSubmitTime() time.Time {
if job.jobSchedulingInfo == nil {
return time.Time{}
}
return job.jobSchedulingInfo.SubmitTime
}

// RequestedPriority returns the requested priority of the job.
func (job *Job) RequestedPriority() uint32 {
return job.requestedPriority
Expand Down
14 changes: 7 additions & 7 deletions internal/scheduler/jobiteration.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,23 +88,23 @@ func (repo *InMemoryJobRepository) Enqueue(job interfaces.LegacySchedulerJob) {
// finally by submit time, with earlier submit times first.
func (repo *InMemoryJobRepository) sortQueue(queue string) {
slices.SortFunc(repo.jobsByQueue[queue], func(a, b interfaces.LegacySchedulerJob) bool {
infoa := a.GetRequirements(repo.priorityClasses)
infob := b.GetRequirements(repo.priorityClasses)
if repo.sortByPriorityClass {
pca := repo.priorityClasses[infoa.PriorityClassName]
pcb := repo.priorityClasses[infob.PriorityClassName]
pca := repo.priorityClasses[a.GetPriorityClassName()]
pcb := repo.priorityClasses[b.GetPriorityClassName()]
if pca.Priority > pcb.Priority {
return true
} else if pca.Priority < pcb.Priority {
return false
}
}
if infoa.GetPriority() < infob.GetPriority() {
pa := a.GetPerQueuePriority()
pb := b.GetPerQueuePriority()
if pa < pb {
return true
} else if infoa.GetPriority() > infob.GetPriority() {
} else if pa > pb {
return false
}
return infoa.GetSubmitTime().Before(infob.GetSubmitTime())
return a.GetSubmitTime().Before(b.GetSubmitTime())
})
}

Expand Down
27 changes: 14 additions & 13 deletions internal/scheduler/preempting_queue_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,26 +432,27 @@ func (sch *PreemptingQueueScheduler) evictionAssertions(evictedJobsById map[stri
}
evictedJobIdsByGangId := make(map[string]map[string]bool)
for _, job := range evictedJobsById {
if gangId, ok := sch.gangIdByJobId[job.GetId()]; ok {
jobId := job.GetId()
if gangId, ok := sch.gangIdByJobId[jobId]; ok {
if m := evictedJobIdsByGangId[gangId]; m != nil {
m[job.GetId()] = true
m[jobId] = true
} else {
evictedJobIdsByGangId[gangId] = map[string]bool{job.GetId(): true}
evictedJobIdsByGangId[gangId] = map[string]bool{jobId: true}
}
}
if !isEvictedJob(job) {
return errors.Errorf("evicted job %s is not marked as such: job annotations %v", job.GetId(), job.GetAnnotations())
return errors.Errorf("evicted job %s is not marked as such: job annotations %v", jobId, job.GetAnnotations())
}
if nodeId, ok := targetNodeIdFromLegacySchedulerJob(job); ok {
if _, ok := affectedNodesById[nodeId]; !ok {
return errors.Errorf("node id %s targeted by job %s is not marked as affected", nodeId, job.GetId())
return errors.Errorf("node id %s targeted by job %s is not marked as affected", nodeId, jobId)
}
} else {
req := PodRequirementFromLegacySchedulerJob(job, nil)
if req != nil {
return errors.Errorf("evicted job %s is missing target node id selector: job nodeSelector %v", job.GetId(), req.NodeSelector)
nodeSelector := job.GetNodeSelector()
if nodeSelector != nil {
return errors.Errorf("evicted job %s is missing target node id selector: job nodeSelector %v", jobId, nodeSelector)
} else {
return errors.Errorf("evicted job %s is missing target node id selector: req is nil", job.GetId())
return errors.Errorf("evicted job %s is missing target node id selector: req is nil", jobId)
}
}
}
Expand Down Expand Up @@ -552,7 +553,7 @@ func (sch *PreemptingQueueScheduler) updateGangAccounting(preemptedJobs, schedul
}
}
for _, job := range scheduledJobs {
gangId, _, isGangJob, err := GangIdAndCardinalityFromLegacySchedulerJob(job, sch.schedulingContext.PriorityClasses)
gangId, _, isGangJob, err := GangIdAndCardinalityFromLegacySchedulerJob(job)
if err != nil {
return err
}
Expand Down Expand Up @@ -701,7 +702,7 @@ func NewPreemptibleEvictor(
log.Warnf("can't evict job %s: annotations not initialised", job.GetId())
return false
}
priorityClassName := job.GetRequirements(priorityClasses).PriorityClassName
priorityClassName := job.GetPriorityClassName()
priorityClass, ok := priorityClasses[priorityClassName]
if !ok {
priorityClass = priorityClasses[defaultPriorityClass]
Expand Down Expand Up @@ -786,7 +787,7 @@ func NewOversubscribedEvictor(
log.Warnf("can't evict job %s: annotations not initialised", job.GetId())
return false
}
priorityClassName := job.GetRequirements(priorityClasses).PriorityClassName
priorityClassName := job.GetPriorityClassName()
priorityClass, ok := priorityClasses[priorityClassName]
if !ok {
priorityClass = priorityClasses[defaultPriorityClass]
Expand Down Expand Up @@ -874,7 +875,7 @@ func defaultPostEvictFunc(ctx context.Context, job interfaces.LegacySchedulerJob

// Add a toleration to allow the job to be re-scheduled even if node is unschedulable.
//
// TODO: Because req is created with a new tolerations slice above, this toleration doesn't persist.
// TODO: Because req is allocated by GetRequirements() if job is an api.Job, this toleration may not persist.
// In practice, this isn't an issue now since we don't check static requirements for evicted jobs.
if node.Unschedulable {
req.Tolerations = append(req.Tolerations, nodedb.UnschedulableToleration())
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/queue_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ func TestQueueScheduler(t *testing.T) {
continue
}
assert.Equal(t, nodeDb.NumNodes(), pctx.NumNodes)
_, _, isGangJob, err := GangIdAndCardinalityFromLegacySchedulerJob(jctx.Job, nil)
_, _, isGangJob, err := GangIdAndCardinalityFromLegacySchedulerJob(jctx.Job)
require.NoError(t, err)
if !isGangJob {
numExcludedNodes := 0
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/scheduling_algo.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx context.Context, t
}
jobsByExecutorId[executorId] = append(jobsByExecutorId[executorId], job)
nodeIdByJobId[job.Id()] = nodeId
gangId, _, isGangJob, err := GangIdAndCardinalityFromLegacySchedulerJob(job, l.config.Preemption.PriorityClasses)
gangId, _, isGangJob, err := GangIdAndCardinalityFromLegacySchedulerJob(job)
if err != nil {
return nil, err
}
Expand Down
32 changes: 15 additions & 17 deletions pkg/api/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,19 @@ func JobRunStateFromApiJobState(s JobState) schedulerobjects.JobRunState {
return schedulerobjects.JobRunState_UNKNOWN
}

func NewNodeTypeFromNodeInfo(nodeInfo *NodeInfo, indexedTaints map[string]interface{}, indexedLabels map[string]interface{}) *schedulerobjects.NodeType {
return schedulerobjects.NewNodeType(nodeInfo.GetTaints(), nodeInfo.GetLabels(), indexedTaints, indexedLabels)
func (job *Job) GetPerQueuePriority() uint32 {
priority := job.Priority
if priority < 0 {
return 0
}
if priority > math.MaxUint32 {
return math.MaxUint32
}
return uint32(math.Round(priority))
}

func (job *Job) GetSubmitTime() time.Time {
return job.Created
}

func (job *Job) GetRequirements(priorityClasses map[string]configuration.PriorityClass) *schedulerobjects.JobSchedulingInfo {
Expand Down Expand Up @@ -132,8 +143,8 @@ func (job *Job) GetRequirements(priorityClasses map[string]configuration.Priorit
}
return &schedulerobjects.JobSchedulingInfo{
PriorityClassName: podSpec.PriorityClassName,
Priority: LogSubmitPriorityFromApiPriority(job.GetPriority()),
SubmitTime: job.GetCreated(),
Priority: job.GetPerQueuePriority(),
SubmitTime: job.GetSubmitTime(),
ObjectRequirements: []*schedulerobjects.ObjectRequirements{
{
Requirements: &schedulerobjects.ObjectRequirements_PodRequirements{
Expand Down Expand Up @@ -249,19 +260,6 @@ func (job *Job) GetJobSet() string {
return job.JobSetId
}

// LogSubmitPriorityFromApiPriority returns the uint32 representation of the priority included with a submitted job,
// or an error if the conversion fails.
func LogSubmitPriorityFromApiPriority(priority float64) uint32 {
if priority < 0 {
priority = 0
}
if priority > math.MaxUint32 {
priority = math.MaxUint32
}
priority = math.Round(priority)
return uint32(priority)
}

func (job *Job) GetMainPodSpec() *v1.PodSpec {
if job.PodSpec != nil {
return job.PodSpec
Expand Down

0 comments on commit 1c3ae70

Please sign in to comment.