Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler: tidy, more to internaltypes.ResourceList #4038

Merged
merged 3 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions internal/scheduler/scheduling/context/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
"github.com/armadaproject/armada/internal/scheduler/jobdb"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

// QueueSchedulingContext captures the decisions made by the scheduler during one invocation
Expand Down Expand Up @@ -48,9 +47,9 @@ type QueueSchedulingContext struct {
// Includes jobs scheduled during this invocation of the scheduler.
AllocatedByPriorityClass map[string]internaltypes.ResourceList
// Resources assigned to this queue during this scheduling cycle.
ScheduledResourcesByPriorityClass schedulerobjects.QuantityByTAndResourceType[string]
ScheduledResourcesByPriorityClass map[string]internaltypes.ResourceList
// Resources evicted from this queue during this scheduling cycle.
EvictedResourcesByPriorityClass schedulerobjects.QuantityByTAndResourceType[string]
EvictedResourcesByPriorityClass map[string]internaltypes.ResourceList
// Job scheduling contexts associated with successful scheduling attempts.
SuccessfulJobSchedulingContexts map[string]*JobSchedulingContext
// Job scheduling contexts associated with unsuccessful scheduling attempts.
Expand Down Expand Up @@ -82,10 +81,10 @@ func (qctx *QueueSchedulingContext) ReportString(verbosity int32) string {
fmt.Fprintf(w, "Time:\t%s\n", qctx.Created)
fmt.Fprintf(w, "Queue:\t%s\n", qctx.Queue)
}
fmt.Fprintf(w, "Scheduled resources:\t%s\n", qctx.ScheduledResourcesByPriorityClass.AggregateByResource().CompactString())
fmt.Fprintf(w, "Scheduled resources (by priority):\t%s\n", qctx.ScheduledResourcesByPriorityClass.String())
fmt.Fprintf(w, "Preempted resources:\t%s\n", qctx.EvictedResourcesByPriorityClass.AggregateByResource().CompactString())
fmt.Fprintf(w, "Preempted resources (by priority):\t%s\n", qctx.EvictedResourcesByPriorityClass.String())
fmt.Fprintf(w, "Scheduled resources:\t%s\n", internaltypes.RlMapSumValues(qctx.ScheduledResourcesByPriorityClass).String())
fmt.Fprintf(w, "Scheduled resources (by priority):\t%s\n", internaltypes.RlMapToString(qctx.ScheduledResourcesByPriorityClass))
fmt.Fprintf(w, "Preempted resources:\t%s\n", internaltypes.RlMapSumValues(qctx.EvictedResourcesByPriorityClass).String())
fmt.Fprintf(w, "Preempted resources (by priority):\t%s\n", internaltypes.RlMapToString(qctx.EvictedResourcesByPriorityClass))
if verbosity >= 0 {
fmt.Fprintf(w, "Total allocated resources after scheduling:\t%s\n", qctx.Allocated.String())
for pc, res := range qctx.AllocatedByPriorityClass {
Expand Down Expand Up @@ -171,17 +170,18 @@ func (qctx *QueueSchedulingContext) addJobSchedulingContext(jctx *JobSchedulingC
// Always update ResourcesByPriority.
// Since ResourcesByPriority is used to order queues by fraction of fair share.
pcName := jctx.Job.PriorityClassName()
qctx.AllocatedByPriorityClass[pcName] = qctx.AllocatedByPriorityClass[pcName].Add(jctx.Job.AllResourceRequirements())
qctx.Allocated = qctx.Allocated.Add(jctx.Job.AllResourceRequirements())
rl := jctx.Job.AllResourceRequirements()
qctx.AllocatedByPriorityClass[pcName] = qctx.AllocatedByPriorityClass[pcName].Add(rl)
qctx.Allocated = qctx.Allocated.Add(rl)

// Only if the job is not evicted, update ScheduledResourcesByPriority.
// Since ScheduledResourcesByPriority is used to control per-round scheduling constraints.
if evictedInThisRound {
delete(qctx.EvictedJobsById, jctx.JobId)
qctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.PriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)
qctx.EvictedResourcesByPriorityClass[pcName] = qctx.EvictedResourcesByPriorityClass[pcName].Subtract(rl)
} else {
qctx.SuccessfulJobSchedulingContexts[jctx.JobId] = jctx
qctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.PriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)
qctx.ScheduledResourcesByPriorityClass[pcName] = qctx.ScheduledResourcesByPriorityClass[pcName].Add(rl)
}
} else {
qctx.UnsuccessfulJobSchedulingContexts[jctx.JobId] = jctx
Expand All @@ -197,16 +197,16 @@ func (qctx *QueueSchedulingContext) evictJob(job *jobdb.Job) (bool, error) {
if _, ok := qctx.EvictedJobsById[jobId]; ok {
return false, errors.Errorf("failed evicting job %s from queue: job already marked evicted", jobId)
}
rl := job.ResourceRequirements().Requests
pcName := job.PriorityClassName()
rl := job.AllResourceRequirements()
_, scheduledInThisRound := qctx.SuccessfulJobSchedulingContexts[jobId]
if scheduledInThisRound {
qctx.ScheduledResourcesByPriorityClass.SubV1ResourceList(job.PriorityClassName(), rl)
qctx.ScheduledResourcesByPriorityClass[pcName] = qctx.ScheduledResourcesByPriorityClass[pcName].Subtract(rl)
delete(qctx.SuccessfulJobSchedulingContexts, jobId)
} else {
qctx.EvictedResourcesByPriorityClass.AddV1ResourceList(job.PriorityClassName(), rl)
qctx.EvictedResourcesByPriorityClass[pcName] = qctx.EvictedResourcesByPriorityClass[pcName].Add(rl)
qctx.EvictedJobsById[jobId] = true
}
pcName := job.PriorityClassName()
qctx.AllocatedByPriorityClass[pcName] = qctx.AllocatedByPriorityClass[pcName].Subtract(job.AllResourceRequirements())
qctx.Allocated = qctx.Allocated.Subtract(job.AllResourceRequirements())

Expand Down
43 changes: 18 additions & 25 deletions internal/scheduler/scheduling/context/scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,9 @@ type SchedulingContext struct {
// Allocated resources across all clusters in this pool
Allocated internaltypes.ResourceList
// Resources assigned across all queues during this scheduling cycle.
ScheduledResources internaltypes.ResourceList
ScheduledResourcesByPriorityClass schedulerobjects.QuantityByTAndResourceType[string]
ScheduledResources internaltypes.ResourceList
// Resources evicted across all queues during this scheduling cycle.
EvictedResources schedulerobjects.ResourceList
EvictedResourcesByPriorityClass schedulerobjects.QuantityByTAndResourceType[string]
EvictedResources internaltypes.ResourceList
// Total number of successfully scheduled jobs.
NumScheduledJobs int
// Total number of successfully scheduled gangs.
Expand All @@ -71,17 +69,16 @@ func NewSchedulingContext(
totalResources internaltypes.ResourceList,
) *SchedulingContext {
return &SchedulingContext{
Started: time.Now(),
Pool: pool,
FairnessCostProvider: fairnessCostProvider,
Limiter: limiter,
QueueSchedulingContexts: make(map[string]*QueueSchedulingContext),
TotalResources: totalResources,
ScheduledResources: internaltypes.ResourceList{},
ScheduledResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]),
EvictedResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]),
SchedulingKeyGenerator: schedulerobjects.NewSchedulingKeyGenerator(),
UnfeasibleSchedulingKeys: make(map[schedulerobjects.SchedulingKey]*JobSchedulingContext),
Started: time.Now(),
Pool: pool,
FairnessCostProvider: fairnessCostProvider,
Limiter: limiter,
QueueSchedulingContexts: make(map[string]*QueueSchedulingContext),
TotalResources: totalResources,
ScheduledResources: internaltypes.ResourceList{},
EvictedResources: internaltypes.ResourceList{},
SchedulingKeyGenerator: schedulerobjects.NewSchedulingKeyGenerator(),
UnfeasibleSchedulingKeys: make(map[schedulerobjects.SchedulingKey]*JobSchedulingContext),
}
}

