Add dominant resource fairness #2614

Merged
merged 12 commits on Jun 28, 2023
1 change: 1 addition & 0 deletions config/armada/config.yaml
@@ -34,6 +34,7 @@ scheduling:
preemption:
nodeEvictionProbability: 1.0
nodeOversubscriptionEvictionProbability: 1.0
protectedFractionOfFairShare: 1.0
setNodeIdSelector: true
nodeIdLabel: kubernetes.io/hostname
setNodeName: false
35 changes: 34 additions & 1 deletion internal/armada/configuration/types.go
@@ -113,7 +113,11 @@ type SchedulingConfig struct {
DefaultJobTolerationsByResourceRequest map[string][]v1.Toleration
// Maximum number of times a job is retried before considered failed.
MaxRetries uint
// Weights used when computing fair share.
// Controls how fairness is calculated. Can be either AssetFairness or DominantResourceFairness.
Collaborator

This list of possible values is bound to become out of date.

Contributor Author

Once DRF is used everywhere, I want to remove asset fairness and leave DRF as the only option.

FairnessType FairnessType
// Used to convert one resource into another when using DominantResourceFairness.
FairnessResourceMapping []ResourceMapping
// Weights used to compute fair share when using AssetFairness.
// Overrides dynamic scarcity calculation if provided.
// Applies to both the new and old scheduler.
ResourceScarcity map[string]float64
@@ -187,6 +191,33 @@ type SchedulingConfig struct {
AlwaysAttemptScheduling bool
}

// FairnessType controls how fairness is computed.
// In particular, each queue has a cost associated with it, and the next job to attempt to schedule
// is taken from the queue with the smallest cost.
//
// Can be either AssetFairness or DominantResourceFairness.
//
// If AssetFairness, the cost associated with a queue is a linear combination of its total allocation.
// E.g., w_CPU * "CPU allocation" + w_memory * "memory allocation".
//
// If DominantResourceFairness, the cost associated with a queue is
// max("CPU allocation" / "CPU capacity", "memory allocation" / "memory capacity", ...).
type FairnessType string

const (
AssertFairness FairnessType = "AssertFairness"
DominantResourceFairness FairnessType = "DominantResourceFairness"
)

// ResourceMapping describes a mapping from one resource type to another. Used when computing fairness.
// E.g., ResourceMapping{"nvidia.com/mig-1g.10gb", "nvidia.com/gpu", 1/8}
// indicates 1 unit of "nvidia.com/mig-1g.10gb" should be treated as 1/8 unit of "nvidia.com/gpu".
type ResourceMapping struct {
Source string
Target string
Multiplier float64
}

type IndexedResource struct {
// Resource name. E.g., "cpu", "memory", or "nvidia.com/gpu".
Name string
@@ -209,6 +240,8 @@ type PreemptionConfig struct {
// the probability of evicting jobs on oversubscribed nodes, i.e.,
// nodes on which the total resource requests are greater than the available resources.
NodeOversubscriptionEvictionProbability float64
// Only queues allocated more than this fraction of their fair share are considered for preemption.
ProtectedFractionOfFairShare float64
// If true, the Armada scheduler will add to scheduled pods a node selector
// NodeIdLabel: <value of label on node selected by scheduler>.
// If true, NodeIdLabel must be non-empty.
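
To make the ResourceMapping conversion above concrete, here is a minimal standalone sketch (not part of this diff) of how a list of mappings could be folded into a queue's allocation before computing fairness. The applyResourceMappings helper and the plain map[string]float64 representation are illustrative assumptions; the scheduler itself works with schedulerobjects.ResourceList and resource.Quantity values.

package main

import "fmt"

// ResourceMapping mirrors the configuration type added above: quantities of
// Source are treated as quantities of Target scaled by Multiplier.
type ResourceMapping struct {
    Source     string
    Target     string
    Multiplier float64
}

// applyResourceMappings folds mapped resources into their targets,
// e.g. 4 x "nvidia.com/mig-1g.10gb" at multiplier 1/8 -> 0.5 x "nvidia.com/gpu".
func applyResourceMappings(allocation map[string]float64, mappings []ResourceMapping) map[string]float64 {
    out := make(map[string]float64, len(allocation))
    for t, q := range allocation {
        out[t] += q
    }
    for _, m := range mappings {
        if q, ok := out[m.Source]; ok {
            out[m.Target] += q * m.Multiplier
            delete(out, m.Source)
        }
    }
    return out
}

func main() {
    allocation := map[string]float64{"cpu": 16, "nvidia.com/mig-1g.10gb": 4}
    mappings := []ResourceMapping{{Source: "nvidia.com/mig-1g.10gb", Target: "nvidia.com/gpu", Multiplier: 1.0 / 8}}
    fmt.Println(applyResourceMappings(allocation, mappings)) // map[cpu:16 nvidia.com/gpu:0.5]
}
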
4 changes: 3 additions & 1 deletion internal/armada/server/lease.go
@@ -469,7 +469,8 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
schedulerobjects.ResourceList{Resources: totalCapacity},
)
for queue, priorityFactor := range priorityFactorByQueue {
if err := sctx.AddQueueSchedulingContext(queue, priorityFactor, allocatedByQueueAndPriorityClassForPool[queue]); err != nil {
weight := 1 / priorityFactor
if err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClassForPool[queue]); err != nil {
return nil, err
}
}
@@ -484,6 +485,7 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
constraints,
q.schedulingConfig.Preemption.NodeEvictionProbability,
q.schedulingConfig.Preemption.NodeOversubscriptionEvictionProbability,
q.schedulingConfig.Preemption.ProtectedFractionOfFairShare,
&SchedulerJobRepositoryAdapter{
r: q.jobRepository,
},
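
The change above converts a queue's priority factor into a scheduling weight by taking its reciprocal, so a smaller priority factor now means a larger weight and hence a larger fair share. A short standalone sketch of that relationship (queue names and factors are invented for illustration):

package main

import "fmt"

func main() {
    // weight = 1 / priorityFactor, as in getJobs above.
    priorityFactorByQueue := map[string]float64{"queue-a": 1.0, "queue-b": 0.5}
    weightByQueue := make(map[string]float64)
    totalWeight := 0.0
    for queue, priorityFactor := range priorityFactorByQueue {
        weight := 1 / priorityFactor
        weightByQueue[queue] = weight
        totalWeight += weight
    }
    for queue, weight := range weightByQueue {
        // queue-a gets 1/3 of the pool, queue-b gets 2/3.
        fmt.Printf("%s: fair share %.2f\n", queue, weight/totalWeight)
    }
}
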
12 changes: 0 additions & 12 deletions internal/scheduler/common.go
@@ -2,7 +2,6 @@ package scheduler

import (
"fmt"
"math"
"strconv"
"time"

@@ -170,17 +169,6 @@ func GangIdAndCardinalityFromAnnotations(annotations map[string]string) (string,
return gangId, gangCardinality, true, nil
}

// ResourceListAsWeightedMillis returns the linear combination of the milli values in rl with given weights.
// This function overflows for values that exceed MaxInt64. E.g., 1Pi is fine but not 10Pi.
func ResourceListAsWeightedMillis(weights map[string]float64, rl schedulerobjects.ResourceList) int64 {
var rv int64
for t, f := range weights {
q := rl.Get(t)
rv += int64(math.Round(float64(q.MilliValue()) * f))
}
return rv
}

func PodRequirementsFromLegacySchedulerJobs[S ~[]E, E interfaces.LegacySchedulerJob](jobs S, priorityClasses map[string]configuration.PriorityClass) []*schedulerobjects.PodRequirements {
rv := make([]*schedulerobjects.PodRequirements, len(jobs))
for i, job := range jobs {
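
The ResourceListAsWeightedMillis helper removed from this file now lives as a method on schedulerobjects.ResourceList, called as rl.AsWeightedMillis(weights) in the updated tests below. Assuming the method keeps the semantics of the deleted function, here is a standalone sketch of the computation, with a plain map of milli-values standing in for a ResourceList:

package main

import (
    "fmt"
    "math"
)

// asWeightedMillis mirrors the removed ResourceListAsWeightedMillis: the
// weighted sum of per-resource milli-values, rounded to the nearest integer.
// As the original comment notes, it can overflow int64 for huge quantities.
func asWeightedMillis(weights map[string]float64, milliValues map[string]int64) int64 {
    var rv int64
    for t, w := range weights {
        rv += int64(math.Round(float64(milliValues[t]) * w))
    }
    return rv
}

func main() {
    weights := map[string]float64{"cpu": 1.0, "nvidia.com/gpu": 10.0}
    milliValues := map[string]int64{"cpu": 16000, "nvidia.com/gpu": 2000} // 16 CPU, 2 GPUs
    fmt.Println(asWeightedMillis(weights, milliValues)) // 36000
}
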
4 changes: 2 additions & 2 deletions internal/scheduler/common_test.go
@@ -134,7 +134,7 @@ func TestResourceListAsWeightedMillis(t *testing.T) {
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
assert.Equal(t, tc.expected, ResourceListAsWeightedMillis(tc.weights, tc.rl))
assert.Equal(t, tc.expected, tc.rl.AsWeightedMillis(tc.weights))
})
}
}
@@ -151,6 +151,6 @@ func BenchmarkResourceListAsWeightedMillis(b *testing.B) {
}
b.ResetTimer()
for n := 0; n < b.N; n++ {
ResourceListAsWeightedMillis(weights, rl)
rl.AsWeightedMillis(weights)
}
}
68 changes: 64 additions & 4 deletions internal/scheduler/context/context.go
@@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/armadaproject/armada/internal/armada/configuration"
"github.com/armadaproject/armada/internal/common/armadaerrors"
@@ -34,6 +35,11 @@ type SchedulingContext struct {
PriorityClasses map[string]configuration.PriorityClass
// Default priority class.
DefaultPriorityClass string
// Determines how fairness is computed.
FairnessType configuration.FairnessType
// Used to convert one resource into another when computing fair share.
// Only applies to DominantResourceFairness.
FairnessResourceMappingBySourceResource map[string]configuration.ResourceMapping
Collaborator

This doesn't seem to be used yet.

Also, it's not clear to me why resource mapping would only apply to DominantResourceFairness; the fractional GPU example you gave above seems like it also makes sense in the case of asset fairness.

// Weights used when computing total resource usage.
ResourceScarcity map[string]float64
// Per-queue scheduling contexts.
@@ -77,6 +83,7 @@ func NewSchedulingContext(
Pool: pool,
PriorityClasses: priorityClasses,
DefaultPriorityClass: defaultPriorityClass,
FairnessType: configuration.AssertFairness,
ResourceScarcity: resourceScarcity,
QueueSchedulingContexts: make(map[string]*QueueSchedulingContext),
TotalResources: totalResources.DeepCopy(),
@@ -106,7 +113,7 @@ func (sctx *SchedulingContext) ClearUnfeasibleSchedulingKeys() {
sctx.UnfeasibleSchedulingKeys = make(map[schedulerobjects.SchedulingKey]*JobSchedulingContext)
}

func (sctx *SchedulingContext) AddQueueSchedulingContext(queue string, priorityFactor float64, initialAllocatedByPriorityClass schedulerobjects.QuantityByTAndResourceType[string]) error {
func (sctx *SchedulingContext) AddQueueSchedulingContext(queue string, weight float64, initialAllocatedByPriorityClass schedulerobjects.QuantityByTAndResourceType[string]) error {
if _, ok := sctx.QueueSchedulingContexts[queue]; ok {
return errors.WithStack(&armadaerrors.ErrInvalidArgument{
Name: "queue",
@@ -128,7 +135,7 @@ func (sctx *SchedulingContext) AddQueueSchedulingContext(queue string, priorityF
Created: time.Now(),
ExecutorId: sctx.ExecutorId,
Queue: queue,
PriorityFactor: priorityFactor,
Weight: weight,
Allocated: allocated,
AllocatedByPriorityClass: initialAllocatedByPriorityClass,
ScheduledResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]),
@@ -145,6 +152,21 @@ func (sctx *SchedulingContext) String() string {
return sctx.ReportString(0)
}

// TotalCostAndWeight returns the sum of the costs and weights across all queues.
// Only queues with non-zero cost contribute towards the total weight.
func (sctx *SchedulingContext) TotalCostAndWeight() (float64, float64) {
var cost float64
var weight float64
for _, qctx := range sctx.QueueSchedulingContexts {
queueCost := qctx.TotalCostForQueue()
if queueCost != 0 {
cost += queueCost
weight += qctx.Weight
}
}
return cost, weight
}

func (sctx *SchedulingContext) ReportString(verbosity int32) string {
var sb strings.Builder
w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
@@ -313,8 +335,8 @@ type QueueSchedulingContext struct {
ExecutorId string
// Queue name.
Queue string
// These factors influence the fraction of resources assigned to each queue.
PriorityFactor float64
// Determines the fair share of this queue relative to other queues.
Weight float64
// Total resources assigned to the queue across all clusters by priority class priority.
// Includes jobs scheduled during this invocation of the scheduler.
Allocated schedulerobjects.ResourceList
@@ -490,6 +512,44 @@ func (qctx *QueueSchedulingContext) ClearJobSpecs() {
}
}

// TotalCostForQueue returns the cost for which this queue should be penalised when computing fairness.
func (qctx *QueueSchedulingContext) TotalCostForQueue() float64 {
return qctx.TotalCostForQueueWithAllocation(qctx.Allocated)
}

// TotalCostForQueueWithAllocation returns the cost for which this queue should be penalised when computing fairness,
// if the total allocation of this queue is given by allocated.
func (qctx *QueueSchedulingContext) TotalCostForQueueWithAllocation(allocated schedulerobjects.ResourceList) float64 {
switch qctx.SchedulingContext.FairnessType {
case configuration.AssertFairness:
return qctx.assetFairnessCostWithAllocation(allocated)
case configuration.DominantResourceFairness:
return qctx.dominantResourceFairnessCostWithAllocation(allocated)
default:
panic(fmt.Sprintf("unknown fairness type: %s", qctx.SchedulingContext.FairnessType))
}
}

func (qctx *QueueSchedulingContext) assetFairnessCostWithAllocation(allocated schedulerobjects.ResourceList) float64 {
return float64(allocated.AsWeightedMillis(qctx.SchedulingContext.ResourceScarcity)) / qctx.Weight
}

func (qctx *QueueSchedulingContext) dominantResourceFairnessCostWithAllocation(allocated schedulerobjects.ResourceList) float64 {
var cost float64
for t, q := range allocated.Resources {
totalq := qctx.SchedulingContext.TotalResources.Get(t)
if totalq.Cmp(resource.Quantity{}) == 0 {
totalq.SetMilli(1)
}
tcost := float64(q.MilliValue()) / float64(totalq.MilliValue())
if tcost > cost {
cost = tcost
}
}
return cost / qctx.Weight
}

type GangSchedulingContext struct {
Created time.Time
Queue string
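
To illustrate the two cost functions added to QueueSchedulingContext above, here is a worked standalone sketch (not part of this diff) that reproduces the arithmetic of assetFairnessCostWithAllocation and dominantResourceFairnessCostWithAllocation on plain floats instead of resource.Quantity values; the queue numbers are invented.

package main

import "fmt"

func main() {
    // Hypothetical queue: weight 2, allocated 8 CPU and 32 GiB of memory
    // in a pool with 100 CPU and 1000 GiB in total.
    weight := 2.0
    allocated := map[string]float64{"cpu": 8, "memory": 32}
    capacity := map[string]float64{"cpu": 100, "memory": 1000}
    scarcity := map[string]float64{"cpu": 1, "memory": 0} // asset-fairness resource weights

    // Asset fairness: weighted sum of the allocation, divided by the queue weight.
    assetCost := 0.0
    for t, q := range allocated {
        assetCost += q * scarcity[t]
    }
    assetCost /= weight

    // Dominant resource fairness: the largest allocation/capacity ratio
    // (here CPU, 8/100), divided by the queue weight.
    drfCost := 0.0
    for t, q := range allocated {
        if share := q / capacity[t]; share > drfCost {
            drfCost = share
        }
    }
    drfCost /= weight

    fmt.Println(assetCost, drfCost) // 4 0.04
}
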
78 changes: 36 additions & 42 deletions internal/scheduler/preempting_queue_scheduler.go
@@ -32,6 +32,7 @@ type PreemptingQueueScheduler struct {
constraints schedulerconstraints.SchedulingConstraints
nodeEvictionProbability float64
nodeOversubscriptionEvictionProbability float64
protectedFractionOfFairShare float64
jobRepo JobRepository
nodeDb *nodedb.NodeDb
// Maps job ids to the id of the node the job is associated with.
@@ -53,6 +54,7 @@ func NewPreemptingQueueScheduler(
constraints schedulerconstraints.SchedulingConstraints,
nodeEvictionProbability float64,
nodeOversubscriptionEvictionProbability float64,
protectedFractionOfFairShare float64,
jobRepo JobRepository,
nodeDb *nodedb.NodeDb,
initialNodeIdByJobId map[string]string,
@@ -77,6 +79,7 @@
constraints: constraints,
nodeEvictionProbability: nodeEvictionProbability,
nodeOversubscriptionEvictionProbability: nodeOversubscriptionEvictionProbability,
protectedFractionOfFairShare: protectedFractionOfFairShare,
jobRepo: jobRepo,
nodeDb: nodeDb,
nodeIdByJobId: maps.Clone(initialNodeIdByJobId),
@@ -99,7 +102,7 @@ func (sch *PreemptingQueueScheduler) SkipUnsuccessfulSchedulingKeyCheck() {
func (sch *PreemptingQueueScheduler) Schedule(ctx context.Context) (*SchedulerResult, error) {
log := ctxlogrus.Extract(ctx)
log = log.WithField("service", "PreemptingQueueScheduler")
if ResourceListAsWeightedMillis(sch.schedulingContext.ResourceScarcity, sch.schedulingContext.TotalResources) == 0 {
if sch.schedulingContext.TotalResources.AsWeightedMillis(sch.schedulingContext.ResourceScarcity) == 0 {
// This refers to resources available across all clusters, i.e.,
// it may include resources not currently considered for scheduling.
log.Infof(
@@ -108,7 +111,7 @@ )
)
return &SchedulerResult{}, nil
}
if ResourceListAsWeightedMillis(sch.schedulingContext.ResourceScarcity, sch.nodeDb.TotalResources()) == 0 {
if rl := sch.nodeDb.TotalResources(); rl.AsWeightedMillis(sch.schedulingContext.ResourceScarcity) == 0 {
// This refers to the resources currently considered for scheduling.
log.Infof(
"no resources with non-zero weight available for scheduling in NodeDb: resource scarcity %v, total resources %v",
@@ -132,16 +135,40 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx context.Context) (*SchedulerRe
snapshot := sch.nodeDb.Txn(false)

// Evict preemptible jobs.
totalCost, totalWeight := sch.schedulingContext.TotalCostAndWeight()
evictorResult, inMemoryJobRepo, err := sch.evict(
ctxlogrus.ToContext(
ctx,
log.WithField("stage", "evict for resource balancing"),
),
NewStochasticEvictor(
NewNodeEvictor(
sch.jobRepo,
sch.schedulingContext.PriorityClasses,
sch.schedulingContext.DefaultPriorityClass,
sch.nodeEvictionProbability,
func(ctx context.Context, job interfaces.LegacySchedulerJob) bool {
if job.GetAnnotations() == nil {
log := ctxlogrus.Extract(ctx)
log.Errorf("can't evict job %s: annotations not initialised", job.GetId())
return false
}
if job.GetNodeSelector() == nil {
log := ctxlogrus.Extract(ctx)
log.Errorf("can't evict job %s: nodeSelector not initialised", job.GetId())
return false
}
if qctx, ok := sch.schedulingContext.QueueSchedulingContexts[job.GetQueue()]; ok {
fairShare := qctx.Weight / totalWeight
actualShare := qctx.TotalCostForQueue() / totalCost
fractionOfFairShare := actualShare / fairShare
if fractionOfFairShare <= sch.protectedFractionOfFairShare {
return false
}
}
if priorityClass, ok := sch.schedulingContext.PriorityClasses[job.GetPriorityClassName()]; ok {
return priorityClass.Preemptible
}
return false
},
nil,
),
)
@@ -655,13 +682,11 @@ type EvictorResult struct {
NodeIdByJobId map[string]string
}

// NewStochasticEvictor returns a new evictor that for each node evicts
// all preemptible jobs from that node with probability perNodeEvictionProbability.
func NewStochasticEvictor(
func NewNodeEvictor(
jobRepo JobRepository,
priorityClasses map[string]configuration.PriorityClass,
defaultPriorityClass string,
perNodeEvictionProbability float64,
jobFilter func(context.Context, interfaces.LegacySchedulerJob) bool,
random *rand.Rand,
) *Evictor {
if perNodeEvictionProbability <= 0 {
@@ -670,44 +695,13 @@ func NewStochasticEvictor(
if random == nil {
random = rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
}
return NewPreemptibleEvictor(
jobRepo,
priorityClasses,
defaultPriorityClass,
func(_ context.Context, node *schedulerobjects.Node) bool {
return len(node.AllocatedByJobId) > 0 && random.Float64() < perNodeEvictionProbability
},
)
}

// NewPreemptibleEvictor returns a new evictor that evicts all preemptible jobs
// on nodes for which nodeFilter returns true.
func NewPreemptibleEvictor(
jobRepo JobRepository,
priorityClasses map[string]configuration.PriorityClass,
defaultPriorityClass string,
nodeFilter func(context.Context, *schedulerobjects.Node) bool,
) *Evictor {
return &Evictor{
jobRepo: jobRepo,
priorityClasses: priorityClasses,
nodeFilter: nodeFilter,
jobFilter: func(ctx context.Context, job interfaces.LegacySchedulerJob) bool {
if job.GetAnnotations() == nil {
log := ctxlogrus.Extract(ctx)
log.Warnf("can't evict job %s: annotations not initialised", job.GetId())
return false
}
priorityClassName := job.GetPriorityClassName()
priorityClass, ok := priorityClasses[priorityClassName]
if !ok {
priorityClass = priorityClasses[defaultPriorityClass]
}
if priorityClass.Preemptible {
return true
}
return false
nodeFilter: func(_ context.Context, node *schedulerobjects.Node) bool {
return len(node.AllocatedByJobId) > 0 && random.Float64() < perNodeEvictionProbability
},
jobFilter: jobFilter,
postEvictFunc: defaultPostEvictFunc,
}
}
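
The new jobFilter above only allows eviction from queues that are allocated more than protectedFractionOfFairShare of their fair share. A standalone sketch of that check (the function name and the example numbers are invented):

package main

import "fmt"

// protectedFromEviction reproduces the fraction-of-fair-share check from the
// eviction job filter: a queue at or below the protected fraction of its fair
// share keeps its jobs.
func protectedFromEviction(queueCost, totalCost, queueWeight, totalWeight, protectedFraction float64) bool {
    fairShare := queueWeight / totalWeight
    actualShare := queueCost / totalCost
    return actualShare/fairShare <= protectedFraction
}

func main() {
    // Two equally weighted queues, protectedFractionOfFairShare = 1.0.
    fmt.Println(protectedFromEviction(0.3, 1.0, 1, 2, 1.0)) // true: at 60% of fair share, protected
    fmt.Println(protectedFromEviction(0.7, 1.0, 1, 2, 1.0)) // false: at 140% of fair share, evictable
}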