Add dominant resource fairness (#2614)
* Move fair share comp. into context

* Avoid evicting queues below their fair share

* Add tests

* Comment

* Comment

* Initial commit

* Cleanup

* Add missing config

* Comments

* Cleanup

* Consider queues with queued/running jobs for fairness
severinson authored Jun 28, 2023
1 parent dac1cf3 commit 6cc7e56
Showing 8 changed files with 140 additions and 34 deletions.
20 changes: 10 additions & 10 deletions config/armada/config.yaml
@@ -31,6 +31,13 @@ eventsApiRedis:
poolSize: 1000
scheduling:
enableAssertions: true
fairnessModel: "AssetFairness"
dominantResourceFairnessResourcesToConsider:
- "cpu"
- "memory"
- "nvidia.com/gpu"
resourceScarcity:
cpu: 1.0
preemption:
nodeEvictionProbability: 1.0
nodeOversubscriptionEvictionProbability: 1.0
@@ -43,8 +50,8 @@ scheduling:
priority: 1000
preemptible: false
maximumResourceFractionPerQueue:
memory: 0.99
cpu: 0.99
memory: 1.0
cpu: 1.0
armada-preemptible:
priority: 1000
preemptible: true
@@ -54,7 +61,7 @@ scheduling:
maxExtraNodesToConsider: 1
maximumResourceFractionToSchedule:
memory: 1.0
cpu: 1.0
cpu: 1.0
maxJobSchedulingContextsPerExecutor: 10000
lease:
expireAfter: 15m
@@ -69,11 +76,6 @@ scheduling:
value: "true"
effect: "NoSchedule"
defaultJobTolerationsByPriorityClass:
"":
- key: "armadaproject.io/pc-armada-default"
operator: "Equal"
value: "true"
effect: "NoSchedule"
armada-default:
- key: "armadaproject.io/pc-armada-default"
operator: "Equal"
@@ -85,8 +87,6 @@ scheduling:
value: "true"
effect: "NoSchedule"
maxRetries: 5
resourceScarcity:
cpu: 1.0
maxPodSpecSizeBytes: 65535
minJobResources:
memory: 1Mi
18 changes: 9 additions & 9 deletions config/scheduler/config.yaml
@@ -49,6 +49,13 @@ grpc:
scheduling:
executorTimeout: 10m
enableAssertions: true
fairnessModel: "AssetFairness"
dominantResourceFairnessResourcesToConsider:
- "cpu"
- "memory"
- "nvidia.com/gpu"
resourceScarcity:
cpu: 1.0
preemption:
alwaysAttemptScheduling: false
enabled: true
@@ -60,8 +67,8 @@ scheduling:
priority: 1000
preemptible: false
maximumResourceFractionPerQueue:
memory: 0.99
cpu: 0.99
memory: 1.0
cpu: 1.0
armada-preemptible:
priority: 1000
preemptible: true
@@ -85,11 +92,6 @@ scheduling:
value: "true"
effect: "NoSchedule"
defaultJobTolerationsByPriorityClass:
"":
- key: "armadaproject.io/pc-armada-default"
operator: "Equal"
value: "true"
effect: "NoSchedule"
armada-default:
- key: "armadaproject.io/pc-armada-default"
operator: "Equal"
@@ -101,8 +103,6 @@ scheduling:
value: "true"
effect: "NoSchedule"
maxRetries: 5
resourceScarcity:
cpu: 1.0
indexedResources:
- name: "cpu"
resolution: "100m"
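Note: this PR does not add config validation itself. The following standalone Go sketch (hypothetical `validate` helper, simplified types rather than the actual Armada configuration package) only illustrates the invariants the scheduler code below relies on: AssetFairness needs resourceScarcity weights, while DominantResourceFairness needs a non-empty dominantResourceFairnessResourcesToConsider list.

```go
// Standalone sketch (simplified types, not the Armada configuration package) of the
// invariants implied by the scheduler code in this PR: AssetFairness relies on
// resourceScarcity weights, DominantResourceFairness on a non-empty resource list.
package main

import "fmt"

type FairnessModel string

const (
	AssetFairness            FairnessModel = "AssetFairness"
	DominantResourceFairness FairnessModel = "DominantResourceFairness"
)

type SchedulingConfig struct {
	FairnessModel                               FairnessModel
	DominantResourceFairnessResourcesToConsider []string
	ResourceScarcity                            map[string]float64
}

// validate is a hypothetical helper, not part of this PR.
func validate(c SchedulingConfig) error {
	switch c.FairnessModel {
	case AssetFairness:
		if len(c.ResourceScarcity) == 0 {
			return fmt.Errorf("AssetFairness requires resourceScarcity to be set")
		}
	case DominantResourceFairness:
		if len(c.DominantResourceFairnessResourcesToConsider) == 0 {
			return fmt.Errorf("DominantResourceFairness requires dominantResourceFairnessResourcesToConsider to be set")
		}
	default:
		return fmt.Errorf("unknown fairnessModel: %q", c.FairnessModel)
	}
	return nil
}

func main() {
	// Mirrors the defaults added to both config files above.
	cfg := SchedulingConfig{
		FairnessModel:                               AssetFairness,
		DominantResourceFairnessResourcesToConsider: []string{"cpu", "memory", "nvidia.com/gpu"},
		ResourceScarcity:                            map[string]float64{"cpu": 1.0},
	}
	fmt.Println(validate(cfg)) // <nil>
}
```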
20 changes: 19 additions & 1 deletion internal/armada/configuration/types.go
@@ -113,7 +113,11 @@ type SchedulingConfig struct {
DefaultJobTolerationsByResourceRequest map[string][]v1.Toleration
// Maximum number of times a job is retried before considered failed.
MaxRetries uint
// Weights used when computing fair share.
// Controls how fairness is calculated. Can be either AssetFairness or DominantResourceFairness.
FairnessModel FairnessModel
// List of resource names, e.g., []string{"cpu", "memory"}, to consider when computing DominantResourceFairness.
DominantResourceFairnessResourcesToConsider []string
// Weights used to compute fair share when using AssetFairness.
// Overrides dynamic scarcity calculation if provided.
// Applies to both the new and old scheduler.
ResourceScarcity map[string]float64
@@ -187,6 +191,20 @@ type SchedulingConfig struct {
AlwaysAttemptScheduling bool
}

// FairnessModel controls how fairness is computed.
// More specifically, each queue has a cost associated with it and the next job to schedule
// is taken from the queue with the smallest cost. FairnessModel determines how that cost is computed.
type FairnessModel string

const (
// AssetFairness sets the cost associated with a queue to a linear combination of its total allocation.
// E.g., w_CPU * "CPU allocation" + w_memory * "memory allocation".
AssetFairness FairnessModel = "AssetFairness"
// DominantResourceFairness sets the cost associated with a queue to
// max("CPU allocation" / "CPU capacity", "memory allocation" / "memory capacity", ...).
DominantResourceFairness FairnessModel = "DominantResourceFairness"
)

type IndexedResource struct {
// Resource name. E.g., "cpu", "memory", or "nvidia.com/gpu".
Name string
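To make the two cost definitions above concrete, here is a standalone sketch using plain float64 maps instead of the scheduler's ResourceList type. The function names and numbers are illustrative stand-ins for the methods added to QueueSchedulingContext further down, which also divide by the queue weight.

```go
// Illustrative sketch of the two cost functions described in the FairnessModel comments,
// using float64 maps instead of the scheduler's ResourceList type.
package main

import "fmt"

// assetFairnessCost: w_CPU*"CPU allocation" + w_memory*"memory allocation" + ...,
// divided by the queue weight.
func assetFairnessCost(allocated, weights map[string]float64, queueWeight float64) float64 {
	var cost float64
	for r, w := range weights {
		cost += w * allocated[r]
	}
	return cost / queueWeight
}

// dominantResourceFairnessCost: max over resources of allocation/capacity,
// divided by the queue weight.
func dominantResourceFairnessCost(allocated, capacity map[string]float64, resources []string, queueWeight float64) float64 {
	var cost float64
	for _, r := range resources {
		if capacity[r] == 0 {
			continue // resources with zero capacity are ignored, as in the scheduler code below
		}
		if c := allocated[r] / capacity[r]; c > cost {
			cost = c
		}
	}
	return cost / queueWeight
}

func main() {
	allocated := map[string]float64{"cpu": 10, "memory": 256}  // e.g., 10 cores and 256 GiB
	capacity := map[string]float64{"cpu": 100, "memory": 1024} // total capacity across clusters
	fmt.Println(assetFairnessCost(allocated, map[string]float64{"cpu": 1.0}, 1))
	// 10: with only cpu weighted, memory usage does not affect the cost.
	fmt.Println(dominantResourceFairnessCost(allocated, capacity, []string{"cpu", "memory"}, 1))
	// 0.25: memory (256/1024) dominates cpu (10/100).
}
```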
22 changes: 22 additions & 0 deletions internal/armada/server/lease.go
@@ -296,6 +296,16 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
apiQueues[i] = &api.Queue{Name: queue.Name}
}

// Record which queues are active, i.e., have jobs either queued or running.
queuesWithJobsQueued, err := q.jobRepository.FilterActiveQueues(apiQueues)
if err != nil {
return nil, err
}
isActiveByQueueName := make(map[string]bool, len(queuesWithJobsQueued))
for _, queue := range queuesWithJobsQueued {
isActiveByQueueName[queue.Name] = true
}

// Nodes to be considered by the scheduler.
lastSeen := q.clock.Now()
nodes := make([]*schedulerobjects.Node, 0, len(req.Nodes))
@@ -393,6 +403,11 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
nodeIdByJobId[job.Id] = node.Id
}
nodes = append(nodes, node)

// Record which queues have jobs running. Necessary to omit inactive queues.
for _, job := range jobs {
isActiveByQueueName[job.Queue] = true
}
}
nodeDb, err := nodedb.NewNodeDb(
q.schedulingConfig.Preemption.PriorityClasses,
@@ -466,7 +481,14 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
q.schedulingConfig.ResourceScarcity,
schedulerobjects.ResourceList{Resources: totalCapacity},
)
if q.schedulingConfig.FairnessModel == configuration.DominantResourceFairness {
sctx.EnableDominantResourceFairness(q.schedulingConfig.DominantResourceFairnessResourcesToConsider)
}
for queue, priorityFactor := range priorityFactorByQueue {
if !isActiveByQueueName[queue] {
// Skip inactive queues so that fair share is computed only over queues with jobs queued or running.
continue
}
var weight float64 = 1
if priorityFactor > 0 {
weight = 1 / priorityFactor
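The reason for skipping inactive queues is that a queue's fair share is its weight divided by the sum of weights over the queues included in the scheduling round, so counting queues with no work would shrink the share attributed to every active queue. A standalone sketch with made-up queue names and equal weights:

```go
// Standalone sketch of why only active queues (jobs queued or running) are given a
// QueueSchedulingContext. Queue names and weights are made up for illustration.
package main

import "fmt"

func fairShares(weightByQueue map[string]float64, isActiveByQueueName map[string]bool) map[string]float64 {
	var weightSum float64
	for queue, weight := range weightByQueue {
		if isActiveByQueueName[queue] {
			weightSum += weight
		}
	}
	shares := make(map[string]float64)
	for queue, weight := range weightByQueue {
		if isActiveByQueueName[queue] {
			shares[queue] = weight / weightSum
		}
	}
	return shares
}

func main() {
	weightByQueue := map[string]float64{"queue-a": 1, "queue-b": 1, "queue-c": 1}
	isActiveByQueueName := map[string]bool{"queue-a": true, "queue-b": true} // queue-c has no jobs
	fmt.Println(fairShares(weightByQueue, isActiveByQueueName))
	// map[queue-a:0.5 queue-b:0.5]; without the filter each queue would be assigned 1/3.
}
```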
75 changes: 63 additions & 12 deletions internal/scheduler/context/context.go
@@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/armadaproject/armada/internal/armada/configuration"
"github.com/armadaproject/armada/internal/common/armadaerrors"
@@ -34,12 +35,16 @@ type SchedulingContext struct {
PriorityClasses map[string]configuration.PriorityClass
// Default priority class.
DefaultPriorityClass string
// Weights used when computing total resource usage.
// Determines how fairness is computed.
FairnessModel configuration.FairnessModel
// Resources considered when computing DominantResourceFairness.
DominantResourceFairnessResourcesToConsider []string
// Weights used when computing AssetFairness.
ResourceScarcity map[string]float64
// Sum of queue weights across all queues.
WeightSum float64
// Per-queue scheduling contexts.
QueueSchedulingContexts map[string]*QueueSchedulingContext
// Sum of weights across all queues.
WeightSum float64
// Total resources across all clusters available at the start of the scheduling cycle.
TotalResources schedulerobjects.ResourceList
// = TotalResources.AsWeightedMillis(ResourceScarcity).
@@ -81,6 +86,7 @@ func NewSchedulingContext(
Pool: pool,
PriorityClasses: priorityClasses,
DefaultPriorityClass: defaultPriorityClass,
FairnessModel: configuration.AssetFairness,
ResourceScarcity: resourceScarcity,
QueueSchedulingContexts: make(map[string]*QueueSchedulingContext),
TotalResources: totalResources.DeepCopy(),
@@ -93,6 +99,11 @@ func NewSchedulingContext(
}
}

func (sctx *SchedulingContext) EnableDominantResourceFairness(dominantResourceFairnessResourcesToConsider []string) {
sctx.FairnessModel = configuration.DominantResourceFairness
sctx.DominantResourceFairnessResourcesToConsider = dominantResourceFairnessResourcesToConsider
}

func (sctx *SchedulingContext) SchedulingKeyFromLegacySchedulerJob(job interfaces.LegacySchedulerJob) schedulerobjects.SchedulingKey {
var priority int32
if priorityClass, ok := sctx.PriorityClasses[job.GetPriorityClassName()]; ok {
@@ -151,6 +162,15 @@ func (sctx *SchedulingContext) String() string {
return sctx.ReportString(0)
}

// TotalCost returns the sum of the costs across all queues.
func (sctx *SchedulingContext) TotalCost() float64 {
var rv float64
for _, qctx := range sctx.QueueSchedulingContexts {
rv += qctx.TotalCostForQueue()
}
return rv
}

func (sctx *SchedulingContext) ReportString(verbosity int32) string {
var sb strings.Builder
w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
@@ -496,17 +516,48 @@ func (qctx *QueueSchedulingContext) ClearJobSpecs() {
}
}

// FractionOfFairShare returns a number in [0, 1] indicating what fraction of its fair share this queue is allocated.
func (qctx *QueueSchedulingContext) FractionOfFairShare() float64 {
return qctx.FractionOfFairShareWithAllocation(qctx.Allocated)
// TotalCostForQueue returns the total cost of this queue.
func (qctx *QueueSchedulingContext) TotalCostForQueue() float64 {
return qctx.TotalCostForQueueWithAllocation(qctx.Allocated)
}

// FractionOfFairShareWithAllocation returns a number in [0, 1] indicating what
// fraction of its fair share this queue is allocated if the total allocation of this queue is given by allocated.
func (qctx *QueueSchedulingContext) FractionOfFairShareWithAllocation(allocated schedulerobjects.ResourceList) float64 {
fairShare := qctx.Weight / qctx.SchedulingContext.WeightSum
allocatedAsWeightedMillis := allocated.AsWeightedMillis(qctx.SchedulingContext.ResourceScarcity)
return (float64(allocatedAsWeightedMillis) / float64(qctx.SchedulingContext.TotalResourcesAsWeightedMillis)) / fairShare
// TotalCostForQueueWithAllocation returns the total cost of this queue if its total allocation is given by allocated.
func (qctx *QueueSchedulingContext) TotalCostForQueueWithAllocation(allocated schedulerobjects.ResourceList) float64 {
switch qctx.SchedulingContext.FairnessModel {
case configuration.AssetFairness:
return qctx.assetFairnessCostWithAllocation(allocated)
case configuration.DominantResourceFairness:
return qctx.dominantResourceFairnessCostWithAllocation(allocated)
default:
panic(fmt.Sprintf("unknown fairness model: %s", qctx.SchedulingContext.FairnessModel))
}
}

func (qctx *QueueSchedulingContext) assetFairnessCostWithAllocation(allocated schedulerobjects.ResourceList) float64 {
if len(qctx.SchedulingContext.ResourceScarcity) == 0 {
panic("ResourceScarcity is not set")
}
return float64(allocated.AsWeightedMillis(qctx.SchedulingContext.ResourceScarcity)) / qctx.Weight
}

func (qctx *QueueSchedulingContext) dominantResourceFairnessCostWithAllocation(allocated schedulerobjects.ResourceList) float64 {
if len(qctx.SchedulingContext.DominantResourceFairnessResourcesToConsider) == 0 {
panic("DominantResourceFairnessResourcesToConsider is not set")
}
var cost float64
for _, t := range qctx.SchedulingContext.DominantResourceFairnessResourcesToConsider {
capacity := qctx.SchedulingContext.TotalResources.Get(t)
if capacity.Equal(resource.Quantity{}) {
// Ignore any resources with zero capacity.
continue
}
q := allocated.Get(t)
tcost := float64(q.MilliValue()) / float64(capacity.MilliValue())
if tcost > cost {
cost = tcost
}
}
return cost / qctx.Weight
}

type GangSchedulingContext struct {
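A condensed sketch of the pattern added to SchedulingContext: the per-queue cost function is selected by FairnessModel, and TotalCost sums it across queues. Types are simplified and the per-queue costs are pre-computed stand-ins rather than being derived from a ResourceList as in the real code.

```go
// Condensed standalone sketch (simplified types, not the schedulercontext package) of the
// dispatch on FairnessModel and the TotalCost aggregation added in this PR.
package main

import "fmt"

type FairnessModel string

const (
	AssetFairness            FairnessModel = "AssetFairness"
	DominantResourceFairness FairnessModel = "DominantResourceFairness"
)

type queueContext struct {
	weight float64
	// Stand-ins for the two cost variants; the real code derives them from the
	// queue's allocation, see the *CostWithAllocation functions above.
	assetCost float64
	drfCost   float64
}

type schedulingContext struct {
	fairnessModel FairnessModel
	queues        map[string]*queueContext
}

func (sctx *schedulingContext) queueCost(qctx *queueContext) float64 {
	switch sctx.fairnessModel {
	case AssetFairness:
		return qctx.assetCost / qctx.weight
	case DominantResourceFairness:
		return qctx.drfCost / qctx.weight
	default:
		panic(fmt.Sprintf("unknown fairness model: %s", sctx.fairnessModel))
	}
}

// TotalCost mirrors SchedulingContext.TotalCost: the sum of per-queue costs, used by the
// preempting scheduler to turn a queue's cost into its actual share of the system.
func (sctx *schedulingContext) TotalCost() float64 {
	var rv float64
	for _, qctx := range sctx.queues {
		rv += sctx.queueCost(qctx)
	}
	return rv
}

func main() {
	sctx := &schedulingContext{
		fairnessModel: AssetFairness, // the default set in NewSchedulingContext
		queues: map[string]*queueContext{
			"queue-a": {weight: 1, assetCost: 20, drfCost: 0.2},
			"queue-b": {weight: 2, assetCost: 10, drfCost: 0.6},
		},
	}
	fmt.Println(sctx.TotalCost()) // 25 under asset fairness
	sctx.fairnessModel = DominantResourceFairness // the real code switches via EnableDominantResourceFairness
	fmt.Println(sctx.TotalCost()) // 0.5 under dominant resource fairness
}
```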
6 changes: 5 additions & 1 deletion internal/scheduler/preempting_queue_scheduler.go
@@ -135,6 +135,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx context.Context) (*SchedulerRe
snapshot := sch.nodeDb.Txn(false)

// Evict preemptible jobs.
totalCost := sch.schedulingContext.TotalCost()
evictorResult, inMemoryJobRepo, err := sch.evict(
ctxlogrus.ToContext(
ctx,
@@ -156,7 +157,10 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx context.Context) (*SchedulerRe
return false
}
if qctx, ok := sch.schedulingContext.QueueSchedulingContexts[job.GetQueue()]; ok {
if qctx.FractionOfFairShare() <= sch.protectedFractionOfFairShare {
fairShare := qctx.Weight / sch.schedulingContext.WeightSum
actualShare := qctx.TotalCostForQueue() / totalCost
fractionOfFairShare := actualShare / fairShare
if fractionOfFairShare <= sch.protectedFractionOfFairShare {
return false
}
}
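With made-up numbers, the eviction filter above behaves as follows: a queue's fair share is weight/weightSum, its actual share is its cost divided by the total cost across queues, and its jobs are protected from eviction while the ratio of actual share to fair share stays at or below protectedFractionOfFairShare. The threshold value below is assumed for illustration, not taken from this PR.

```go
// Standalone numeric sketch of the protection check added above. All values are illustrative.
package main

import "fmt"

func main() {
	const protectedFractionOfFairShare = 0.5 // assumed config value

	weight, weightSum := 1.0, 4.0 // this queue's weight and the sum over all queues in the context
	cost, totalCost := 0.05, 1.0  // this queue's cost (e.g., its DRF cost) and the sum over all queues

	fairShare := qDiv(weight, weightSum)                 // 0.25
	actualShare := qDiv(cost, totalCost)                 // 0.05
	fractionOfFairShare := qDiv(actualShare, fairShare)  // 0.2

	if fractionOfFairShare <= protectedFractionOfFairShare {
		fmt.Println("queue is below its protected fraction of fair share; its jobs are not evicted")
	} else {
		fmt.Println("queue may have jobs evicted")
	}
}

// qDiv is a trivial helper to keep the three ratios visually parallel.
func qDiv(a, b float64) float64 { return a / b }
```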
2 changes: 1 addition & 1 deletion internal/scheduler/queue_scheduler.go
@@ -348,7 +348,7 @@ func (it *CandidateGangIterator) fractionOfFairShareWithGctx(gctx *schedulercont
it.buffer.Zero()
it.buffer.Add(qctx.Allocated)
it.buffer.Add(gctx.TotalResourceRequests)
return qctx.FractionOfFairShareWithAllocation(it.buffer)
return qctx.TotalCostForQueueWithAllocation(it.buffer)
}

// Clear removes the first item in the iterator.
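The one-line change above means candidate gangs are now ordered by the cost their queue would have if the gang's resource requests were added to its current allocation, so the queue with the lowest hypothetical cost is scheduled from first. A simplified sketch, not the real CandidateGangIterator, with queue weights assumed equal and omitted:

```go
// Simplified sketch of ordering candidate gangs by the cost their queue would have after
// adding the gang's requests. Uses a dominant-resource style cost with equal queue weights.
package main

import "fmt"

type gang struct {
	queue    string
	requests map[string]float64
}

// costWithAllocation stands in for TotalCostForQueueWithAllocation.
func costWithAllocation(allocated, capacity map[string]float64) float64 {
	var cost float64
	for r, q := range allocated {
		if capacity[r] > 0 && q/capacity[r] > cost {
			cost = q / capacity[r]
		}
	}
	return cost
}

func main() {
	capacity := map[string]float64{"cpu": 100, "memory": 1000}
	allocatedByQueue := map[string]map[string]float64{
		"queue-a": {"cpu": 30, "memory": 100},
		"queue-b": {"cpu": 10, "memory": 100},
	}
	gangs := []gang{
		{queue: "queue-a", requests: map[string]float64{"cpu": 5}},
		{queue: "queue-b", requests: map[string]float64{"cpu": 5}},
	}
	best, bestCost := -1, 0.0
	for i, g := range gangs {
		// Hypothetical allocation: the queue's current allocation plus the gang's requests.
		hypothetical := map[string]float64{}
		for r, q := range allocatedByQueue[g.queue] {
			hypothetical[r] = q
		}
		for r, q := range g.requests {
			hypothetical[r] += q
		}
		if c := costWithAllocation(hypothetical, capacity); best < 0 || c < bestCost {
			best, bestCost = i, c
		}
	}
	fmt.Println(gangs[best].queue) // queue-b: its hypothetical cost (0.15) is lower than queue-a's (0.35)
}
```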
11 changes: 11 additions & 0 deletions internal/scheduler/scheduling_algo.go
@@ -176,6 +176,7 @@ func (it *JobQueueIteratorAdapter) Next() (interfaces.LegacySchedulerJob, error)

type fairSchedulingAlgoContext struct {
priorityFactorByQueue map[string]float64
isActiveByQueueName map[string]bool
totalCapacity schedulerobjects.ResourceList
jobsByExecutorId map[string][]*jobdb.Job
nodeIdByJobId map[string]string
@@ -242,11 +243,13 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx context.Context, t
}

// Create a map of jobs associated with each executor.
isActiveByQueueName := make(map[string]bool, len(queues))
jobsByExecutorId := make(map[string][]*jobdb.Job)
nodeIdByJobId := make(map[string]string)
jobIdsByGangId := make(map[string]map[string]bool)
gangIdByJobId := make(map[string]string)
for _, job := range jobDb.GetAll(txn) {
isActiveByQueueName[job.Queue()] = true
if job.Queued() {
continue
}
@@ -288,6 +291,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx context.Context, t

return &fairSchedulingAlgoContext{
priorityFactorByQueue: priorityFactorByQueue,
isActiveByQueueName: isActiveByQueueName,
totalCapacity: totalCapacity,
jobsByExecutorId: jobsByExecutorId,
nodeIdByJobId: nodeIdByJobId,
@@ -322,7 +326,14 @@ func (l *FairSchedulingAlgo) scheduleOnExecutor(
l.config.ResourceScarcity,
accounting.totalCapacity,
)
if l.config.FairnessModel == configuration.DominantResourceFairness {
sctx.EnableDominantResourceFairness(l.config.DominantResourceFairnessResourcesToConsider)
}
for queue, priorityFactor := range accounting.priorityFactorByQueue {
if !accounting.isActiveByQueueName[queue] {
// Skip inactive queues so that fair share is computed only over queues with jobs queued or running.
continue
}
var allocatedByPriorityClass schedulerobjects.QuantityByTAndResourceType[string]
if allocatedByQueueAndPriorityClass := accounting.allocationByPoolAndQueueAndPriorityClass[executor.Pool]; allocatedByQueueAndPriorityClass != nil {
allocatedByPriorityClass = allocatedByQueueAndPriorityClass[queue]
