
Commit

Merge branch 'master' into f/chrisma/queuettl
d80tb7 authored Jun 6, 2024
2 parents 20895e8 + 0b52beb commit 7555bbc
Showing 12 changed files with 16 additions and 83 deletions.
26 changes: 0 additions & 26 deletions internal/scheduler/configuration/configuration.go
@@ -85,32 +85,13 @@ type MetricsConfig struct {
// Allowed characters in resource names are [a-zA-Z_:][a-zA-Z0-9_:]*
// It can also be used to track multiple resources within the same metric, e.g., "nvidia.com/gpu" and "amd.com/gpu".
ResourceRenaming map[v1.ResourceName]string
- // Controls the cycle time metrics.
- // TODO(albin): Not used yet.
- CycleTimeConfig PrometheusSummaryConfig
// The first matching regex of each error message is cached in an LRU cache.
// This setting controls the cache size.
MatchedRegexIndexByErrorMessageCacheSize uint64
// Reset metrics this often. Resetting periodically ensures inactive time series are garbage-collected.
ResetInterval time.Duration
}

- // PrometheusSummaryConfig contains the relevant config for a prometheus.Summary.
- type PrometheusSummaryConfig struct {
- // Objectives defines the quantile rank estimates with their respective
- // absolute error. If Objectives[q] = e, then the value reported for q
- // will be the φ-quantile value for some φ between q-e and q+e. The
- // default value is an empty map, resulting in a summary without
- // quantiles.
- Objectives map[float64]float64
-
- // MaxAge defines the duration for which an observation stays relevant
- // for the summary. Only applies to pre-calculated quantiles, does not
- // apply to _sum and _count. Must be positive. The default value is
- // DefMaxAge.
- MaxAge time.Duration
- }

type LeaderConfig struct {
// Valid modes are "standalone" or "kubernetes"
Mode string `validate:"required"`
@@ -233,13 +214,6 @@ type SchedulingConfig struct {
MaxRetries uint
// List of resource names, e.g., []string{"cpu", "memory"}, to consider when computing DominantResourceFairness.
DominantResourceFairnessResourcesToConsider []string
- // Once a node has been found on which a pod can be scheduled,
- // the scheduler will consider up to the next maxExtraNodesToConsider nodes.
- // The scheduler selects the node with the best score out of the considered nodes.
- // In particular, the score expresses whether preemption is necessary to schedule a pod.
- // Hence, a larger MaxExtraNodesToConsider would reduce the expected number of preemptions.
- // TODO(albin): Remove. It's unused.
- MaxExtraNodesToConsider uint
// Resource types (e.g. memory or nvidia.com/gpu) that the scheduler keeps track of.
// Resource types not on this list will be ignored if seen on a node, and any jobs requesting them will fail.
SupportedResourceTypes []ResourceType
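For context on the deleted type: PrometheusSummaryConfig mirrored the knobs of a prometheus.Summary from client_golang, with Objectives and MaxAge mapping directly onto prometheus.SummaryOpts. A minimal sketch of how such a config would be applied; the metric name, help text, and example objective values are illustrative, not taken from Armada:

```go
package metrics

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// newCycleTimeSummary sketches how a PrometheusSummaryConfig-style struct
// maps onto prometheus.SummaryOpts. Objectives gives quantile -> absolute
// error (e.g. 0.95 -> 0.01 reports a φ-quantile for some φ in [0.94, 0.96]);
// MaxAge bounds how long an observation influences the quantile estimates.
func newCycleTimeSummary(objectives map[float64]float64, maxAge time.Duration) prometheus.Summary {
	if maxAge <= 0 {
		maxAge = prometheus.DefMaxAge // client_golang's default of 10 minutes
	}
	return prometheus.NewSummary(prometheus.SummaryOpts{
		Name:       "scheduler_cycle_time_seconds", // hypothetical metric name
		Help:       "Duration of scheduling cycles.",
		Objectives: objectives, // e.g. map[float64]float64{0.5: 0.05, 0.95: 0.01}
		MaxAge:     maxAge,
	})
}
```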
1 change: 0 additions & 1 deletion internal/scheduler/gang_scheduler_test.go
@@ -509,7 +509,6 @@ func TestGangScheduler(t *testing.T) {
}
nodeDb, err := nodedb.NewNodeDb(
tc.SchedulingConfig.PriorityClasses,
- tc.SchedulingConfig.MaxExtraNodesToConsider,
tc.SchedulingConfig.IndexedResources,
tc.SchedulingConfig.IndexedTaints,
tc.SchedulingConfig.IndexedNodeLabels,
35 changes: 5 additions & 30 deletions internal/scheduler/nodedb/nodedb.go
@@ -157,14 +157,6 @@ type EvictedJobSchedulingContext struct {
type NodeDb struct {
// In-memory database storing *Node.
db *memdb.MemDB
- // Once a node has been found on which a pod can be scheduled,
- // the NodeDb will consider up to the next maxExtraNodesToConsider nodes.
- // The NodeDb selects the node with the best score out of the considered nodes.
- // In particular, the score expresses whether preemption is necessary to schedule a pod.
- // Hence, a larger maxExtraNodesToConsider would reduce the expected number of preemptions.
- //
- // TODO: Currently gives no benefit. Since all nodes are given the same score.
- maxExtraNodesToConsider uint
// Allowed priority classes.
// Because the number of database indices scales linearly with the number of distinct priorities,
// the efficiency of the NodeDb relies on the number of distinct priorities being small.
@@ -245,7 +237,6 @@ type NodeDb struct {

func NewNodeDb(
priorityClasses map[string]types.PriorityClass,
- maxExtraNodesToConsider uint,
indexedResources []configuration.ResourceType,
indexedTaints []string,
indexedNodeLabels []string,
@@ -297,7 +288,6 @@ func NewNodeDb(
nodeDb := NodeDb{
priorityClasses: priorityClasses,
nodeDbPriorities: nodeDbPriorities,
- maxExtraNodesToConsider: maxExtraNodesToConsider,
indexedResources: indexedResourceNames,
indexedResourcesSet: mapFromSlice(indexedResourceNames),
indexedResourceResolution: indexedResourceResolution,
@@ -778,42 +768,27 @@ func (nodeDb *NodeDb) selectNodeForPodWithItAtPriority(
onlyCheckDynamicRequirements bool,
) (*internaltypes.Node, error) {
var selectedNode *internaltypes.Node
- var selectedNodeScore int
- var numExtraNodes uint
for obj := it.Next(); obj != nil; obj = it.Next() {
- if selectedNode != nil {
- numExtraNodes++
- if numExtraNodes > nodeDb.maxExtraNodesToConsider {
- break
- }
- }
-
node := obj.(*internaltypes.Node)
if node == nil {
return nil, nil
}

var matches bool
- var score int
var reason PodRequirementsNotMetReason
var err error
if onlyCheckDynamicRequirements {
- matches, score, reason = DynamicJobRequirementsMet(node.AllocatableByPriority[priority], jctx)
+ matches, reason = DynamicJobRequirementsMet(node.AllocatableByPriority[priority], jctx)
} else {
- matches, score, reason, err = JobRequirementsMet(node, priority, jctx)
+ matches, reason, err = JobRequirementsMet(node, priority, jctx)
}
if err != nil {
return nil, err
}

if matches {
- if selectedNode == nil || score > selectedNodeScore {
- selectedNode = node
- selectedNodeScore = score
- if selectedNodeScore == SchedulableBestScore {
- break
- }
- }
+ selectedNode = node
+ break
} else {
s := nodeDb.stringFromPodRequirementsNotMetReason(reason)
jctx.PodSchedulingContext.NumExcludedNodesByReason[s] += 1
@@ -873,7 +848,7 @@ func (nodeDb *NodeDb) selectNodeForJobWithFairPreemption(txn *memdb.Txn, jctx *s
if priority > maxPriority {
maxPriority = priority
}
- matches, _, reason, err := JobRequirementsMet(
+ matches, reason, err := JobRequirementsMet(
node,
// At this point, we've unbound the jobs running on the node.
// Hence, we should check if the job is schedulable at evictedPriority,
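The removed maxExtraNodesToConsider comment describes a bounded best-score search: once one feasible node is found, up to maxExtraNodesToConsider further candidates are scored and the best one is kept. A standalone sketch of that pattern, using illustrative types and helpers rather than Armada's own:

```go
package main

import "fmt"

// node is a stand-in for the scheduler's node type; score would express,
// for example, whether preemption is needed (higher is better).
type node struct {
	name  string
	score int
}

// pickNode returns the best-scoring feasible node, examining at most
// maxExtra additional nodes once a feasible one has been found. This is
// only the pattern the removed comment describes, not Armada's code.
func pickNode(nodes []node, feasible func(node) bool, maxExtra int) *node {
	var selected *node
	extra := 0
	for i := range nodes {
		if selected != nil {
			extra++
			if extra > maxExtra {
				break // give up after maxExtra extra candidates
			}
		}
		n := nodes[i]
		if !feasible(n) {
			continue
		}
		if selected == nil || n.score > selected.score {
			selected = &n
		}
	}
	return selected
}

func main() {
	nodes := []node{{"a", 0}, {"b", 2}, {"c", 1}, {"d", 3}}
	best := pickNode(nodes, func(n node) bool { return n.score > 0 }, 1)
	// Prints "b": it is found first, "c" is the single extra node considered
	// and scores lower, and "d" is never examined.
	fmt.Println(best.name)
}
```

Since all nodes were given the same score (per the removed TODO), taking the first matching node, as the simplified loop above in selectNodeForPodWithItAtPriority now does, yields the same selection without the extra bookkeeping.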
4 changes: 0 additions & 4 deletions internal/scheduler/nodedb/nodedb_test.go
@@ -554,7 +554,6 @@ func TestScheduleMany(t *testing.T) {
func TestAwayNodeTypes(t *testing.T) {
nodeDb, err := NewNodeDb(
testfixtures.TestPriorityClasses,
- testfixtures.TestMaxExtraNodesToConsider,
testfixtures.TestResources,
testfixtures.TestIndexedTaints,
testfixtures.TestIndexedNodeLabels,
@@ -722,7 +721,6 @@ func TestMakeIndexedResourceResolution_ErrorsOnInvalidResolution(t *testing.T) {
func benchmarkUpsert(nodes []*schedulerobjects.Node, b *testing.B) {
nodeDb, err := NewNodeDb(
testfixtures.TestPriorityClasses,
- testfixtures.TestMaxExtraNodesToConsider,
testfixtures.TestResources,
testfixtures.TestIndexedTaints,
testfixtures.TestIndexedNodeLabels,
@@ -763,7 +761,6 @@ func BenchmarkUpsert100000(b *testing.B) {
func benchmarkScheduleMany(b *testing.B, nodes []*schedulerobjects.Node, jobs []*jobdb.Job) {
nodeDb, err := NewNodeDb(
testfixtures.TestPriorityClasses,
- testfixtures.TestMaxExtraNodesToConsider,
testfixtures.TestResources,
testfixtures.TestIndexedTaints,
testfixtures.TestIndexedNodeLabels,
@@ -890,7 +887,6 @@ func BenchmarkScheduleManyResourceConstrained(b *testing.B) {
func newNodeDbWithNodes(nodes []*schedulerobjects.Node) (*NodeDb, error) {
nodeDb, err := NewNodeDb(
testfixtures.TestPriorityClasses,
- testfixtures.TestMaxExtraNodesToConsider,
testfixtures.TestResources,
testfixtures.TestIndexedTaints,
testfixtures.TestIndexedNodeLabels,
18 changes: 7 additions & 11 deletions internal/scheduler/nodedb/nodematching.go
@@ -14,10 +14,6 @@ import (
)

const (
- // When checking if a pod fits on a node, this score indicates how well the pods fits.
- // However, all nodes are currently given the same score.
- SchedulableScore = 0
- SchedulableBestScore = SchedulableScore
PodRequirementsNotMetReasonUnknown = "unknown"
PodRequirementsNotMetReasonInsufficientResources = "insufficient resources available"
)
@@ -156,16 +152,16 @@ func NodeTypeJobRequirementsMet(nodeType *schedulerobjects.NodeType, jctx *sched
// - 1: Pod can be scheduled without preempting any running pods.
// If the requirements are not met, it returns the reason why.
// If the requirements can't be parsed, an error is returned.
- func JobRequirementsMet(node *internaltypes.Node, priority int32, jctx *schedulercontext.JobSchedulingContext) (bool, int, PodRequirementsNotMetReason, error) {
+ func JobRequirementsMet(node *internaltypes.Node, priority int32, jctx *schedulercontext.JobSchedulingContext) (bool, PodRequirementsNotMetReason, error) {
matches, reason, err := StaticJobRequirementsMet(node, jctx)
if !matches || err != nil {
- return matches, 0, reason, err
+ return matches, reason, err
}
- matches, score, reason := DynamicJobRequirementsMet(node.AllocatableByPriority[priority], jctx)
+ matches, reason = DynamicJobRequirementsMet(node.AllocatableByPriority[priority], jctx)
if !matches {
- return matches, 0, reason, nil
+ return matches, reason, nil
}
- return true, score, nil, nil
+ return true, nil, nil
}

// StaticJobRequirementsMet checks if a job can be scheduled onto this node,
@@ -201,9 +197,9 @@ func StaticJobRequirementsMet(node *internaltypes.Node, jctx *schedulercontext.J

// DynamicJobRequirementsMet checks if a pod can be scheduled onto this node,
// accounting for resources allocated to pods already assigned to this node.
- func DynamicJobRequirementsMet(allocatableResources internaltypes.ResourceList, jctx *schedulercontext.JobSchedulingContext) (bool, int, PodRequirementsNotMetReason) {
+ func DynamicJobRequirementsMet(allocatableResources internaltypes.ResourceList, jctx *schedulercontext.JobSchedulingContext) (bool, PodRequirementsNotMetReason) {
matches, reason := resourceRequirementsMet(allocatableResources, jctx.ResourceRequirements)
- return matches, SchedulableScore, reason
+ return matches, reason
}

func TolerationRequirementsMet(taints []v1.Taint, tolerations ...[]v1.Toleration) (bool, PodRequirementsNotMetReason) {
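Under the new signatures, JobRequirementsMet returns (matches, reason, err) and DynamicJobRequirementsMet returns (matches, reason). A sketch of a caller, written as if it sat inside package nodedb so the internaltypes and schedulercontext imports are already in scope; the wrapper name is hypothetical:

```go
// nodeIsSchedulable is a sketch only: it assumes it lives alongside
// JobRequirementsMet in package nodedb.
func nodeIsSchedulable(
	node *internaltypes.Node,
	priority int32,
	jctx *schedulercontext.JobSchedulingContext,
) (bool, PodRequirementsNotMetReason, error) {
	matches, reason, err := JobRequirementsMet(node, priority, jctx)
	if err != nil {
		return false, nil, err // the job's requirements could not be parsed
	}
	if !matches {
		return false, reason, nil // e.g. "insufficient resources available"
	}
	return true, nil, nil
}
```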
2 changes: 1 addition & 1 deletion internal/scheduler/nodedb/nodematching_test.go
@@ -380,7 +380,7 @@ func TestNodeSchedulingRequirementsMet(t *testing.T) {
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
- matches, _, reason, err := JobRequirementsMet(
+ matches, reason, err := JobRequirementsMet(
tc.node,
tc.req.Priority,
// TODO(albin): Define a jctx in the test case instead.
1 change: 0 additions & 1 deletion internal/scheduler/queue_scheduler_test.go
@@ -725,7 +725,6 @@ func TestQueueScheduler(t *testing.T) {
func NewNodeDb(config configuration.SchedulingConfig, stringInterner *stringinterner.StringInterner) (*nodedb.NodeDb, error) {
nodeDb, err := nodedb.NewNodeDb(
config.PriorityClasses,
- config.MaxExtraNodesToConsider,
config.IndexedResources,
config.IndexedTaints,
config.IndexedNodeLabels,
1 change: 0 additions & 1 deletion internal/scheduler/scheduling_algo.go
@@ -393,7 +393,6 @@ func (l *FairSchedulingAlgo) scheduleOnExecutors(
) (*SchedulerResult, *schedulercontext.SchedulingContext, error) {
nodeDb, err := nodedb.NewNodeDb(
l.schedulingConfig.PriorityClasses,
- l.schedulingConfig.MaxExtraNodesToConsider,
l.schedulingConfig.IndexedResources,
l.schedulingConfig.IndexedTaints,
l.schedulingConfig.IndexedNodeLabels,
1 change: 0 additions & 1 deletion internal/scheduler/scheduling_algo_test.go
@@ -530,7 +530,6 @@ func BenchmarkNodeDbConstruction(b *testing.B) {

nodeDb, err := nodedb.NewNodeDb(
schedulingConfig.PriorityClasses,
- schedulingConfig.MaxExtraNodesToConsider,
schedulingConfig.IndexedResources,
schedulingConfig.IndexedTaints,
schedulingConfig.IndexedNodeLabels,
1 change: 0 additions & 1 deletion internal/scheduler/simulator/simulator.go
@@ -234,7 +234,6 @@ func (s *Simulator) setupClusters() error {
for executorGroupIndex, executorGroup := range pool.ClusterGroups {
nodeDb, err := nodedb.NewNodeDb(
s.schedulingConfig.PriorityClasses,
- s.schedulingConfig.MaxExtraNodesToConsider,
s.schedulingConfig.IndexedResources,
s.schedulingConfig.IndexedTaints,
s.schedulingConfig.IndexedNodeLabels,
1 change: 0 additions & 1 deletion internal/scheduler/submitcheck.go
@@ -268,7 +268,6 @@ func (srv *SubmitChecker) getSchedulingResult(gctx *schedulercontext.GangSchedul
func (srv *SubmitChecker) constructNodeDb(executor *schedulerobjects.Executor) (*nodedb.NodeDb, error) {
nodeDb, err := nodedb.NewNodeDb(
srv.schedulingConfig.PriorityClasses,
- 0,
srv.schedulingConfig.IndexedResources,
srv.schedulingConfig.IndexedTaints,
srv.schedulingConfig.IndexedNodeLabels,
8 changes: 3 additions & 5 deletions internal/scheduler/testfixtures/testfixtures.go
@@ -52,10 +52,9 @@ var (
"armada-preemptible-away": {Priority: 30000, Preemptible: true, AwayNodeTypes: []types.AwayNodeType{{Priority: 29000, WellKnownNodeTypeName: "gpu"}}},
"armada-preemptible": {Priority: 30000, Preemptible: true},
}
- TestDefaultPriorityClass = PriorityClass3
- TestPriorities = []int32{0, 1, 2, 3}
- TestMaxExtraNodesToConsider uint = 1
- TestResources = []schedulerconfiguration.ResourceType{
+ TestDefaultPriorityClass = PriorityClass3
+ TestPriorities = []int32{0, 1, 2, 3}
+ TestResources = []schedulerconfiguration.ResourceType{
{Name: "cpu", Resolution: resource.MustParse("1")},
{Name: "memory", Resolution: resource.MustParse("128Mi")},
{Name: "nvidia.com/gpu", Resolution: resource.MustParse("1")},
@@ -168,7 +167,6 @@ func TestSchedulingConfig() schedulerconfiguration.SchedulingConfig {
MaximumSchedulingBurst: math.MaxInt,
MaximumPerQueueSchedulingRate: math.Inf(1),
MaximumPerQueueSchedulingBurst: math.MaxInt,
- MaxExtraNodesToConsider: TestMaxExtraNodesToConsider,
IndexedResources: TestResources,
IndexedNodeLabels: TestIndexedNodeLabels,
IndexedTaints: TestIndexedTaints,
