From f869cea9503678d2564cb2dd2bf3bf19b57c41ee Mon Sep 17 00:00:00 2001 From: Albin Severinson Date: Thu, 22 Jun 2023 09:15:32 +0100 Subject: [PATCH] Make per-queue limits apply only within the current pool (#2597) * Initial commit * Add per-pool queue limit overrides --- internal/armada/configuration/types.go | 3 +++ internal/armada/server/lease.go | 4 +++- internal/armada/server/submit_test.go | 2 +- internal/scheduler/adapters/adapters_test.go | 8 ++++---- internal/scheduler/constraints/constraints.go | 12 +++++++++--- 5 files changed, 20 insertions(+), 9 deletions(-) diff --git a/internal/armada/configuration/types.go b/internal/armada/configuration/types.go index a1a23002697..7c8da6afefc 100644 --- a/internal/armada/configuration/types.go +++ b/internal/armada/configuration/types.go @@ -240,6 +240,9 @@ type PriorityClass struct { // jobs of this priority class are not scheduled if doing so would cause the total resources assigned // to jobs of priority 10 or lower from the same queue to exceed 30% of the total. MaximumResourceFractionPerQueue map[string]float64 + // Per-pool override of MaximumResourceFractionPerQueue. + // If missing for a particular pool, MaximumResourceFractionPerQueue is used instead for that pool. 
+ MaximumResourceFractionPerQueueByPool map[string]map[string]float64 } func (p PreemptionConfig) PriorityByPriorityClassName() map[string]int32 { diff --git a/internal/armada/server/lease.go b/internal/armada/server/lease.go index f8641d32ffc..a8559ea6932 100644 --- a/internal/armada/server/lease.go +++ b/internal/armada/server/lease.go @@ -267,7 +267,9 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL activeClusterReports := scheduling.FilterActiveClusters(usageReports) totalCapacity := make(armadaresource.ComputeResources) for _, clusterReport := range activeClusterReports { - totalCapacity.Add(util.GetClusterAvailableCapacity(clusterReport)) + if clusterReport.Pool == req.Pool { + totalCapacity.Add(util.GetClusterAvailableCapacity(clusterReport)) + } } // Collect all allowed priorities. diff --git a/internal/armada/server/submit_test.go b/internal/armada/server/submit_test.go index 2d6a5b7a56a..151181a54ed 100644 --- a/internal/armada/server/submit_test.go +++ b/internal/armada/server/submit_test.go @@ -1637,7 +1637,7 @@ func withSubmitServerAndRepos(action func(s *SubmitServer, jobRepo repository.Jo MaxPodSpecSizeBytes: 65535, Preemption: configuration.PreemptionConfig{ DefaultPriorityClass: "high", - PriorityClasses: map[string]configuration.PriorityClass{"high": {0, false, nil}}, + PriorityClasses: map[string]configuration.PriorityClass{"high": {0, false, nil, nil}}, }, MinTerminationGracePeriod: time.Duration(30 * time.Second), MaxTerminationGracePeriod: time.Duration(300 * time.Second), diff --git a/internal/scheduler/adapters/adapters_test.go b/internal/scheduler/adapters/adapters_test.go index b9d5ac6cc9c..943e6680472 100644 --- a/internal/scheduler/adapters/adapters_test.go +++ b/internal/scheduler/adapters/adapters_test.go @@ -18,10 +18,10 @@ import ( var ( priorityByPriorityClassName = map[string]configuration.PriorityClass{ - "priority-0": {0, true, nil}, - "priority-1": {1, true, nil}, - "priority-2": {2, true, 
nil}, - "priority-3": {3, false, nil}, + "priority-0": {0, true, nil, nil}, + "priority-1": {1, true, nil, nil}, + "priority-2": {2, true, nil, nil}, + "priority-3": {3, false, nil, nil}, } priority int32 = 1 diff --git a/internal/scheduler/constraints/constraints.go b/internal/scheduler/constraints/constraints.go index 32c5f44fb38..2325737962d 100644 --- a/internal/scheduler/constraints/constraints.go +++ b/internal/scheduler/constraints/constraints.go @@ -68,15 +68,21 @@ func SchedulingConstraintsFromSchedulingConfig( ) SchedulingConstraints { priorityClassSchedulingConstraintsByPriorityClassName := make(map[string]PriorityClassSchedulingConstraints, len(config.Preemption.PriorityClasses)) for name, priorityClass := range config.Preemption.PriorityClasses { + maximumResourceFractionPerQueue := priorityClass.MaximumResourceFractionPerQueue + if m, ok := priorityClass.MaximumResourceFractionPerQueueByPool[pool]; ok { + // Use pool-specific config if available. + maximumResourceFractionPerQueue = m + } priorityClassSchedulingConstraintsByPriorityClassName[name] = PriorityClassSchedulingConstraints{ PriorityClassName: name, PriorityClassPriority: priorityClass.Priority, - MaximumCumulativeResourcesPerQueue: absoluteFromRelativeLimits(totalResources, priorityClass.MaximumResourceFractionPerQueue), + MaximumCumulativeResourcesPerQueue: absoluteFromRelativeLimits(totalResources, maximumResourceFractionPerQueue), } } maximumResourceFractionToSchedule := config.MaximumResourceFractionToSchedule - if limit, ok := config.MaximumResourceFractionToScheduleByPool[pool]; ok { - maximumResourceFractionToSchedule = limit + if m, ok := config.MaximumResourceFractionToScheduleByPool[pool]; ok { + // Use pool-specific config if available. + maximumResourceFractionToSchedule = m } return SchedulingConstraints{ MaximumJobsToSchedule: config.MaximumJobsToSchedule,