Commit
Signed-off-by: Chris Martin <chris@cmartinit.co.uk>
internal/scheduler/constraints/constraints.go
@@ -1,261 +1,139 @@
 package constraints
 
 import (
-	"math"
-
 	"github.com/pkg/errors"
-	"k8s.io/apimachinery/pkg/api/resource"
 
+	"github.com/armadaproject/armada/internal/common/types"
 	"github.com/armadaproject/armada/internal/common/util"
-	"github.com/armadaproject/armada/internal/scheduler/configuration"
-	schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
-	"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
+	"github.com/armadaproject/armada/internal/scheduler/context"
+	"github.com/armadaproject/armada/internal/scheduler/internaltypes"
 	"github.com/armadaproject/armada/pkg/api"
 )
 
-const (
-	// Indicates that the limit on resources scheduled per round has been exceeded.
-	MaximumResourcesScheduledUnschedulableReason = "maximum resources scheduled"
-
-	// Indicates that a queue has been assigned more than its allowed amount of resources.
-	MaximumResourcesPerQueueExceededUnschedulableReason = "maximum total resources for this queue exceeded"
-
-	// Indicates that the scheduling rate limit has been exceeded.
-	GlobalRateLimitExceededUnschedulableReason = "global scheduling rate limit exceeded"
-	QueueRateLimitExceededUnschedulableReason  = "queue scheduling rate limit exceeded"
-	SchedulingPausedOnQueueUnschedulableReason = "scheduling paused on queue"
-
-	// Indicates that scheduling a gang would exceed the rate limit.
-	GlobalRateLimitExceededByGangUnschedulableReason = "gang would exceed global scheduling rate limit"
-	QueueRateLimitExceededByGangUnschedulableReason  = "gang would exceed queue scheduling rate limit"
-
-	// Indicates that the number of jobs in a gang exceeds the burst size.
-	// This means the gang cannot be scheduled without first increasing the burst size.
-	GangExceedsGlobalBurstSizeUnschedulableReason = "gang cardinality too large: exceeds global max burst size"
-	GangExceedsQueueBurstSizeUnschedulableReason  = "gang cardinality too large: exceeds queue max burst size"
-
-	// Indicates that jobs cannot be scheduled due to the current executor state.
-	GangDoesNotFitUnschedulableReason = "unable to schedule gang since minimum cardinality not met"
-	JobDoesNotFitUnschedulableReason  = "job does not fit on any node"
-
-	UnschedulableReasonMaximumResourcesExceeded = "resource limit exceeded"
-)
 
-func UnschedulableReasonIsPropertyOfGang(reason string) bool {
-	return reason == GangExceedsGlobalBurstSizeUnschedulableReason || reason == JobDoesNotFitUnschedulableReason || reason == GangDoesNotFitUnschedulableReason
-}
-
-// IsTerminalUnschedulableReason returns true if reason indicates
-// it's not possible to schedule any more jobs in this round.
-func IsTerminalUnschedulableReason(reason string) bool {
-	return reason == MaximumResourcesScheduledUnschedulableReason ||
-		reason == GlobalRateLimitExceededUnschedulableReason
-}
-
-// IsTerminalQueueUnschedulableReason returns true if reason indicates
-// it's not possible to schedule any more jobs from this queue in this round.
-func IsTerminalQueueUnschedulableReason(reason string) bool {
-	return reason == QueueRateLimitExceededUnschedulableReason || reason == SchedulingPausedOnQueueUnschedulableReason
-}
+// SchedulingConstraints contains scheduling constraints, e.g. per-queue resource limits.
+type SchedulingConstraints interface {
+	CheckRoundConstraints(sctx *context.SchedulingContext) (bool, string, error)
+	CheckJobConstraints(sctx *context.SchedulingContext, gctx *context.GangSchedulingContext) (bool, string, error)
+	CapResources(queue string, resourcesByPc map[string]internaltypes.ResourceList) map[string]internaltypes.ResourceList
+}
 
-// SchedulingConstraints contains scheduling constraints, e.g., per-queue resource limits.
-type SchedulingConstraints struct {
-	// Max number of jobs to consider for a queue before giving up.
-	maxQueueLookBack uint
-	// Scheduling constraints by priority class.
-	priorityClassSchedulingConstraintsByPriorityClassName map[string]priorityClassSchedulingConstraints
-	// Scheduling constraints for specific queues.
-	// If present for a particular queue, global limits (i.e., priorityClassSchedulingConstraintsByPriorityClassName)
-	// do not apply for that queue.
-	queueSchedulingConstraintsByQueueName map[string]queueSchedulingConstraints
-	// Limits total resources scheduled per invocation.
-	maximumResourcesToSchedule map[string]resource.Quantity
-}
+type schedulingConstraints struct {
+	// Limits total resources scheduled per scheduling round.
+	maximumResourcesToSchedule internaltypes.ResourceList
+	// Queues that are cordoned (i.e. no jobs may be scheduled on them).
+	cordonedQueues map[string]bool
+	// Resource limits by queue and priority class, e.g. "queue A is limited to 100 cpu at priority class armada-default".
+	resourceLimitsPerQueuePerPriorityClass map[string]map[string]internaltypes.ResourceList
+}
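
The nested map in the new struct holds fully resolved, absolute limits rather than fractions. Its shape, with invented queue names, priority classes, and quantities:

	resourceLimitsPerQueuePerPriorityClass = {
	    "queue-a": {
	        "armada-default":     {cpu: 100, memory: 400Gi},
	        "armada-preemptible": {cpu: 500, memory: 2Ti},
	    },
	}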
 
-// queueSchedulingConstraints contains per-queue scheduling constraints.
-type queueSchedulingConstraints struct {
-	// Scheduling constraints by priority class.
-	PriorityClassSchedulingConstraintsByPriorityClassName map[string]priorityClassSchedulingConstraints
-	// Determines whether scheduling has been paused for this queue.
-	Cordoned bool
-}
-
-// priorityClassSchedulingConstraints contains scheduling constraints that apply to jobs of a specific priority class.
-type priorityClassSchedulingConstraints struct {
-	PriorityClassName string
-	// Limits total resources allocated to jobs of this priority class per queue.
-	MaximumResourcesPerQueue map[string]resource.Quantity
-}
-func NewSchedulingConstraints(pool string, totalResources schedulerobjects.ResourceList, config configuration.SchedulingConfig, queues []*api.Queue, cordonStatusByQueue map[string]bool) SchedulingConstraints {
-	priorityClassSchedulingConstraintsByPriorityClassName := make(map[string]priorityClassSchedulingConstraints, len(config.PriorityClasses))
-	for name, priorityClass := range config.PriorityClasses {
-		maximumResourceFractionPerQueue := priorityClass.MaximumResourceFractionPerQueue
-		if m, ok := priorityClass.MaximumResourceFractionPerQueueByPool[pool]; ok {
-			// Use pool-specific config if available.
-			maximumResourceFractionPerQueue = util.MergeMaps(maximumResourceFractionPerQueue, m)
-		}
-		priorityClassSchedulingConstraintsByPriorityClassName[name] = priorityClassSchedulingConstraints{
-			PriorityClassName:        name,
-			MaximumResourcesPerQueue: absoluteFromRelativeLimits(totalResources.Resources, maximumResourceFractionPerQueue),
-		}
-	}
-
-	queueSchedulingConstraintsByQueueName := make(map[string]queueSchedulingConstraints, len(queues))
-	for _, queue := range queues {
-		priorityClassSchedulingConstraintsByPriorityClassNameForQueue := make(map[string]priorityClassSchedulingConstraints, len(queue.ResourceLimitsByPriorityClassName))
-		for priorityClassName, priorityClassResourceLimits := range queue.ResourceLimitsByPriorityClassName {
-			maximumResourceFraction := priorityClassResourceLimits.MaximumResourceFraction
-			if m, ok := priorityClassResourceLimits.MaximumResourceFractionByPool[pool]; ok {
-				// Use pool-specific maximum resource fraction if available.
-				maximumResourceFraction = util.MergeMaps(maximumResourceFraction, m.MaximumResourceFraction)
-			}
-			priorityClassSchedulingConstraintsByPriorityClassNameForQueue[priorityClassName] = priorityClassSchedulingConstraints{
-				PriorityClassName:        priorityClassName,
-				MaximumResourcesPerQueue: absoluteFromRelativeLimits(totalResources.Resources, maximumResourceFraction),
-			}
-		}
-		queueSchedulingConstraintsByQueueName[queue.Name] = queueSchedulingConstraints{
-			PriorityClassSchedulingConstraintsByPriorityClassName: priorityClassSchedulingConstraintsByPriorityClassNameForQueue,
-			Cordoned: cordonStatusByQueue[queue.Name],
-		}
-	}
-
-	maximumResourceFractionToSchedule := config.MaximumResourceFractionToSchedule
-	if m, ok := config.MaximumResourceFractionToScheduleByPool[pool]; ok {
-		// Use pool-specific config if available.
-		maximumResourceFractionToSchedule = m
-	}
-	return SchedulingConstraints{
-		maxQueueLookBack:           config.MaxQueueLookback,
-		maximumResourcesToSchedule: absoluteFromRelativeLimits(totalResources.Resources, maximumResourceFractionToSchedule),
-		priorityClassSchedulingConstraintsByPriorityClassName: priorityClassSchedulingConstraintsByPriorityClassName,
-		queueSchedulingConstraintsByQueueName:                 queueSchedulingConstraintsByQueueName,
-	}
-}
+func NewSchedulingConstraints(
+	pool string,
+	priorityClasses map[string]types.PriorityClass,
+	totalResources internaltypes.ResourceList,
+	maximumResourceFractionToSchedule float64,
+	queues []*api.Queue,
+	cordonStatusByQueue map[string]bool) SchedulingConstraints {
+	return &schedulingConstraints{
+		cordonedQueues:                         cordonStatusByQueue,
+		maximumResourcesToSchedule:             totalResources.Scale(maximumResourceFractionToSchedule),
+		resourceLimitsPerQueuePerPriorityClass: calculatePerQueueLimits(totalResources, pool, priorityClasses, queues),
+	}
+}
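
For context, a sketch of how a caller might drive the new interface during a scheduling round. This is a hypothetical illustration: the scheduleGangs helper and its loop are not part of this commit, and construction of the contexts is elided.

	// scheduleGangs shows how the two check methods compose: round-level
	// checks gate the whole round, job-level checks gate individual gangs.
	func scheduleGangs(c SchedulingConstraints, sctx *context.SchedulingContext, gangs []*context.GangSchedulingContext) error {
		for _, gctx := range gangs {
			ok, _, err := c.CheckRoundConstraints(sctx)
			if err != nil {
				return err
			}
			if !ok {
				return nil // round-level limit hit, e.g. maximum resources scheduled
			}
			ok, _, err = c.CheckJobConstraints(sctx, gctx)
			if err != nil {
				return err
			}
			if !ok {
				continue // this gang is unschedulable (cordoned, rate-limited, over limit)
			}
			// ... attempt to place the gang on nodes ...
		}
		return nil
	}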
 
-func absoluteFromRelativeLimits(totalResources map[string]resource.Quantity, relativeLimits map[string]float64) map[string]resource.Quantity {
-	absoluteLimits := make(map[string]resource.Quantity, len(relativeLimits))
-	for t, f := range relativeLimits {
-		absoluteLimits[t] = ScaleQuantity(totalResources[t].DeepCopy(), f)
-	}
-	return absoluteLimits
-}
+func (c *schedulingConstraints) CheckRoundConstraints(sctx *context.SchedulingContext) (bool, string, error) {
+	scheduledResources := internaltypes.ResourceList{} // TODO should be sctx.ScheduledResources
+	if scheduledResources.StrictlyLessOrEqualThan(c.maximumResourcesToSchedule) {
+		return true, "", nil
+	}
+	return false, MaximumResourcesScheduled, nil
+}
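
The GitHub Actions checks (lint / Lint Go, build / prepare, test / Golang Unit Tests) fail at every line that returns one of the new short reason names: MaximumResourcesScheduled here, and QueueCordoned, GlobalRateLimitExceeded, QueueRateLimitExceeded, MaximumResourcesExceeded below. This commit does not define them. A minimal definition that would satisfy the compiler, reusing the old constants' strings (an assumption; the author may intend different wording), would be:

	// Hypothetical definitions for the new reason names used in this commit;
	// the string values below simply reuse those of the old constants.
	const (
		MaximumResourcesScheduled = "maximum resources scheduled"
		MaximumResourcesExceeded  = "resource limit exceeded"
		GlobalRateLimitExceeded   = "global scheduling rate limit exceeded"
		QueueRateLimitExceeded    = "queue scheduling rate limit exceeded"
		QueueCordoned             = "scheduling paused on queue"
	)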
 
-// ScaleQuantity scales q in-place by a factor f.
-// This function overflows for quantities whose milli value can't be expressed as an int64.
-// E.g., 1Pi is ok, but not 10Pi.
-func ScaleQuantity(q resource.Quantity, f float64) resource.Quantity {
-	q.SetMilli(int64(math.Round(float64(q.MilliValue()) * f)))
-	return q
-}
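
The overflow caveat in the removed ScaleQuantity is plain arithmetic: MilliValue() holds the quantity times 1000 in an int64, whose ceiling is about 9.22e18. A quick sketch of the boundary (standalone; only the k8s resource package is involved):

	package main

	import (
		"fmt"

		"k8s.io/apimachinery/pkg/api/resource"
	)

	func main() {
		// 1Pi = 2^50 bytes ≈ 1.13e15, so its milli value ≈ 1.13e18:
		// still representable as an int64.
		fmt.Println(resource.MustParse("1Pi").MilliValue())
		// 10Pi has a milli value ≈ 1.13e19, past the int64 ceiling;
		// this is the overflow the doc comment warned about.
		fmt.Println(resource.MustParse("10Pi").MilliValue())
	}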
-func (constraints *SchedulingConstraints) CheckRoundConstraints(sctx *schedulercontext.SchedulingContext) (bool, string, error) {
-	// maximumResourcesToSchedule check.
-	if !isStrictlyLessOrEqual(sctx.ScheduledResources.Resources, constraints.maximumResourcesToSchedule) {
-		return false, MaximumResourcesScheduledUnschedulableReason, nil
-	}
-	return true, "", nil
-}
-
-func (constraints *SchedulingConstraints) CheckConstraints(
-	sctx *schedulercontext.SchedulingContext,
-	gctx *schedulercontext.GangSchedulingContext,
-) (bool, string, error) {
-	qctx := sctx.QueueSchedulingContexts[gctx.Queue]
-	if qctx == nil {
-		return false, "", errors.Errorf("no QueueSchedulingContext for queue %s", gctx.Queue)
-	}
+func (c *schedulingConstraints) CheckJobConstraints(sctx *context.SchedulingContext, gctx *context.GangSchedulingContext) (bool, string, error) {
+	// The queue context can be missing if there is a job in the system whose queue has been deleted.
+	// TODO: We should ensure such jobs are filtered out well before this point!
+	qctx, ok := sctx.QueueSchedulingContexts[gctx.Queue]
+	if !ok {
+		return false, "", errors.Errorf("no QueueSchedulingContext for queue %s", gctx.Queue)
+	}
+
+	// Queue cordoned.
+	if c.cordonedQueues[qctx.Queue] {
+		return false, QueueCordoned, nil
+	}
 
 	// Global rate limiter check.
 	tokens := sctx.Limiter.TokensAt(sctx.Started)
-	if tokens <= 0 {
-		return false, GlobalRateLimitExceededUnschedulableReason, nil
-	}
-	if sctx.Limiter.Burst() < gctx.Cardinality() {
-		return false, GangExceedsGlobalBurstSizeUnschedulableReason, nil
-	}
-	if tokens < float64(gctx.Cardinality()) {
-		return false, GlobalRateLimitExceededByGangUnschedulableReason, nil
+	if tokens < float64(gctx.Cardinality()) || sctx.Limiter.Burst() < gctx.Cardinality() {
+		return false, GlobalRateLimitExceeded, nil
 	}
 
-	if queueConstraints, ok := constraints.queueSchedulingConstraintsByQueueName[qctx.Queue]; ok && queueConstraints.Cordoned {
-		return false, SchedulingPausedOnQueueUnschedulableReason, nil
-	}
-
 	// Per-queue rate limiter check.
 	tokens = qctx.Limiter.TokensAt(sctx.Started)
-	if tokens <= 0 {
-		return false, QueueRateLimitExceededUnschedulableReason, nil
-	}
-	if qctx.Limiter.Burst() < gctx.Cardinality() {
-		return false, GangExceedsQueueBurstSizeUnschedulableReason, nil
-	}
-	if tokens < float64(gctx.Cardinality()) {
-		return false, QueueRateLimitExceededByGangUnschedulableReason, nil
+	if tokens < float64(gctx.Cardinality()) || qctx.Limiter.Burst() < gctx.Cardinality() {
+		return false, QueueRateLimitExceeded, nil
 	}
 
-	// queueSchedulingConstraintsByQueueName / priorityClassSchedulingConstraintsByPriorityClassName checks.
-	overallResourceLimits := constraints.resolveResourceLimitsForQueueAndPriorityClass(gctx.Queue, gctx.PriorityClassName)
-	if !isStrictlyLessOrEqual(qctx.AllocatedByPriorityClass[gctx.PriorityClassName].Resources, overallResourceLimits) {
-		return false, UnschedulableReasonMaximumResourcesExceeded, nil
+	// Resource allocation check.
+	queueLimit, hasLimit := c.resourceLimitsPerQueuePerPriorityClass[qctx.Queue][gctx.PriorityClassName]
+	var allocatedResources internaltypes.ResourceList = qctx.AllocatedByPriorityClass[gctx.PriorityClassName].Resources
+	if hasLimit && !allocatedResources.StrictlyLessOrEqualThan(queueLimit) {
+		return false, MaximumResourcesExceeded, nil
 	}
 
 	return true, "", nil
 }
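
Both rate-limiter checks fold the old three-way branching (no tokens, gang over burst, gang over tokens) into one condition, so the per-cause reasons collapse into a single reason per limiter. Assuming sctx.Limiter is a golang.org/x/time/rate.Limiter, which the TokensAt/Burst calls suggest, the semantics look like this:

	package main

	import (
		"fmt"
		"time"

		"golang.org/x/time/rate"
	)

	func main() {
		// 2 jobs/second steady state, at most 5 jobs in one burst.
		limiter := rate.NewLimiter(rate.Limit(2), 5)
		now := time.Now()

		gangSize := 8
		tokens := limiter.TokensAt(now)
		// Mirrors the combined check in CheckJobConstraints: a gang larger
		// than the burst can never pass, however long we wait; a gang larger
		// than the current token count must wait for tokens to accrue.
		if tokens < float64(gangSize) || limiter.Burst() < gangSize {
			fmt.Printf("gang of %d rejected: tokens=%.1f burst=%d\n", gangSize, tokens, limiter.Burst())
		}
	}

Note that the collapsed condition also erases the old distinction between rate-limit and burst-size reasons, which the removed IsTerminal* helpers relied on.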
 
-func (constraints *SchedulingConstraints) CapResources(queue string, resourcesByPc schedulerobjects.QuantityByTAndResourceType[string]) schedulerobjects.QuantityByTAndResourceType[string] {
-	cappedResourcesByPc := schedulerobjects.QuantityByTAndResourceType[string]{}
-	for pc, resources := range resourcesByPc {
-		overallResourceLimits := constraints.resolveResourceLimitsForQueueAndPriorityClass(queue, pc)
-		cappedResources := make(map[string]resource.Quantity, len(resources.Resources))
-		for resourceName, qty := range resources.Resources {
-			limit, ok := overallResourceLimits[resourceName]
-			if ok && qty.Cmp(limit) == 1 {
-				cappedResources[resourceName] = limit
-			} else {
-				cappedResources[resourceName] = qty
-			}
-		}
-		cappedResourcesByPc[pc] = schedulerobjects.ResourceList{Resources: cappedResources}
-	}
-	return cappedResourcesByPc
-}
+func (c *schedulingConstraints) CapResources(queue string, resourcesByPc map[string]internaltypes.ResourceList) map[string]internaltypes.ResourceList {
+	perQueueLimit, ok := c.resourceLimitsPerQueuePerPriorityClass[queue]
+	if !ok {
+		return resourcesByPc
+	}
+	cappedResources := make(map[string]internaltypes.ResourceList, len(resourcesByPc))
+	for pc, resources := range resourcesByPc {
+		limits, ok := perQueueLimit[pc]
+		if !ok {
+			cappedResources[pc] = resources
+		} else {
+			cappedResources[pc] = resources.Cap(limits)
+		}
+	}
+	return cappedResources
+}
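
A usage sketch of the new CapResources, assuming Cap returns the clamped list (the assignment above reflects that reading; as committed, the result of resources.Cap(limits) was discarded and the key left unset). The queue name, priority classes, and the cpuList helper are invented for illustration:

	// cpuList is a hypothetical helper that builds an internaltypes.ResourceList
	// holding only a cpu quantity; the real factory lives in internaltypes.
	requested := map[string]internaltypes.ResourceList{
		"armada-default":     cpuList("150"), // suppose queue-a is limited to 100 cpu here
		"armada-preemptible": cpuList("20"),  // suppose no limit is configured here
	}
	capped := c.CapResources("queue-a", requested)
	// capped["armada-default"]     == 100 cpu (clamped to the queue limit)
	// capped["armada-preemptible"] == 20 cpu  (no limit, passed through)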
 
-func (constraints *SchedulingConstraints) resolveResourceLimitsForQueueAndPriorityClass(queue string, priorityClass string) map[string]resource.Quantity {
-	queueAndPriorityClassResourceLimits := constraints.getQueueAndPriorityClassResourceLimits(queue, priorityClass)
-	priorityClassResourceLimits := constraints.getPriorityClassResourceLimits(priorityClass)
-	return util.MergeMaps(priorityClassResourceLimits, queueAndPriorityClassResourceLimits)
-}
-
-func (constraints *SchedulingConstraints) getQueueAndPriorityClassResourceLimits(queue string, priorityClass string) map[string]resource.Quantity {
-	if queueConstraint, ok := constraints.queueSchedulingConstraintsByQueueName[queue]; ok {
-		if priorityClassConstraint, ok := queueConstraint.PriorityClassSchedulingConstraintsByPriorityClassName[priorityClass]; ok {
-			return priorityClassConstraint.MaximumResourcesPerQueue
-		}
-	}
-	return map[string]resource.Quantity{}
-}
-func (constraints *SchedulingConstraints) getPriorityClassResourceLimits(priorityClass string) map[string]resource.Quantity {
-	if priorityClassConstraint, ok := constraints.priorityClassSchedulingConstraintsByPriorityClassName[priorityClass]; ok {
-		return priorityClassConstraint.MaximumResourcesPerQueue
-	}
-	return map[string]resource.Quantity{}
-}
-
-func (constraints *SchedulingConstraints) GetMaxQueueLookBack() uint {
-	return constraints.maxQueueLookBack
-}
-
-// isStrictlyLessOrEqual returns false if there is a quantity in b strictly smaller than
-// the corresponding quantity in a (i.e. it reports whether a <= b for every resource
-// named in b), and true otherwise.
-func isStrictlyLessOrEqual(a map[string]resource.Quantity, b map[string]resource.Quantity) bool {
-	for t, q := range b {
-		if q.Cmp(a[t]) == -1 {
-			return false
-		}
-	}
-	return true
-}
+func calculatePerQueueLimits(totalResources internaltypes.ResourceList, pool string, priorityClasses map[string]types.PriorityClass, queues []*api.Queue) map[string]map[string]internaltypes.ResourceList {
+	// First work out the default limit per pool.
+	defaultScalingFactorsByPc := map[string]map[string]float64{}
+	defaultResourceLimitsByPc := map[string]internaltypes.ResourceList{}
+	for pcName, pc := range priorityClasses {
+		defaultLimit := pc.MaximumResourceFractionPerQueue
+		poolLimit := pc.MaximumResourceFractionPerQueueByPool[pool]
+		defaultScalingFactors := util.MergeMaps(defaultLimit, poolLimit)
+		defaultScalingFactorsByPc[pcName] = defaultScalingFactors
+		defaultResourceLimitsByPc[pcName] = totalResources.ScaleCustom(defaultScalingFactors)
+	}
+
+	limitsPerQueuePerPc := make(map[string]map[string]internaltypes.ResourceList, len(queues))
+
+	// Then apply any queue-level overrides.
+	for _, queue := range queues {
+		if len(queue.ResourceLimitsByPriorityClassName) == 0 {
+			// There are no queue-specific limits.
+			limitsPerQueuePerPc[queue.Name] = defaultResourceLimitsByPc
+		} else {
+			limitsPerQueuePerPc[queue.Name] = make(map[string]internaltypes.ResourceList, len(priorityClasses))
+			for pc := range priorityClasses {
+				queueLimits, ok := queue.ResourceLimitsByPriorityClassName[pc]
+				if ok {
+					defaultFraction := defaultScalingFactorsByPc[pc]
+					fractionLimit := util.MergeMaps(defaultFraction, queueLimits.MaximumResourceFraction)
+					fractionLimit = util.MergeMaps(fractionLimit, queueLimits.MaximumResourceFractionByPool[pool].GetMaximumResourceFraction())
+					limitsPerQueuePerPc[queue.Name][pc] = defaultResourceLimitsByPc[pc].ScaleCustom(fractionLimit)
+				} else {
+					limitsPerQueuePerPc[queue.Name][pc] = defaultResourceLimitsByPc[pc]
+				}
+			}
+		}
+	}
+	return limitsPerQueuePerPc
+}
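
One behavioural detail of calculatePerQueueLimits worth tracing: the queue-level fraction is applied to the already-scaled default limit (defaultResourceLimitsByPc[pc].ScaleCustom(fractionLimit)), so it composes multiplicatively with the priority-class default rather than replacing it. Worked through with invented numbers, and assuming util.MergeMaps lets later maps win (as the pool-override usage above implies):

	total := 1000.0 // cpu available in the pool

	pcDefault := map[string]float64{"cpu": 0.5}        // priority-class default fraction
	poolOverride := map[string]float64{"cpu": 0.4}     // MaximumResourceFractionPerQueueByPool["gpu-pool"]
	factors := util.MergeMaps(pcDefault, poolOverride) // {"cpu": 0.4}
	defaultLimit := total * factors["cpu"]             // 400 cpu: the per-queue default for this pool

	queueOverride := map[string]float64{"cpu": 0.5}  // the queue's MaximumResourceFraction
	merged := util.MergeMaps(factors, queueOverride) // {"cpu": 0.5}
	queueLimit := defaultLimit * merged["cpu"]       // 200 cpu, i.e. 0.4 * 0.5 of total, not 0.5 of total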