Expand Down Expand Up @@ -125,8 +122,8 @@ func (sctx *SchedulingContext) AddQueueSchedulingContext(
Demand: demand,
CappedDemand: cappedDemand,
AllocatedByPriorityClass: initialAllocatedByPriorityClass,
ScheduledResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]),
EvictedResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]),
ScheduledResourcesByPriorityClass: make(map[string]internaltypes.ResourceList),
EvictedResourcesByPriorityClass: make(map[string]internaltypes.ResourceList),
SuccessfulJobSchedulingContexts: make(map[string]*JobSchedulingContext),
UnsuccessfulJobSchedulingContexts: make(map[string]*JobSchedulingContext),
EvictedJobsById: make(map[string]bool),
Expand Down Expand Up @@ -221,7 +218,7 @@ func (sctx *SchedulingContext) ReportString(verbosity int32) string {
fmt.Fprintf(w, "Termination reason:\t%s\n", sctx.TerminationReason)
fmt.Fprintf(w, "Total capacity:\t%s\n", sctx.TotalResources.String())
fmt.Fprintf(w, "Scheduled resources:\t%s\n", sctx.ScheduledResources.String())
fmt.Fprintf(w, "Preempted resources:\t%s\n", sctx.EvictedResources.CompactString())
fmt.Fprintf(w, "Preempted resources:\t%s\n", sctx.EvictedResources.String())
fmt.Fprintf(w, "Number of gangs scheduled:\t%d\n", sctx.NumScheduledGangs)
fmt.Fprintf(w, "Number of jobs scheduled:\t%d\n", sctx.NumScheduledJobs)
fmt.Fprintf(w, "Number of jobs preempted:\t%d\n", sctx.NumEvictedJobs)
Expand Down Expand Up @@ -293,12 +290,10 @@ func (sctx *SchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingContex
}
if jctx.IsSuccessful() {
if evictedInThisRound {
sctx.EvictedResources.SubV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests)
sctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.PriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)
sctx.EvictedResources = sctx.EvictedResources.Subtract(jctx.Job.AllResourceRequirements())
sctx.NumEvictedJobs--
} else {
sctx.ScheduledResources = sctx.ScheduledResources.Add(jctx.Job.AllResourceRequirements())
sctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.PriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)
sctx.NumScheduledJobs++
}
sctx.Allocated = sctx.Allocated.Add(jctx.Job.AllResourceRequirements())
Expand Down Expand Up @@ -340,14 +335,12 @@ func (sctx *SchedulingContext) EvictJob(jctx *JobSchedulingContext) (bool, error
if err != nil {
return false, err
}
rl := jctx.Job.ResourceRequirements().Requests

if scheduledInThisRound {
sctx.ScheduledResources = sctx.ScheduledResources.Subtract(jctx.Job.AllResourceRequirements())
sctx.ScheduledResourcesByPriorityClass.SubV1ResourceList(jctx.Job.PriorityClassName(), rl)
sctx.NumScheduledJobs--
} else {
sctx.EvictedResources.AddV1ResourceList(rl)
sctx.EvictedResourcesByPriorityClass.AddV1ResourceList(jctx.Job.PriorityClassName(), rl)
sctx.EvictedResources = sctx.EvictedResources.Add(jctx.Job.AllResourceRequirements())
sctx.NumEvictedJobs++
}
sctx.Allocated = sctx.Allocated.Subtract(jctx.Job.AllResourceRequirements())
Expand Down