Skip to content

Commit

Permalink
Scheduler: refactor node creation
Browse files Browse the repository at this point in the history
  • Loading branch information
robertdavidsmith committed Sep 25, 2024
1 parent f45aefa commit f9bb01a
Show file tree
Hide file tree
Showing 13 changed files with 197 additions and 121 deletions.
3 changes: 2 additions & 1 deletion internal/scheduler/gang_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/constraints"
schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
"github.com/armadaproject/armada/internal/scheduler/floatingresources"
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
"github.com/armadaproject/armada/internal/scheduler/jobdb"
"github.com/armadaproject/armada/internal/scheduler/nodedb"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
Expand Down Expand Up @@ -194,7 +195,7 @@ func (sch *GangScheduler) trySchedule(ctx *armadacontext.Context, gctx *schedule
}
if ok {
currentFit := gctx.Fit()
if currentFit.NumScheduled == gctx.Cardinality() && currentFit.MeanPreemptedAtPriority == float64(nodedb.MinPriority) {
if currentFit.NumScheduled == gctx.Cardinality() && currentFit.MeanPreemptedAtPriority == float64(internaltypes.MinPriority) {
// Best possible; no need to keep looking.
txn.Commit()
return true, "", nil
Expand Down
89 changes: 81 additions & 8 deletions internal/scheduler/internaltypes/node.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,24 @@
package internaltypes

import (
"math"

"golang.org/x/exp/maps"
v1 "k8s.io/api/core/v1"

Check failure on line 8 in internal/scheduler/internaltypes/node.go

View workflow job for this annotation

GitHub Actions / lint / Lint Go

File is not `goimports`-ed with -local github.com/armadaproject/armada (goimports)
"github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/kubernetesobjects/label"
koTaint "github.com/armadaproject/armada/internal/scheduler/kubernetesobjects/taint"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/pkg/errors"
)

const (
// EvictedPriority is the priority class priority at which resources consumed by evicted jobs are accounted for.
// This helps avoid scheduling new jobs onto nodes that make it impossible to re-schedule evicted jobs.
EvictedPriority int32 = -1
// MinPriority is the smallest possible priority class priority within the NodeDb.
// It is defined as EvictedPriority.
MinPriority int32 = EvictedPriority
)

// Node is a scheduler-internal representation of one Kubernetes node.
Expand All @@ -18,10 +31,10 @@ type Node struct {
index uint64

// Executor this node belongs to and node name, which must be unique per executor.
executor string
name string
pool string
nodeTypeId uint64
executor string
name string
pool string
nodeType *NodeType

// We need to store taints and labels separately from the node type: the latter only includes
// indexed taints and labels, but we need all of them when checking pod requirements.
Expand All @@ -40,9 +53,65 @@ type Node struct {
EvictedJobRunIds map[string]bool
}

// FromSchedulerObjectsNode converts a schedulerobjects.Node into the scheduler-internal Node.
//
// The resulting node carries:
//   - the input node's taints, plus UnschedulableTaint() when the input is marked unschedulable;
//   - a clone of the input node's labels, extended with the node id under configuration.NodeIdLabel;
//   - a NodeType built from those taints and labels and the supplied indexed taints/labels;
//   - allocatable resources per priority, with the entry of the lowest priority found also
//     stored under EvictedPriority.
//
// An error is returned if the input node reports allocatable resources at a negative priority,
// since negative priorities are reserved for internal use.
func FromSchedulerObjectsNode(node *schedulerobjects.Node,
	nodeIndex uint64,
	indexedTaints map[string]bool,
	indexedNodeLabels map[string]bool,
	resourceListFactory *ResourceListFactory,
) (*Node, error) {
	nodeTaints := node.GetTaints()
	if node.Unschedulable {
		// Append onto a deep copy rather than onto the slice returned by the input node.
		nodeTaints = append(koTaint.DeepCopyTaints(nodeTaints), UnschedulableTaint())
	}

	nodeLabels := maps.Clone(node.GetLabels())
	if nodeLabels == nil {
		// maps.Clone preserves nil; a writable map is needed for the node id label below.
		nodeLabels = map[string]string{}
	}
	nodeLabels[configuration.NodeIdLabel] = node.Id

	nodeType := NewNodeType(nodeTaints, nodeLabels, indexedTaints, indexedNodeLabels)

	// Convert the allocatable resources while tracking the lowest priority seen.
	lowestPriority := int32(math.MaxInt32)
	allocatable := map[int32]ResourceList{}
	for priority, protoResources := range node.AllocatableByPriorityAndResource {
		if priority < lowestPriority {
			lowestPriority = priority
		}
		allocatable[priority] = resourceListFactory.FromNodeProto(protoResources.Resources)
	}
	if lowestPriority < 0 {
		return nil, errors.Errorf("found negative priority %d on node %s; negative priorities are reserved for internal use", lowestPriority, node.Id)
	}
	// Resources at EvictedPriority mirror those at the lowest real priority.
	allocatable[EvictedPriority] = allocatable[lowestPriority]

	internalNode := CreateNode(
		node.Id,
		nodeType,
		nodeIndex,
		node.Executor,
		node.Name,
		node.Pool,
		nodeTaints,
		nodeLabels,
		resourceListFactory.FromNodeProto(node.TotalResources.Resources),
		allocatable,
		map[string]ResourceList{},
		map[string]ResourceList{},
		map[string]bool{},
		nil)
	return internalNode, nil
}

func CreateNode(
id string,
nodeTypeId uint64,
nodeType *NodeType,
index uint64,
executor string,
name string,
Expand All @@ -58,7 +127,7 @@ func CreateNode(
) *Node {
return &Node{
id: id,
nodeTypeId: nodeTypeId,
nodeType: nodeType,
index: index,
executor: executor,
name: name,
Expand Down Expand Up @@ -95,7 +164,11 @@ func (node *Node) GetExecutor() string {
}

func (node *Node) GetNodeTypeId() uint64 {
return node.nodeTypeId
return node.nodeType.GetId()
}

// GetNodeType returns the NodeType stored on this node.
// The pointer is returned as-is; no copy is made.
func (node *Node) GetNodeType() *NodeType {
return node.nodeType
}

func (node *Node) GetLabels() map[string]string {
Expand Down Expand Up @@ -139,7 +212,7 @@ func (node *Node) DeepCopyNilKeys() *Node {
executor: node.executor,
name: node.name,
pool: node.pool,
nodeTypeId: node.nodeTypeId,
nodeType: node.nodeType,
taints: node.taints,
labels: node.labels,
totalResources: node.totalResources,
Expand Down
12 changes: 10 additions & 2 deletions internal/scheduler/internaltypes/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,16 @@ func TestNode(t *testing.T) {
},
}

nodeType := NewNodeType(
taints,
labels,
map[string]bool{"foo": true},
map[string]bool{"key": true},
)

node := CreateNode(
id,
nodeTypeId,
nodeType,
index,
executor,
name,
Expand All @@ -102,7 +109,8 @@ func TestNode(t *testing.T) {
)

assert.Equal(t, id, node.GetId())
assert.Equal(t, nodeTypeId, node.GetNodeTypeId())
assert.Equal(t, nodeType.GetId(), node.GetNodeTypeId())
assert.Equal(t, nodeType.GetId(), node.GetNodeType().GetId())
assert.Equal(t, index, node.GetIndex())
assert.Equal(t, executor, node.GetExecutor())
assert.Equal(t, name, node.GetName())
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/internaltypes/node_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type (
labelsFilterFunc func(key, value string) bool
)

func NewNodeType(taints []v1.Taint, labels map[string]string, indexedTaints map[string]interface{}, indexedLabels map[string]interface{}) *NodeType {
func NewNodeType(taints []v1.Taint, labels map[string]string, indexedTaints map[string]bool, indexedLabels map[string]bool) *NodeType {
if taints == nil {
taints = make([]v1.Taint, 0)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/scheduler/internaltypes/node_type_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func makeSut() *NodeType {
return NewNodeType(
taints,
labels,
map[string]interface{}{"taint1": true, "taint2": true, "taint3": true},
map[string]interface{}{"label1": true, "label2": true, "label3": true},
map[string]bool{"taint1": true, "taint2": true, "taint3": true},
map[string]bool{"label1": true, "label2": true, "label3": true},
)
}
13 changes: 13 additions & 0 deletions internal/scheduler/internaltypes/resource_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,19 @@ func (rl ResourceList) GetResources() []Resource {
return result
}

// ToMap returns the contents of this resource list as a map from resource name to
// Kubernetes quantity. Each stored value is converted using the scale the factory
// holds for that resource index. An empty resource list yields an empty, non-nil map.
func (rl ResourceList) ToMap() map[string]k8sResource.Quantity {
	quantities := map[string]k8sResource.Quantity{}
	if rl.IsEmpty() {
		return quantities
	}

	for idx := range rl.resources {
		scaled := k8sResource.NewScaledQuantity(rl.resources[idx], rl.factory.scales[idx])
		quantities[rl.factory.indexToName[idx]] = *scaled
	}
	return quantities
}

func (rl ResourceList) AllZero() bool {
if rl.IsEmpty() {
return true
Expand Down
16 changes: 16 additions & 0 deletions internal/scheduler/internaltypes/resource_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,22 @@ func TestGetResources_HandlesEmptyCorrectly(t *testing.T) {
empty := ResourceList{}
assert.Equal(t, 0, len(empty.GetResources()))
}

Check failure on line 83 in internal/scheduler/internaltypes/resource_list_test.go

View workflow job for this annotation

GitHub Actions / lint / Lint Go

File is not `gofumpt`-ed (gofumpt)
func TestToMap(t *testing.T) {
factory := testFactory()
a := testResourceList(factory, "1", "1Gi")
expected := map[string]k8sResource.Quantity{
"memory": *k8sResource.NewScaledQuantity(1024*1024*1024, k8sResource.Scale(0)),
"ephemeral-storage": *k8sResource.NewScaledQuantity(0, k8sResource.Scale(0)),
"cpu": *k8sResource.NewScaledQuantity(1000, k8sResource.Milli),
"nvidia.com/gpu": *k8sResource.NewScaledQuantity(0, k8sResource.Milli),
}
assert.Equal(t, expected, a.ToMap())
}

func TestToMap_HandlesEmptyCorrectly(t *testing.T) {
empty := ResourceList{}
assert.Equal(t, map[string]k8sResource.Quantity{}, empty.ToMap())
}

func TestAllZero(t *testing.T) {
factory := testFactory()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package nodedb
package internaltypes

import v1 "k8s.io/api/core/v1"

Expand All @@ -16,11 +16,3 @@ func UnschedulableTaint() v1.Taint {
Effect: unschedulableTaintEffect,
}
}

// UnschedulableToleration returns a toleration that tolerates UnschedulableTaint().
// Only Key and Value are set. Operator and Effect are left at their zero values; per the
// Kubernetes API, an empty operator is treated as Equal and an empty effect matches all
// taint effects.
func UnschedulableToleration() v1.Toleration {
return v1.Toleration{
Key: unschedulableTaintKey,
Value: unschedulableTaintValue,
}
}
Loading

0 comments on commit f9bb01a

Please sign in to comment.