Add dominant resource fairness #2614

Merged: 12 commits, Jun 28, 2023
20 changes: 10 additions & 10 deletions config/armada/config.yaml
@@ -31,6 +31,13 @@ eventsApiRedis:
   poolSize: 1000
 scheduling:
   enableAssertions: true
+  fairnessModel: "AssetFairness"
+  dominantResourceFairnessResourcesToConsider:
+    - "cpu"
+    - "memory"
+    - "nvidia.com/gpu"
+  resourceScarcity:
+    cpu: 1.0
   preemption:
     nodeEvictionProbability: 1.0
     nodeOversubscriptionEvictionProbability: 1.0
@@ -43,8 +50,8 @@ scheduling:
       priority: 1000
       preemptible: false
       maximumResourceFractionPerQueue:
-        memory: 0.99
-        cpu: 0.99
+        memory: 1.0
+        cpu: 1.0
     armada-preemptible:
       priority: 1000
       preemptible: true
@@ -54,7 +61,7 @@ scheduling:
   maxExtraNodesToConsider: 1
   maximumResourceFractionToSchedule:
     memory: 1.0
-    cpu: 1.0
+    cpu: 1.0
   maxJobSchedulingContextsPerExecutor: 10000
   lease:
     expireAfter: 15m
@@ -69,11 +76,6 @@ scheduling:
         value: "true"
         effect: "NoSchedule"
   defaultJobTolerationsByPriorityClass:
-    "":
-      - key: "armadaproject.io/pc-armada-default"
-        operator: "Equal"
-        value: "true"
-        effect: "NoSchedule"
     armada-default:
       - key: "armadaproject.io/pc-armada-default"
         operator: "Equal"
@@ -85,8 +87,6 @@ scheduling:
         value: "true"
         effect: "NoSchedule"
   maxRetries: 5
-  resourceScarcity:
-    cpu: 1.0
   maxPodSpecSizeBytes: 65535
   minJobResources:
     memory: 1Mi
18 changes: 9 additions & 9 deletions config/scheduler/config.yaml
@@ -49,6 +49,13 @@ grpc:
 scheduling:
   executorTimeout: 10m
   enableAssertions: true
+  fairnessModel: "AssetFairness"
+  dominantResourceFairnessResourcesToConsider:
+    - "cpu"
+    - "memory"
+    - "nvidia.com/gpu"
+  resourceScarcity:
+    cpu: 1.0
   preemption:
     alwaysAttemptScheduling: false
     enabled: true
@@ -60,8 +67,8 @@ scheduling:
       priority: 1000
       preemptible: false
       maximumResourceFractionPerQueue:
-        memory: 0.99
-        cpu: 0.99
+        memory: 1.0
+        cpu: 1.0
     armada-preemptible:
       priority: 1000
       preemptible: true
@@ -85,11 +92,6 @@ scheduling:
         value: "true"
         effect: "NoSchedule"
   defaultJobTolerationsByPriorityClass:
-    "":
-      - key: "armadaproject.io/pc-armada-default"
-        operator: "Equal"
-        value: "true"
-        effect: "NoSchedule"
     armada-default:
       - key: "armadaproject.io/pc-armada-default"
         operator: "Equal"
@@ -101,8 +103,6 @@ scheduling:
         value: "true"
         effect: "NoSchedule"
   maxRetries: 5
-  resourceScarcity:
-    cpu: 1.0
   indexedResources:
     - name: "cpu"
       resolution: "100m"
20 changes: 19 additions & 1 deletion internal/armada/configuration/types.go
@@ -113,7 +113,11 @@ type SchedulingConfig struct {
     DefaultJobTolerationsByResourceRequest map[string][]v1.Toleration
     // Maximum number of times a job is retried before considered failed.
     MaxRetries uint
-    // Weights used when computing fair share.
+    // Controls how fairness is computed. Can be either AssetFairness or DominantResourceFairness.
[Review comment from a Collaborator] This list of possible values is bound to become out of date.

[Reply from the author] Once DRF is used everywhere, I want to remove asset fairness and leave DRF as the only option.

+    FairnessModel FairnessModel
+    // List of resource names, e.g., []string{"cpu", "memory"}, to consider when computing DominantResourceFairness.
+    DominantResourceFairnessResourcesToConsider []string
+    // Weights used to compute fair share when using AssetFairness.
     // Overrides dynamic scarcity calculation if provided.
     // Applies to both the new and old scheduler.
     ResourceScarcity map[string]float64
@@ -187,6 +191,20 @@ type SchedulingConfig struct {
     AlwaysAttemptScheduling bool
 }

+// FairnessModel controls how fairness is computed.
+// More specifically, each queue has a cost associated with it and the next job to schedule
+// is taken from the queue with the smallest cost. FairnessModel determines how that cost is computed.
+type FairnessModel string
+
+const (
+    // AssetFairness sets the cost associated with a queue to a linear combination of its total allocation.
+    // E.g., w_CPU * "CPU allocation" + w_memory * "memory allocation".
+    AssetFairness FairnessModel = "AssetFairness"
+    // DominantResourceFairness sets the cost associated with a queue to
+    // max("CPU allocation" / "CPU capacity", "memory allocation" / "memory capacity", ...).
+    DominantResourceFairness FairnessModel = "DominantResourceFairness"
+)
+
 type IndexedResource struct {
     // Resource name. E.g., "cpu", "memory", or "nvidia.com/gpu".
     Name string
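To make the two cost models concrete, here is a minimal, self-contained sketch. Plain float64 maps stand in for the scheduler's ResourceList type; the function names and numbers are illustrative, not the PR's API:

```go
package main

import "fmt"

// assetFairnessCost mirrors the AssetFairness comment above: a linear
// combination of the queue's allocation, weighted by resource scarcity.
func assetFairnessCost(allocated, scarcity map[string]float64) float64 {
	var cost float64
	for r, q := range allocated {
		cost += scarcity[r] * q
	}
	return cost
}

// dominantResourceFairnessCost mirrors the DominantResourceFairness comment
// above: the largest allocation-to-capacity ratio over the considered resources.
func dominantResourceFairnessCost(allocated, capacity map[string]float64, resources []string) float64 {
	var cost float64
	for _, r := range resources {
		if capacity[r] == 0 {
			continue // resources with zero capacity are ignored
		}
		if c := allocated[r] / capacity[r]; c > cost {
			cost = c
		}
	}
	return cost
}

func main() {
	allocated := map[string]float64{"cpu": 10, "memory": 512}  // queue's allocation
	capacity := map[string]float64{"cpu": 100, "memory": 1024} // total capacity
	scarcity := map[string]float64{"cpu": 1.0}                 // as in the default config

	// AssetFairness counts only resources with a scarcity weight: 1.0*10 = 10.
	fmt.Println(assetFairnessCost(allocated, scarcity))
	// DRF: max(10/100, 512/1024) = 0.5; memory is the dominant resource here.
	fmt.Println(dominantResourceFairnessCost(allocated, capacity, []string{"cpu", "memory"}))
}
```

Note how, under the default resourceScarcity of cpu: 1.0, AssetFairness ignores memory entirely, while DRF picks out memory as the dominant resource in this example.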
22 changes: 22 additions & 0 deletions internal/armada/server/lease.go
@@ -296,6 +296,16 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
         apiQueues[i] = &api.Queue{Name: queue.Name}
     }

+    // Record which queues are active, i.e., have jobs either queued or running.
+    queuesWithJobsQueued, err := q.jobRepository.FilterActiveQueues(apiQueues)
+    if err != nil {
+        return nil, err
+    }
+    isActiveByQueueName := make(map[string]bool, len(queuesWithJobsQueued))
+    for _, queue := range queuesWithJobsQueued {
+        isActiveByQueueName[queue.Name] = true
+    }
+
     // Nodes to be considered by the scheduler.
     lastSeen := q.clock.Now()
     nodes := make([]*schedulerobjects.Node, 0, len(req.Nodes))
@@ -393,6 +403,11 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
             nodeIdByJobId[job.Id] = node.Id
         }
         nodes = append(nodes, node)
+
+        // Record which queues have jobs running. Necessary to omit inactive queues.
+        for _, job := range jobs {
+            isActiveByQueueName[job.Queue] = true
+        }
     }
     nodeDb, err := nodedb.NewNodeDb(
         q.schedulingConfig.Preemption.PriorityClasses,
@@ -466,7 +481,14 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
         q.schedulingConfig.ResourceScarcity,
         schedulerobjects.ResourceList{Resources: totalCapacity},
     )
+    if q.schedulingConfig.FairnessModel == configuration.DominantResourceFairness {
+        sctx.EnableDominantResourceFairness(q.schedulingConfig.DominantResourceFairnessResourcesToConsider)
+    }
     for queue, priorityFactor := range priorityFactorByQueue {
+        if !isActiveByQueueName[queue] {
+            // To ensure fair share is computed only from active queues, i.e., queues with jobs queued or running.
+            continue
+        }
         var weight float64 = 1
         if priorityFactor > 0 {
             weight = 1 / priorityFactor
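The loop above derives each queue's weight as the inverse of its priority factor, after skipping inactive queues. A sketch of the net effect on fair shares, assuming WeightSum is accumulated only over the queues that were not skipped; queue names and factors are hypothetical:

```go
package main

import "fmt"

func main() {
	// Hypothetical priority factors for three queues; only A and B are active.
	priorityFactorByQueue := map[string]float64{"A": 1, "B": 2, "C": 1}
	isActiveByQueueName := map[string]bool{"A": true, "B": true}

	weightByQueue := make(map[string]float64)
	var weightSum float64
	for queue, priorityFactor := range priorityFactorByQueue {
		if !isActiveByQueueName[queue] {
			continue // inactive queues are skipped, as in getJobs above
		}
		weight := 1.0
		if priorityFactor > 0 {
			weight = 1 / priorityFactor
		}
		weightByQueue[queue] = weight
		weightSum += weight
	}
	// Fair shares: A gets 1/1.5 ≈ 0.67, B gets 0.5/1.5 ≈ 0.33.
	// Had inactive queue C been included, A's fair share would drop to 0.4.
	for queue, weight := range weightByQueue {
		fmt.Printf("%s: %.2f\n", queue, weight/weightSum)
	}
}
```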
75 changes: 63 additions & 12 deletions internal/scheduler/context/context.go
@@ -10,6 +10,7 @@ import (
     "github.com/pkg/errors"
     "golang.org/x/exp/maps"
     "golang.org/x/exp/slices"
+    "k8s.io/apimachinery/pkg/api/resource"

     "github.com/armadaproject/armada/internal/armada/configuration"
     "github.com/armadaproject/armada/internal/common/armadaerrors"
@@ -34,12 +35,16 @@ type SchedulingContext struct {
     PriorityClasses map[string]configuration.PriorityClass
     // Default priority class.
     DefaultPriorityClass string
-    // Weights used when computing total resource usage.
+    // Determines how fairness is computed.
+    FairnessModel configuration.FairnessModel
+    // Resources considered when computing DominantResourceFairness.
+    DominantResourceFairnessResourcesToConsider []string
+    // Weights used when computing AssetFairness.
     ResourceScarcity map[string]float64
+    // Sum of queue weights across all queues.
+    WeightSum float64
     // Per-queue scheduling contexts.
     QueueSchedulingContexts map[string]*QueueSchedulingContext
-    // Sum of weights across all queues.
-    WeightSum float64
     // Total resources across all clusters available at the start of the scheduling cycle.
     TotalResources schedulerobjects.ResourceList
     // = TotalResources.AsWeightedMillis(ResourceScarcity).
@@ -81,6 +86,7 @@ func NewSchedulingContext(
         Pool: pool,
         PriorityClasses: priorityClasses,
         DefaultPriorityClass: defaultPriorityClass,
+        FairnessModel: configuration.AssetFairness,
         ResourceScarcity: resourceScarcity,
         QueueSchedulingContexts: make(map[string]*QueueSchedulingContext),
         TotalResources: totalResources.DeepCopy(),
@@ -93,6 +99,11 @@
     }
 }

+func (sctx *SchedulingContext) EnableDominantResourceFairness(dominantResourceFairnessResourcesToConsider []string) {
+    sctx.FairnessModel = configuration.DominantResourceFairness
+    sctx.DominantResourceFairnessResourcesToConsider = dominantResourceFairnessResourcesToConsider
+}
+
 func (sctx *SchedulingContext) SchedulingKeyFromLegacySchedulerJob(job interfaces.LegacySchedulerJob) schedulerobjects.SchedulingKey {
     var priority int32
     if priorityClass, ok := sctx.PriorityClasses[job.GetPriorityClassName()]; ok {
@@ -151,6 +162,15 @@ func (sctx *SchedulingContext) String() string {
     return sctx.ReportString(0)
 }

+// TotalCost returns the sum of the costs across all queues.
+func (sctx *SchedulingContext) TotalCost() float64 {
+    var rv float64
+    for _, qctx := range sctx.QueueSchedulingContexts {
+        rv += qctx.TotalCostForQueue()
+    }
+    return rv
+}
+
 func (sctx *SchedulingContext) ReportString(verbosity int32) string {
     var sb strings.Builder
     w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
@@ -496,17 +516,48 @@ func (qctx *QueueSchedulingContext) ClearJobSpecs() {
     }
 }

-// FractionOfFairShare returns a number in [0, 1] indicating what fraction of its fair share this queue is allocated.
-func (qctx *QueueSchedulingContext) FractionOfFairShare() float64 {
-    return qctx.FractionOfFairShareWithAllocation(qctx.Allocated)
+// TotalCostForQueue returns the total cost of this queue.
+func (qctx *QueueSchedulingContext) TotalCostForQueue() float64 {
+    return qctx.TotalCostForQueueWithAllocation(qctx.Allocated)
 }

-// FractionOfFairShareWithAllocation returns a number in [0, 1] indicating what
-// fraction of its fair share this queue is allocated if the total allocation of this queue is given by allocated.
-func (qctx *QueueSchedulingContext) FractionOfFairShareWithAllocation(allocated schedulerobjects.ResourceList) float64 {
-    fairShare := qctx.Weight / qctx.SchedulingContext.WeightSum
-    allocatedAsWeightedMillis := allocated.AsWeightedMillis(qctx.SchedulingContext.ResourceScarcity)
-    return (float64(allocatedAsWeightedMillis) / float64(qctx.SchedulingContext.TotalResourcesAsWeightedMillis)) / fairShare
+// TotalCostForQueueWithAllocation returns the total cost of this queue if its total allocation is given by allocated.
+func (qctx *QueueSchedulingContext) TotalCostForQueueWithAllocation(allocated schedulerobjects.ResourceList) float64 {
+    switch qctx.SchedulingContext.FairnessModel {
+    case configuration.AssetFairness:
+        return qctx.assetFairnessCostWithAllocation(allocated)
+    case configuration.DominantResourceFairness:
+        return qctx.dominantResourceFairnessCostWithAllocation(allocated)
+    default:
+        panic(fmt.Sprintf("unknown fairness type: %s", qctx.SchedulingContext.FairnessModel))
+    }
+}
+
+func (qctx *QueueSchedulingContext) assetFairnessCostWithAllocation(allocated schedulerobjects.ResourceList) float64 {
+    if len(qctx.SchedulingContext.ResourceScarcity) == 0 {
+        panic("ResourceScarcity is not set")
+    }
+    return float64(allocated.AsWeightedMillis(qctx.SchedulingContext.ResourceScarcity)) / qctx.Weight
+}
+
+func (qctx *QueueSchedulingContext) dominantResourceFairnessCostWithAllocation(allocated schedulerobjects.ResourceList) float64 {
+    if len(qctx.SchedulingContext.DominantResourceFairnessResourcesToConsider) == 0 {
+        panic("DominantResourceFairnessResourcesToConsider is not set")
+    }
+    var cost float64
+    for _, t := range qctx.SchedulingContext.DominantResourceFairnessResourcesToConsider {
+        capacity := qctx.SchedulingContext.TotalResources.Get(t)
+        if capacity.Equal(resource.Quantity{}) {
+            // Ignore any resources with zero capacity.
+            continue
+        }
+        q := allocated.Get(t)
+        tcost := float64(q.MilliValue()) / float64(capacity.MilliValue())
+        if tcost > cost {
+            cost = tcost
+        }
+    }
+    return cost / qctx.Weight
+}

 type GangSchedulingContext struct {
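As a worked example of the DRF cost defined above (queue names, allocations, and weights are made up): a queue's cost is its largest allocation-to-capacity ratio divided by its weight, and the scheduler takes the next job from the queue with the smallest cost:

```go
package main

import (
	"fmt"
	"sort"
)

func main() {
	// Total capacity: 100 CPU, 1000 GiB memory. Costs follow
	// dominantResourceFairnessCostWithAllocation: max ratio / weight.
	//   Queue X (weight 1): 20 CPU, 100 GiB -> max(0.20, 0.10)/1 = 0.20
	//   Queue Y (weight 1):  5 CPU, 400 GiB -> max(0.05, 0.40)/1 = 0.40
	//   Queue Z (weight 2): 30 CPU, 100 GiB -> max(0.30, 0.10)/2 = 0.15
	costByQueue := map[string]float64{"X": 0.20, "Y": 0.40, "Z": 0.15}

	queues := []string{"X", "Y", "Z"}
	sort.Slice(queues, func(i, j int) bool { return costByQueue[queues[i]] < costByQueue[queues[j]] })
	fmt.Println(queues) // [Z X Y]: Z is furthest below its share, so it schedules first
}
```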
6 changes: 5 additions & 1 deletion internal/scheduler/preempting_queue_scheduler.go
@@ -135,6 +135,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx context.Context) (*SchedulerRe
     snapshot := sch.nodeDb.Txn(false)

     // Evict preemptible jobs.
+    totalCost := sch.schedulingContext.TotalCost()
     evictorResult, inMemoryJobRepo, err := sch.evict(
         ctxlogrus.ToContext(
             ctx,
@@ -156,7 +157,10 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx context.Context) (*SchedulerRe
                 return false
             }
             if qctx, ok := sch.schedulingContext.QueueSchedulingContexts[job.GetQueue()]; ok {
-                if qctx.FractionOfFairShare() <= sch.protectedFractionOfFairShare {
+                fairShare := qctx.Weight / sch.schedulingContext.WeightSum
+                actualShare := qctx.TotalCostForQueue() / totalCost
+                fractionOfFairShare := actualShare / fairShare
+                if fractionOfFairShare <= sch.protectedFractionOfFairShare {
                     return false
                 }
             }
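The rewritten check preserves the protection semantics in cost space: a queue's jobs are not evicted while its actual share, i.e., its cost as a fraction of the total cost, is at most protectedFractionOfFairShare times its fair share. A sketch with assumed numbers:

```go
package main

import "fmt"

// isProtected mirrors the check above: jobs are not evicted while the queue's
// actual share stays at or below the protected fraction of its fair share.
func isProtected(weight, weightSum, queueCost, totalCost, protectedFraction float64) bool {
	fairShare := weight / weightSum
	actualShare := queueCost / totalCost
	return actualShare/fairShare <= protectedFraction
}

func main() {
	// Assumed: four equally weighted queues (fair share 0.25 each) and a
	// hypothetical protectedFractionOfFairShare of 0.5.
	fmt.Println(isProtected(1, 4, 0.10, 1.0, 0.5)) // true: at 0.4 of fair share, protected
	fmt.Println(isProtected(1, 4, 0.30, 1.0, 0.5)) // false: at 1.2 of fair share, evictable
}
```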
2 changes: 1 addition & 1 deletion internal/scheduler/queue_scheduler.go
@@ -348,7 +348,7 @@ func (it *CandidateGangIterator) fractionOfFairShareWithGctx(gctx *schedulercont
     it.buffer.Zero()
     it.buffer.Add(qctx.Allocated)
     it.buffer.Add(gctx.TotalResourceRequests)
-    return qctx.FractionOfFairShareWithAllocation(it.buffer)
+    return qctx.TotalCostForQueueWithAllocation(it.buffer)
[Review comment from a Collaborator] The name of this method is now out of date, but this is obviously not a blocker.

}

// Clear removes the first item in the iterator.
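For context on this one-line change: the iterator orders candidate gangs by the queue cost that would result if the gang's resource requests were added to the queue's allocation. A sketch of that computation under DRF, with hypothetical maps standing in for ResourceList and the buffer logic above:

```go
package main

import "fmt"

// costWithGang sketches what the method now returns: the queue's DRF cost
// assuming the gang's resource requests were added to its allocation.
func costWithGang(allocated, gangRequests, capacity map[string]float64, weight float64) float64 {
	var cost float64
	for r, c := range capacity {
		if c == 0 {
			continue // zero-capacity resources are ignored
		}
		if v := (allocated[r] + gangRequests[r]) / c; v > cost {
			cost = v
		}
	}
	return cost / weight
}

func main() {
	capacity := map[string]float64{"cpu": 100, "memory": 1000}
	allocated := map[string]float64{"cpu": 10, "memory": 50}
	gang := map[string]float64{"cpu": 20}
	// max((10+20)/100, (50+0)/1000) = 0.30
	fmt.Println(costWithGang(allocated, gang, capacity, 1.0))
}
```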
11 changes: 11 additions & 0 deletions internal/scheduler/scheduling_algo.go
@@ -176,6 +176,7 @@ func (it *JobQueueIteratorAdapter) Next() (interfaces.LegacySchedulerJob, error)

 type fairSchedulingAlgoContext struct {
     priorityFactorByQueue map[string]float64
+    isActiveByQueueName map[string]bool
     totalCapacity schedulerobjects.ResourceList
     jobsByExecutorId map[string][]*jobdb.Job
     nodeIdByJobId map[string]string
@@ -242,11 +243,13 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx context.Context, t
     }

     // Create a map of jobs associated with each executor.
+    isActiveByQueueName := make(map[string]bool, len(queues))
     jobsByExecutorId := make(map[string][]*jobdb.Job)
     nodeIdByJobId := make(map[string]string)
     jobIdsByGangId := make(map[string]map[string]bool)
     gangIdByJobId := make(map[string]string)
     for _, job := range jobDb.GetAll(txn) {
+        isActiveByQueueName[job.Queue()] = true
         if job.Queued() {
             continue
         }
@@ -288,6 +291,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx context.Context, t

     return &fairSchedulingAlgoContext{
         priorityFactorByQueue: priorityFactorByQueue,
+        isActiveByQueueName: isActiveByQueueName,
         totalCapacity: totalCapacity,
         jobsByExecutorId: jobsByExecutorId,
         nodeIdByJobId: nodeIdByJobId,
@@ -322,7 +326,14 @@ func (l *FairSchedulingAlgo) scheduleOnExecutor(
         l.config.ResourceScarcity,
         accounting.totalCapacity,
     )
+    if l.config.FairnessModel == configuration.DominantResourceFairness {
+        sctx.EnableDominantResourceFairness(l.config.DominantResourceFairnessResourcesToConsider)
+    }
     for queue, priorityFactor := range accounting.priorityFactorByQueue {
+        if !accounting.isActiveByQueueName[queue] {
+            // To ensure fair share is computed only from active queues, i.e., queues with jobs queued or running.
+            continue
+        }
         var allocatedByPriorityClass schedulerobjects.QuantityByTAndResourceType[string]
         if allocatedByQueueAndPriorityClass := accounting.allocationByPoolAndQueueAndPriorityClass[executor.Pool]; allocatedByQueueAndPriorityClass != nil {
             allocatedByPriorityClass = allocatedByQueueAndPriorityClass[queue]