diff --git a/config/armada/config.yaml b/config/armada/config.yaml
index 90b8af17421..fbd15ba5476 100644
--- a/config/armada/config.yaml
+++ b/config/armada/config.yaml
@@ -31,6 +31,13 @@ eventsApiRedis:
   poolSize: 1000
 scheduling:
   enableAssertions: true
+  fairnessModel: "AssetFairness"
+  dominantResourceFairnessResourcesToConsider:
+    - "cpu"
+    - "memory"
+    - "nvidia.com/gpu"
+  resourceScarcity:
+    cpu: 1.0
   preemption:
     nodeEvictionProbability: 1.0
     nodeOversubscriptionEvictionProbability: 1.0
@@ -43,8 +50,8 @@ scheduling:
         priority: 1000
         preemptible: false
         maximumResourceFractionPerQueue:
-          memory: 0.99
-          cpu: 0.99
+          memory: 1.0
+          cpu: 1.0
       armada-preemptible:
         priority: 1000
         preemptible: true
@@ -54,7 +61,7 @@ scheduling:
   maxExtraNodesToConsider: 1
   maximumResourceFractionToSchedule:
     memory: 1.0
-    cpu: 1.0
+    cpu: 1.0
   maxJobSchedulingContextsPerExecutor: 10000
   lease:
     expireAfter: 15m
@@ -69,11 +76,6 @@ scheduling:
       value: "true"
       effect: "NoSchedule"
   defaultJobTolerationsByPriorityClass:
-    "":
-      - key: "armadaproject.io/pc-armada-default"
-        operator: "Equal"
-        value: "true"
-        effect: "NoSchedule"
     armada-default:
       - key: "armadaproject.io/pc-armada-default"
         operator: "Equal"
@@ -85,8 +87,6 @@ scheduling:
         value: "true"
         effect: "NoSchedule"
   maxRetries: 5
-  resourceScarcity:
-    cpu: 1.0
   maxPodSpecSizeBytes: 65535
   minJobResources:
     memory: 1Mi
diff --git a/config/scheduler/config.yaml b/config/scheduler/config.yaml
index c05b6e1ebf4..13a095be02f 100644
--- a/config/scheduler/config.yaml
+++ b/config/scheduler/config.yaml
@@ -49,6 +49,13 @@ grpc:
 scheduling:
   executorTimeout: 10m
   enableAssertions: true
+  fairnessModel: "AssetFairness"
+  dominantResourceFairnessResourcesToConsider:
+    - "cpu"
+    - "memory"
+    - "nvidia.com/gpu"
+  resourceScarcity:
+    cpu: 1.0
   preemption:
     alwaysAttemptScheduling: false
     enabled: true
@@ -60,8 +67,8 @@ scheduling:
         priority: 1000
         preemptible: false
         maximumResourceFractionPerQueue:
-          memory: 0.99
-          cpu: 0.99
+          memory: 1.0
+          cpu: 1.0
       armada-preemptible:
         priority: 1000
         preemptible: true
@@ -85,11 +92,6 @@ scheduling:
       value: "true"
      effect: "NoSchedule"
   defaultJobTolerationsByPriorityClass:
-    "":
-      - key: "armadaproject.io/pc-armada-default"
-        operator: "Equal"
-        value: "true"
-        effect: "NoSchedule"
     armada-default:
       - key: "armadaproject.io/pc-armada-default"
         operator: "Equal"
@@ -101,8 +103,6 @@ scheduling:
       value: "true"
       effect: "NoSchedule"
   maxRetries: 5
-  resourceScarcity:
-    cpu: 1.0
   indexedResources:
     - name: "cpu"
       resolution: "100m"
diff --git a/internal/armada/configuration/types.go b/internal/armada/configuration/types.go
index cf9db749e82..80f4ed4ec45 100644
--- a/internal/armada/configuration/types.go
+++ b/internal/armada/configuration/types.go
@@ -113,7 +113,11 @@ type SchedulingConfig struct {
 	DefaultJobTolerationsByResourceRequest map[string][]v1.Toleration
 	// Maximum number of times a job is retried before considered failed.
 	MaxRetries uint
-	// Weights used when computing fair share.
+	// Controls how fairness is calculated. Can be either AssetFairness or DominantResourceFairness.
+	FairnessModel FairnessModel
+	// List of resource names, e.g., []string{"cpu", "memory"}, to consider when computing DominantResourceFairness.
+	DominantResourceFairnessResourcesToConsider []string
+	// Weights used to compute fair share when using AssetFairness.
 	// Overrides dynamic scarcity calculation if provided.
 	// Applies to both the new and old scheduler.
 	ResourceScarcity map[string]float64
@@ -187,6 +191,20 @@ type SchedulingConfig struct {
 	AlwaysAttemptScheduling bool
 }
 
+// FairnessModel controls how fairness is computed.
+// More specifically, each queue has a cost associated with it and the next job to schedule
+// is taken from the queue with smallest cost. FairnessModel determines how that cost is computed.
+type FairnessModel string
+
+const (
+	// AssetFairness sets the cost associated with a queue to a linear combination of its total allocation.
+	// E.g., w_CPU * "CPU allocation" + w_memory * "memory allocation".
+	AssetFairness FairnessModel = "AssetFairness"
+	// DominantResourceFairness sets the cost associated with a queue to
+	// max("CPU allocation" / "CPU capacity", "memory allocation" / "memory capacity", ...).
+	DominantResourceFairness FairnessModel = "DominantResourceFairness"
+)
+
 type IndexedResource struct {
 	// Resource name. E.g., "cpu", "memory", or "nvidia.com/gpu".
 	Name string
diff --git a/internal/armada/server/lease.go b/internal/armada/server/lease.go
index 448068c98e7..ac130332620 100644
--- a/internal/armada/server/lease.go
+++ b/internal/armada/server/lease.go
@@ -296,6 +296,16 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
 		apiQueues[i] = &api.Queue{Name: queue.Name}
 	}
 
+	// Record which queues are active, i.e., have jobs either queued or running.
+	queuesWithJobsQueued, err := q.jobRepository.FilterActiveQueues(apiQueues)
+	if err != nil {
+		return nil, err
+	}
+	isActiveByQueueName := make(map[string]bool, len(queuesWithJobsQueued))
+	for _, queue := range queuesWithJobsQueued {
+		isActiveByQueueName[queue.Name] = true
+	}
+
 	// Nodes to be considered by the scheduler.
 	lastSeen := q.clock.Now()
 	nodes := make([]*schedulerobjects.Node, 0, len(req.Nodes))
@@ -393,6 +403,11 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
 			nodeIdByJobId[job.Id] = node.Id
 		}
 		nodes = append(nodes, node)
+
+		// Record which queues have jobs running. Necessary to omit inactive queues.
+		for _, job := range jobs {
+			isActiveByQueueName[job.Queue] = true
+		}
 	}
 	nodeDb, err := nodedb.NewNodeDb(
 		q.schedulingConfig.Preemption.PriorityClasses,
@@ -466,7 +481,14 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
 		q.schedulingConfig.ResourceScarcity,
 		schedulerobjects.ResourceList{Resources: totalCapacity},
 	)
+	if q.schedulingConfig.FairnessModel == configuration.DominantResourceFairness {
+		sctx.EnableDominantResourceFairness(q.schedulingConfig.DominantResourceFairnessResourcesToConsider)
+	}
 	for queue, priorityFactor := range priorityFactorByQueue {
+		if !isActiveByQueueName[queue] {
+			// Ensure fair share is computed only over active queues, i.e., queues with jobs queued or running.
+			continue
+		}
 		var weight float64 = 1
 		if priorityFactor > 0 {
 			weight = 1 / priorityFactor
diff --git a/internal/scheduler/context/context.go b/internal/scheduler/context/context.go
index 6c5372bf362..1b545f4fa9f 100644
--- a/internal/scheduler/context/context.go
+++ b/internal/scheduler/context/context.go
@@ -10,6 +10,7 @@ import (
 	"github.com/pkg/errors"
 	"golang.org/x/exp/maps"
 	"golang.org/x/exp/slices"
+	"k8s.io/apimachinery/pkg/api/resource"
 
 	"github.com/armadaproject/armada/internal/armada/configuration"
 	"github.com/armadaproject/armada/internal/common/armadaerrors"
@@ -34,12 +35,16 @@ type SchedulingContext struct {
 	PriorityClasses map[string]configuration.PriorityClass
 	// Default priority class.
 	DefaultPriorityClass string
-	// Weights used when computing total resource usage.
+	// Determines how fairness is computed.
+	FairnessModel configuration.FairnessModel
+	// Resources considered when computing DominantResourceFairness.
+	DominantResourceFairnessResourcesToConsider []string
+	// Weights used when computing AssetFairness.
 	ResourceScarcity map[string]float64
+	// Sum of queue weights across all queues.
+	WeightSum float64
 	// Per-queue scheduling contexts.
 	QueueSchedulingContexts map[string]*QueueSchedulingContext
-	// Sum of weights across all queues.
-	WeightSum float64
 	// Total resources across all clusters available at the start of the scheduling cycle.
 	TotalResources schedulerobjects.ResourceList
 	// = TotalResources.AsWeightedMillis(ResourceScarcity).
@@ -81,6 +86,7 @@ func NewSchedulingContext(
 		Pool: pool,
 		PriorityClasses: priorityClasses,
 		DefaultPriorityClass: defaultPriorityClass,
+		FairnessModel: configuration.AssetFairness,
 		ResourceScarcity: resourceScarcity,
 		QueueSchedulingContexts: make(map[string]*QueueSchedulingContext),
 		TotalResources: totalResources.DeepCopy(),
@@ -93,6 +99,11 @@ func NewSchedulingContext(
 	}
 }
 
+func (sctx *SchedulingContext) EnableDominantResourceFairness(dominantResourceFairnessResourcesToConsider []string) {
+	sctx.FairnessModel = configuration.DominantResourceFairness
+	sctx.DominantResourceFairnessResourcesToConsider = dominantResourceFairnessResourcesToConsider
+}
+
 func (sctx *SchedulingContext) SchedulingKeyFromLegacySchedulerJob(job interfaces.LegacySchedulerJob) schedulerobjects.SchedulingKey {
 	var priority int32
 	if priorityClass, ok := sctx.PriorityClasses[job.GetPriorityClassName()]; ok {
@@ -151,6 +162,15 @@ func (sctx *SchedulingContext) String() string {
 	return sctx.ReportString(0)
 }
 
+// TotalCost returns the sum of the costs across all queues.
+func (sctx *SchedulingContext) TotalCost() float64 {
+	var rv float64
+	for _, qctx := range sctx.QueueSchedulingContexts {
+		rv += qctx.TotalCostForQueue()
+	}
+	return rv
+}
+
 func (sctx *SchedulingContext) ReportString(verbosity int32) string {
 	var sb strings.Builder
 	w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
@@ -496,17 +516,48 @@ func (qctx *QueueSchedulingContext) ClearJobSpecs() {
 	}
 }
 
-// FractionOfFairShare returns a number in [0, 1] indicating what fraction of its fair share this queue is allocated.
-func (qctx *QueueSchedulingContext) FractionOfFairShare() float64 {
-	return qctx.FractionOfFairShareWithAllocation(qctx.Allocated)
+// TotalCostForQueue returns the total cost of this queue.
+func (qctx *QueueSchedulingContext) TotalCostForQueue() float64 {
+	return qctx.TotalCostForQueueWithAllocation(qctx.Allocated)
 }
 
-// FractionOfFairShareWithAllocation returns a number in [0, 1] indicating what
-// fraction of its fair share this queue is allocated if the total allocation of this queue is given by allocated.
-func (qctx *QueueSchedulingContext) FractionOfFairShareWithAllocation(allocated schedulerobjects.ResourceList) float64 {
-	fairShare := qctx.Weight / qctx.SchedulingContext.WeightSum
-	allocatedAsWeightedMillis := allocated.AsWeightedMillis(qctx.SchedulingContext.ResourceScarcity)
-	return (float64(allocatedAsWeightedMillis) / float64(qctx.SchedulingContext.TotalResourcesAsWeightedMillis)) / fairShare
+// TotalCostForQueueWithAllocation returns the total cost of this queue if its total allocation is given by allocated.
+func (qctx *QueueSchedulingContext) TotalCostForQueueWithAllocation(allocated schedulerobjects.ResourceList) float64 {
+	switch qctx.SchedulingContext.FairnessModel {
+	case configuration.AssetFairness:
+		return qctx.assetFairnessCostWithAllocation(allocated)
+	case configuration.DominantResourceFairness:
+		return qctx.dominantResourceFairnessCostWithAllocation(allocated)
+	default:
+		panic(fmt.Sprintf("unknown fairness type: %s", qctx.SchedulingContext.FairnessModel))
+	}
+}
+
+func (qctx *QueueSchedulingContext) assetFairnessCostWithAllocation(allocated schedulerobjects.ResourceList) float64 {
+	if len(qctx.SchedulingContext.ResourceScarcity) == 0 {
+		panic("ResourceScarcity is not set")
+	}
+	return float64(allocated.AsWeightedMillis(qctx.SchedulingContext.ResourceScarcity)) / qctx.Weight
+}
+
+func (qctx *QueueSchedulingContext) dominantResourceFairnessCostWithAllocation(allocated schedulerobjects.ResourceList) float64 {
+	if len(qctx.SchedulingContext.DominantResourceFairnessResourcesToConsider) == 0 {
+		panic("DominantResourceFairnessResourcesToConsider is not set")
+	}
+	var cost float64
+	for _, t := range qctx.SchedulingContext.DominantResourceFairnessResourcesToConsider {
+		capacity := qctx.SchedulingContext.TotalResources.Get(t)
+		if capacity.Equal(resource.Quantity{}) {
+			// Ignore any resources with zero capacity.
+			continue
+		}
+		q := allocated.Get(t)
+		tcost := float64(q.MilliValue()) / float64(capacity.MilliValue())
+		if tcost > cost {
+			cost = tcost
+		}
+	}
+	return cost / qctx.Weight
 }
 
 type GangSchedulingContext struct {
diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go
index a4a08546ed3..b6fd2a696a1 100644
--- a/internal/scheduler/preempting_queue_scheduler.go
+++ b/internal/scheduler/preempting_queue_scheduler.go
@@ -135,6 +135,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx context.Context) (*SchedulerRe
 	snapshot := sch.nodeDb.Txn(false)
 
 	// Evict preemptible jobs.
+	totalCost := sch.schedulingContext.TotalCost()
 	evictorResult, inMemoryJobRepo, err := sch.evict(
 		ctxlogrus.ToContext(
 			ctx,
@@ -156,7 +157,10 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx context.Context) (*SchedulerRe
 					return false
 				}
 				if qctx, ok := sch.schedulingContext.QueueSchedulingContexts[job.GetQueue()]; ok {
-					if qctx.FractionOfFairShare() <= sch.protectedFractionOfFairShare {
+					fairShare := qctx.Weight / sch.schedulingContext.WeightSum
+					actualShare := qctx.TotalCostForQueue() / totalCost
+					fractionOfFairShare := actualShare / fairShare
+					if fractionOfFairShare <= sch.protectedFractionOfFairShare {
 						return false
 					}
 				}
diff --git a/internal/scheduler/queue_scheduler.go b/internal/scheduler/queue_scheduler.go
index 3e512aaf8af..067fc294d75 100644
--- a/internal/scheduler/queue_scheduler.go
+++ b/internal/scheduler/queue_scheduler.go
@@ -348,7 +348,7 @@ func (it *CandidateGangIterator) fractionOfFairShareWithGctx(gctx *schedulercont
 	it.buffer.Zero()
 	it.buffer.Add(qctx.Allocated)
 	it.buffer.Add(gctx.TotalResourceRequests)
-	return qctx.FractionOfFairShareWithAllocation(it.buffer)
+	return qctx.TotalCostForQueueWithAllocation(it.buffer)
 }
 
 // Clear removes the first item in the iterator.
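
// Illustrative sketch (not part of this patch; all names below are hypothetical) of the
// cost arithmetic implemented by TotalCostForQueueWithAllocation above, using plain
// float64 quantities instead of resource.Quantity millis.
package main

import "fmt"

// assetFairnessCost mirrors assetFairnessCostWithAllocation: a weighted sum of the
// queue's allocation divided by the queue's weight, e.g., (w_cpu*cpu + w_memory*memory) / weight.
func assetFairnessCost(allocated, scarcity map[string]float64, weight float64) float64 {
	var cost float64
	for t, q := range allocated {
		cost += scarcity[t] * q
	}
	return cost / weight
}

// dominantResourceFairnessCost mirrors dominantResourceFairnessCostWithAllocation:
// the maximum over the considered resources of allocation/capacity, divided by the
// queue's weight; resources with zero capacity are skipped.
func dominantResourceFairnessCost(allocated, capacity map[string]float64, consider []string, weight float64) float64 {
	var cost float64
	for _, t := range consider {
		if capacity[t] == 0 {
			continue
		}
		if c := allocated[t] / capacity[t]; c > cost {
			cost = c
		}
	}
	return cost / weight
}

func main() {
	allocated := map[string]float64{"cpu": 10, "memory": 128}
	capacity := map[string]float64{"cpu": 100, "memory": 256}
	scarcity := map[string]float64{"cpu": 1.0} // as in the default config, only cpu is weighted
	fmt.Println(assetFairnessCost(allocated, scarcity, 1.0))                                       // 10: memory is ignored entirely
	fmt.Println(dominantResourceFairnessCost(allocated, capacity, []string{"cpu", "memory"}, 1.0)) // 0.5: memory (128/256) is dominant
}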
diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go
index ba70c3bbf20..e3e0b45d91f 100644
--- a/internal/scheduler/scheduling_algo.go
+++ b/internal/scheduler/scheduling_algo.go
@@ -176,6 +176,7 @@ func (it *JobQueueIteratorAdapter) Next() (interfaces.LegacySchedulerJob, error)
 
 type fairSchedulingAlgoContext struct {
 	priorityFactorByQueue map[string]float64
+	isActiveByQueueName map[string]bool
 	totalCapacity schedulerobjects.ResourceList
 	jobsByExecutorId map[string][]*jobdb.Job
 	nodeIdByJobId map[string]string
@@ -242,11 +243,13 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx context.Context, t
 	}
 
 	// Create a map of jobs associated with each executor.
+	isActiveByQueueName := make(map[string]bool, len(queues))
 	jobsByExecutorId := make(map[string][]*jobdb.Job)
 	nodeIdByJobId := make(map[string]string)
 	jobIdsByGangId := make(map[string]map[string]bool)
 	gangIdByJobId := make(map[string]string)
 	for _, job := range jobDb.GetAll(txn) {
+		isActiveByQueueName[job.Queue()] = true
 		if job.Queued() {
 			continue
 		}
@@ -288,6 +291,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx context.Context, t
 
 	return &fairSchedulingAlgoContext{
 		priorityFactorByQueue: priorityFactorByQueue,
+		isActiveByQueueName: isActiveByQueueName,
 		totalCapacity: totalCapacity,
 		jobsByExecutorId: jobsByExecutorId,
 		nodeIdByJobId: nodeIdByJobId,
@@ -322,7 +326,14 @@ func (l *FairSchedulingAlgo) scheduleOnExecutor(
 		l.config.ResourceScarcity,
 		accounting.totalCapacity,
 	)
+	if l.config.FairnessModel == configuration.DominantResourceFairness {
+		sctx.EnableDominantResourceFairness(l.config.DominantResourceFairnessResourcesToConsider)
+	}
 	for queue, priorityFactor := range accounting.priorityFactorByQueue {
+		if !accounting.isActiveByQueueName[queue] {
+			// Ensure fair share is computed only over active queues, i.e., queues with jobs queued or running.
+			continue
+		}
 		var allocatedByPriorityClass schedulerobjects.QuantityByTAndResourceType[string]
 		if allocatedByQueueAndPriorityClass := accounting.allocationByPoolAndQueueAndPriorityClass[executor.Pool]; allocatedByQueueAndPriorityClass != nil {
 			allocatedByPriorityClass = allocatedByQueueAndPriorityClass[queue]
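
// Illustrative sketch (not Armada code; names and values are hypothetical) of how the pieces
// above combine: queue weight is 1/priorityFactor, the weight sum is taken over active queues
// only, and the eviction guard in PreemptingQueueScheduler compares each queue's share of the
// total cost against its fair share before allowing its jobs to be evicted.
package main

import "fmt"

type queueState struct {
	priorityFactor float64
	active         bool // has jobs queued or running
	cost           float64
}

func main() {
	queues := map[string]queueState{
		"a": {priorityFactor: 1, active: true, cost: 0.6},
		"b": {priorityFactor: 2, active: true, cost: 0.1},
		"c": {priorityFactor: 1, active: false}, // inactive: excluded from the weight sum
	}

	// Only active queues contribute, so inactive queues no longer dilute everyone's fair share.
	weights := make(map[string]float64)
	var weightSum, totalCost float64
	for name, q := range queues {
		if !q.active {
			continue
		}
		w := 1.0
		if q.priorityFactor > 0 {
			w = 1 / q.priorityFactor
		}
		weights[name] = w
		weightSum += w
		totalCost += q.cost
	}

	const protectedFractionOfFairShare = 1.0
	for name, w := range weights {
		fairShare := w / weightSum
		actualShare := queues[name].cost / totalCost
		// A queue at or below its protected fraction of fair share is not evicted from.
		protected := actualShare/fairShare <= protectedFractionOfFairShare
		fmt.Printf("%s: fairShare=%.2f actualShare=%.2f protectedFromEviction=%v\n", name, fairShare, actualShare, protected)
	}
}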