From 13722af873a098ed2bbf933ac686552dff1be640 Mon Sep 17 00:00:00 2001 From: Noah Held Date: Thu, 29 Jun 2023 10:01:47 +0100 Subject: [PATCH] Add `Entry` --- internal/armada/server/lease.go | 37 +- internal/scheduler/context/context.go | 9 +- internal/scheduler/gang_scheduler_test.go | 8 +- internal/scheduler/nodedb/nodedb.go | 429 ++++++++++-------- internal/scheduler/nodedb/nodedb_test.go | 184 ++++---- internal/scheduler/nodedb/nodeiteration.go | 44 +- .../scheduler/nodedb/nodeiteration_test.go | 71 ++- internal/scheduler/pool_assigner.go | 15 +- .../scheduler/preempting_queue_scheduler.go | 26 +- .../preempting_queue_scheduler_test.go | 69 +-- internal/scheduler/queue_scheduler.go | 5 +- internal/scheduler/queue_scheduler_test.go | 22 +- .../schedulerobjects/nodematching.go | 24 +- .../schedulerobjects/nodematching_test.go | 2 +- .../scheduler/schedulerobjects/nodetype.go | 4 - internal/scheduler/scheduling_algo.go | 32 +- internal/scheduler/scheduling_algo_test.go | 7 - internal/scheduler/submitcheck.go | 11 +- 18 files changed, 551 insertions(+), 448 deletions(-) diff --git a/internal/armada/server/lease.go b/internal/armada/server/lease.go index 13602f1c8be..9981c971742 100644 --- a/internal/armada/server/lease.go +++ b/internal/armada/server/lease.go @@ -308,12 +308,26 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL // Nodes to be considered by the scheduler. lastSeen := q.clock.Now() - nodes := make([]*schedulerobjects.Node, 0, len(req.Nodes)) + + nodeDb, err := nodedb.NewNodeDb( + q.schedulingConfig.Preemption.PriorityClasses, + q.schedulingConfig.MaxExtraNodesToConsider, + q.schedulingConfig.IndexedResources, + q.schedulingConfig.IndexedTaints, + q.schedulingConfig.IndexedNodeLabels, + ) + if err != nil { + return nil, err + } + txn := nodeDb.Txn(true) + defer txn.Abort() + allocatedByQueueAndPriorityClassForCluster := make(map[string]schedulerobjects.QuantityByTAndResourceType[string], len(queues)) jobIdsByGangId := make(map[string]map[string]bool) gangIdByJobId := make(map[string]string) nodeIdByJobId := make(map[string]string) - for _, nodeInfo := range req.Nodes { + nodes := make([]*schedulerobjects.Node, len(req.Nodes)) + for i, nodeInfo := range req.Nodes { node, err := api.NewNodeFromNodeInfo( &nodeInfo, req.ClusterId, @@ -326,6 +340,7 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL ) continue } + nodes[i] = node jobIds := make([]string, 0, len(nodeInfo.RunIdsByState)) for jobId, jobState := range nodeInfo.RunIdsByState { @@ -378,7 +393,7 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL } // Bind pods to nodes, thus ensuring resources are marked as allocated on the node. - if node, err = nodedb.BindJobsToNode(q.schedulingConfig.Preemption.PriorityClasses, jobs, node); err != nil { + if err := nodeDb.CreateAndInsertWithApiJobsWithTxn(txn, jobs, node); err != nil { return nil, err } @@ -386,26 +401,14 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL for _, job := range jobs { nodeIdByJobId[job.Id] = node.Id } - nodes = append(nodes, node) // Record which queues have jobs running. Necessary to omit inactive queues. 
for _, job := range jobs { isActiveByQueueName[job.Queue] = true } } - nodeDb, err := nodedb.NewNodeDb( - q.schedulingConfig.Preemption.PriorityClasses, - q.schedulingConfig.MaxExtraNodesToConsider, - q.schedulingConfig.IndexedResources, - q.schedulingConfig.IndexedTaints, - q.schedulingConfig.IndexedNodeLabels, - ) - if err != nil { - return nil, err - } - if err := nodeDb.UpsertMany(nodes); err != nil { - return nil, err - } + + txn.Commit() // Load allocation reports for all executors from Redis. reportsByExecutor, err := q.usageRepository.GetClusterQueueResourceUsage() diff --git a/internal/scheduler/context/context.go b/internal/scheduler/context/context.go index ea82eef6c6a..f28756fac25 100644 --- a/internal/scheduler/context/context.go +++ b/internal/scheduler/context/context.go @@ -657,9 +657,8 @@ func JobSchedulingContextsFromJobs[J interfaces.LegacySchedulerJob](priorityClas type PodSchedulingContext struct { // Time at which this context was created. Created time.Time - // Node the pod was assigned to. - // If nil, the pod could not be assigned to any node. - Node *schedulerobjects.Node + // ID of the node that the pod was assigned to, or empty. + NodeId string // Score indicates how well the pod fits on the selected node. Score int // Node types on which this pod could be scheduled. @@ -673,8 +672,8 @@ type PodSchedulingContext struct { func (pctx *PodSchedulingContext) String() string { var sb strings.Builder w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0) - if pctx.Node != nil { - fmt.Fprintf(w, "Node:\t%s\n", pctx.Node.Id) + if pctx.NodeId != "" { + fmt.Fprintf(w, "Node:\t%s\n", pctx.NodeId) } else { fmt.Fprint(w, "Node:\tnone\n") } diff --git a/internal/scheduler/gang_scheduler_test.go b/internal/scheduler/gang_scheduler_test.go index 6fc67fc4bb7..2b98f6652ba 100644 --- a/internal/scheduler/gang_scheduler_test.go +++ b/internal/scheduler/gang_scheduler_test.go @@ -221,8 +221,12 @@ func TestGangScheduler(t *testing.T) { testfixtures.TestIndexedNodeLabels, ) require.NoError(t, err) - err = nodeDb.UpsertMany(tc.Nodes) - require.NoError(t, err) + txn := nodeDb.Txn(true) + for _, node := range tc.Nodes { + err := nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node) + require.NoError(t, err) + } + txn.Commit() if tc.TotalResources.Resources == nil { // Default to NodeDb total. tc.TotalResources = nodeDb.TotalResources() diff --git a/internal/scheduler/nodedb/nodedb.go b/internal/scheduler/nodedb/nodedb.go index 4b8f4c816c6..68826e191d8 100644 --- a/internal/scheduler/nodedb/nodedb.go +++ b/internal/scheduler/nodedb/nodedb.go @@ -17,20 +17,176 @@ import ( "github.com/armadaproject/armada/internal/armada/configuration" "github.com/armadaproject/armada/internal/common/armadaerrors" + armadamaps "github.com/armadaproject/armada/internal/common/maps" "github.com/armadaproject/armada/internal/common/util" schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/interfaces" + "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" + "github.com/armadaproject/armada/pkg/api" ) // evictedPriority is the priority class priority resources consumed by evicted jobs are accounted for at. // This helps avoid scheduling new jobs onto nodes that make it impossible to re-schedule evicted jobs. 
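// (In this patch, nodeDb.create seeds AllocatableByPriority[evictedPriority] by copying the entry for the
// lowest regular priority, so every node entry carries this pseudo-priority from the moment it is created;
// UpsertWithTxn no longer has to backfill it.)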
const evictedPriority int32 = -1 +type Node struct { + Id string + Name string + + // We need to store taints and labels separately from the node type: the latter only includes + // indexed taints and labels, but we need all of them when checking pod requirements. + Taints []v1.Taint + Labels map[string]string + + TotalResources schedulerobjects.ResourceList + + // This field is set when inserting the Node into the node database. + Keys [][]byte + + NodeTypeId uint64 + + AllocatableByPriority schedulerobjects.AllocatableByPriorityAndResourceType + AllocatedByQueue map[string]schedulerobjects.ResourceList + AllocatedByJobId map[string]schedulerobjects.ResourceList + EvictedJobRunIds map[string]bool +} + +// UnsafeCopy returns a pointer to a new value of type Node; it is unsafe because it only makes +// shallow copies of fields that are not mutated by methods of NodeDb. +func (node *Node) UnsafeCopy() *Node { + return &Node{ + Id: node.Id, + Name: node.Name, + + Taints: node.Taints, + Labels: node.Labels, + + TotalResources: node.TotalResources, + + Keys: nil, + + NodeTypeId: node.NodeTypeId, + + AllocatableByPriority: armadamaps.DeepCopy(node.AllocatableByPriority), + AllocatedByQueue: armadamaps.DeepCopy(node.AllocatedByQueue), + AllocatedByJobId: armadamaps.DeepCopy(node.AllocatedByJobId), + EvictedJobRunIds: maps.Clone(node.EvictedJobRunIds), + } +} + +func (nodeDb *NodeDb) create(node *schedulerobjects.Node) (*Node, error) { + taints := node.GetTaints() + if node.Unschedulable { + taints = append(slices.Clone(taints), UnschedulableTaint()) + } + + labels := maps.Clone(node.GetLabels()) + if labels == nil { + labels = make(map[string]string) + } + labels[schedulerconfig.NodeIdLabel] = node.Id + + totalResources := node.TotalResources + + nodeType := schedulerobjects.NewNodeType( + taints, + labels, + nodeDb.indexedTaints, + nodeDb.indexedNodeLabels, + ) + + allocatableByPriority := schedulerobjects.AllocatableByPriorityAndResourceType(node.AllocatableByPriorityAndResource).DeepCopy() + minimumPriority := int32(math.MaxInt32) + for p := range allocatableByPriority { + if p < minimumPriority { + minimumPriority = p + } + } + if minimumPriority < 0 { + return nil, errors.Errorf("found negative priority %d on node %s; negative priorities are reserved for internal use", minimumPriority, node.Id) + } + allocatableByPriority[evictedPriority] = allocatableByPriority[minimumPriority].DeepCopy() + + allocatedByQueue := node.AllocatedByQueue + if allocatedByQueue == nil { + allocatedByQueue = make(map[string]schedulerobjects.ResourceList) + } + + allocatedByJobId := node.AllocatedByJobId + if allocatedByJobId == nil { + allocatedByJobId = make(map[string]schedulerobjects.ResourceList) + } + + evictedJobRunIds := node.EvictedJobRunIds + if evictedJobRunIds == nil { + evictedJobRunIds = make(map[string]bool) + } + + nodeDb.mu.Lock() + nodeDb.numNodes++ + nodeDb.numNodesByNodeType[nodeType.Id]++ + nodeDb.totalResources.Add(totalResources) + nodeDb.nodeTypes[nodeType.Id] = nodeType + nodeDb.mu.Unlock() + + entry := &Node{ + Id: node.Id, + Name: node.Name, + + Taints: taints, + Labels: labels, + + TotalResources: totalResources, + + Keys: nil, + + NodeTypeId: nodeType.Id, + + AllocatableByPriority: allocatableByPriority, + AllocatedByQueue: allocatedByQueue, + AllocatedByJobId: allocatedByJobId, + EvictedJobRunIds: evictedJobRunIds, + } + return entry, nil +} + +func (nodeDb *NodeDb) CreateAndInsertWithApiJobsWithTxn(txn *memdb.Txn, jobs []*api.Job, node *schedulerobjects.Node) error { + entry, err := 
nodeDb.create(node) + if err != nil { + return err + } + for _, job := range jobs { + if err := bindJobToNodeInPlace(nodeDb.priorityClasses, job, entry); err != nil { + return err + } + } + if err := nodeDb.UpsertWithTxn(txn, entry); err != nil { + return err + } + return nil +} + +func (nodeDb *NodeDb) CreateAndInsertWithJobDbJobsWithTxn(txn *memdb.Txn, jobs []*jobdb.Job, node *schedulerobjects.Node) error { + entry, err := nodeDb.create(node) + if err != nil { + return err + } + for _, job := range jobs { + if err := bindJobToNodeInPlace(nodeDb.priorityClasses, job, entry); err != nil { + return err + } + } + if err := nodeDb.UpsertWithTxn(txn, entry); err != nil { + return err + } + return nil +} + // NodeDb is the scheduler-internal system used to efficiently find nodes on which a pod could be scheduled. type NodeDb struct { - // In-memory database storing *schedulerobjects.Node. + // In-memory database storing *Node. db *memdb.MemDB // Once a node has been found on which a pod can be scheduled, // the NodeDb will consider up to the next maxExtraNodesToConsider nodes. @@ -77,6 +233,10 @@ type NodeDb struct { // // If not set, no labels are indexed. indexedNodeLabels map[string]interface{} + + // Mutex for the remaining fields of this struct, which are mutated after initialization. + mu sync.Mutex + // Total number of nodes in the db. numNodes int // Number of nodes in the db by node type. @@ -89,8 +249,6 @@ type NodeDb struct { // Map from podRequirementsNotMetReason Sum64() to the string representation of that reason. // Used to avoid allocs. podRequirementsNotMetReasonStringCache map[uint64]string - // Mutex to control access to totalResources and NodeTypes. - mu sync.Mutex } func NewNodeDb( @@ -201,13 +359,13 @@ func (nodeDb *NodeDb) Txn(write bool) *memdb.Txn { } // GetNode returns a node in the db with given id. -func (nodeDb *NodeDb) GetNode(id string) (*schedulerobjects.Node, error) { +func (nodeDb *NodeDb) GetNode(id string) (*Node, error) { return nodeDb.GetNodeWithTxn(nodeDb.Txn(false), id) } // GetNodeWithTxn returns a node in the db with given id, // within the provided transactions. -func (nodeDb *NodeDb) GetNodeWithTxn(txn *memdb.Txn, id string) (*schedulerobjects.Node, error) { +func (nodeDb *NodeDb) GetNodeWithTxn(txn *memdb.Txn, id string) (*Node, error) { it, err := txn.Get("nodes", "id", id) if err != nil { return nil, errors.WithStack(err) @@ -216,20 +374,16 @@ func (nodeDb *NodeDb) GetNodeWithTxn(txn *memdb.Txn, id string) (*schedulerobjec if obj == nil { return nil, nil } - if node, ok := obj.(*schedulerobjects.Node); !ok { - panic(fmt.Sprintf("expected *Node, but got %T", obj)) - } else { - return node, nil - } + return obj.(*Node), nil } // NodeJobDiff compares two snapshots of the NodeDb memdb and returns // - a map from job ids of all preempted jobs to the node they used to be on // - a map from job ids of all scheduled jobs to the node they were scheduled on // that happened between the two snapshots. 
-func NodeJobDiff(txnA, txnB *memdb.Txn) (map[string]*schedulerobjects.Node, map[string]*schedulerobjects.Node, error) { - preempted := make(map[string]*schedulerobjects.Node) - scheduled := make(map[string]*schedulerobjects.Node) +func NodeJobDiff(txnA, txnB *memdb.Txn) (map[string]*Node, map[string]*Node, error) { + preempted := make(map[string]*Node) + scheduled := make(map[string]*Node) nodePairIterator, err := NewNodePairIterator(txnA, txnB) if err != nil { return nil, nil, err @@ -286,7 +440,7 @@ func (nodeDb *NodeDb) ScheduleMany(jctxs []*schedulercontext.JobSchedulingContex if pctx == nil { continue } - pctx.Node = nil + pctx.NodeId = "" } } return ok, err @@ -295,19 +449,18 @@ func (nodeDb *NodeDb) ScheduleMany(jctxs []*schedulercontext.JobSchedulingContex func (nodeDb *NodeDb) ScheduleManyWithTxn(txn *memdb.Txn, jctxs []*schedulercontext.JobSchedulingContext) (bool, error) { // Attempt to schedule pods one by one in a transaction. for _, jctx := range jctxs { - if err := nodeDb.SelectNodeForJobWithTxn(txn, jctx); err != nil { + node, err := nodeDb.SelectNodeForJobWithTxn(txn, jctx) + if err != nil { return false, err } - pctx := jctx.PodSchedulingContext // If we found a node for this pod, bind it and continue to the next pod. - if pctx != nil && pctx.Node != nil { - if node, err := BindJobToNode(nodeDb.priorityClasses, jctx.Job, pctx.Node); err != nil { + if node != nil { + if node, err := bindJobToNode(nodeDb.priorityClasses, jctx.Job, node); err != nil { return false, err } else { if err := nodeDb.UpsertWithTxn(txn, node); err != nil { return false, err } - pctx.Node = node } } else { return false, nil @@ -317,13 +470,13 @@ func (nodeDb *NodeDb) ScheduleManyWithTxn(txn *memdb.Txn, jctxs []*schedulercont } // SelectNodeForJobWithTxn selects a node on which the job can be scheduled. -func (nodeDb *NodeDb) SelectNodeForJobWithTxn(txn *memdb.Txn, jctx *schedulercontext.JobSchedulingContext) error { +func (nodeDb *NodeDb) SelectNodeForJobWithTxn(txn *memdb.Txn, jctx *schedulercontext.JobSchedulingContext) (*Node, error) { req := jctx.PodRequirements // Collect all node types that could potentially schedule the pod. matchingNodeTypes, numExcludedNodesByReason, err := nodeDb.NodeTypesMatchingPod(req) if err != nil { - return err + return nil, err } // Create a pctx to be returned to the caller. @@ -337,7 +490,7 @@ func (nodeDb *NodeDb) SelectNodeForJobWithTxn(txn *memdb.Txn, jctx *schedulercon // For pods that failed to schedule, add an exclusion reason for implicitly excluded nodes. defer func() { - if pctx.Node != nil { + if pctx.NodeId != "" { return } numExplicitlyExcludedNodes := 0 @@ -354,12 +507,12 @@ func (nodeDb *NodeDb) SelectNodeForJobWithTxn(txn *memdb.Txn, jctx *schedulercon // and schedule onto that node even if it requires preempting other jobs. if nodeId, ok := req.NodeSelector[schedulerconfig.NodeIdLabel]; ok { if it, err := txn.Get("nodes", "id", nodeId); err != nil { - return errors.WithStack(err) + return nil, errors.WithStack(err) } else { - if _, err := nodeDb.selectNodeForPodWithIt(pctx, it, req.Priority, req, true); err != nil { - return err + if node, err := nodeDb.selectNodeForPodWithIt(pctx, it, req.Priority, req, true); err != nil { + return nil, err } else { - return nil + return node, nil } } } @@ -378,21 +531,22 @@ func (nodeDb *NodeDb) SelectNodeForJobWithTxn(txn *memdb.Txn, jctx *schedulercon // Try to find a node at this priority. 
node, err := nodeDb.selectNodeForPodAtPriority(txn, pctx, priority, req) if err != nil { - return err + return nil, err } if node != nil { - if pctx.Node == nil { - return errors.New("pctx.Node not set") + if pctx.NodeId == "" { + return nil, errors.New("pctx.NodeId not set") } - if node.Id != pctx.Node.Id { - return errors.New("pctx.Node.Id does not match that of the returned node") + if node.Id != pctx.NodeId { + return nil, errors.New("pctx.NodeId does not match that of the returned node") } - return nil - } else if pctx.Node != nil { - return errors.New("pctx.Node is set, but no node was returned") + return node, nil + } + if pctx.NodeId != "" { + return nil, errors.New("pctx.NodeId is set, but no node was returned") } } - return nil + return nil, nil } func (nodeDb *NodeDb) selectNodeForPodAtPriority( @@ -400,7 +554,7 @@ func (nodeDb *NodeDb) selectNodeForPodAtPriority( pctx *schedulercontext.PodSchedulingContext, priority int32, req *schedulerobjects.PodRequirements, -) (*schedulerobjects.Node, error) { +) (*Node, error) { nodeTypeIds := make([]uint64, len(pctx.MatchingNodeTypes)) for i, nodeType := range pctx.MatchingNodeTypes { nodeTypeIds[i] = nodeType.Id @@ -442,27 +596,37 @@ func (nodeDb *NodeDb) selectNodeForPodWithIt( priority int32, req *schedulerobjects.PodRequirements, onlyCheckDynamicRequirements bool, -) (*schedulerobjects.Node, error) { - var selectedNode *schedulerobjects.Node +) (*Node, error) { + var selectedNode *Node var selectedNodeScore int - var numConsideredNodes uint + var numExtraNodes uint for obj := it.Next(); obj != nil; obj = it.Next() { - node := obj.(*schedulerobjects.Node) + if selectedNode != nil { + numExtraNodes++ + if numExtraNodes > nodeDb.maxExtraNodesToConsider { + break + } + } + + node := obj.(*Node) if node == nil { return nil, nil } + var matches bool var score int var reason schedulerobjects.PodRequirementsNotMetReason var err error if onlyCheckDynamicRequirements { - matches, score, reason, err = node.DynamicPodRequirementsMet(priority, req) + matches, score, reason, err = schedulerobjects.DynamicPodRequirementsMet(node.AllocatableByPriority[priority], req) } else { - matches, score, reason, err = node.PodRequirementsMet(priority, req) + matches, score, reason, err = schedulerobjects.PodRequirementsMet(node.Taints, node.Labels, node.TotalResources, node.AllocatableByPriority[priority], req) } if err != nil { return nil, err - } else if matches { + } + + if matches { if selectedNode == nil || score > selectedNodeScore { selectedNode = node selectedNodeScore = score @@ -474,40 +638,26 @@ func (nodeDb *NodeDb) selectNodeForPodWithIt( s := nodeDb.stringFromPodRequirementsNotMetReason(reason) pctx.NumExcludedNodesByReason[s] += 1 } - if selectedNode != nil { - numConsideredNodes++ - if numConsideredNodes == nodeDb.maxExtraNodesToConsider+1 { - break - } - } } - pctx.Node = selectedNode - pctx.Score = selectedNodeScore - return selectedNode, nil -} -// BindJobsToNode returns a copy of node with all elements of jobs bound to it. 
-func BindJobsToNode[J interfaces.LegacySchedulerJob](priorityClasses map[string]configuration.PriorityClass, jobs []J, node *schedulerobjects.Node) (*schedulerobjects.Node, error) { - node = node.DeepCopy() - for _, job := range jobs { - if err := bindJobToNodeInPlace(priorityClasses, job, node); err != nil { - return nil, err - } + if selectedNode != nil { + pctx.NodeId = selectedNode.Id + pctx.Score = selectedNodeScore } - return node, nil + return selectedNode, nil } -// BindJobToNode returns a copy of node with job bound to it. -func BindJobToNode(priorityClasses map[string]configuration.PriorityClass, job interfaces.LegacySchedulerJob, node *schedulerobjects.Node) (*schedulerobjects.Node, error) { - node = node.DeepCopy() +// bindJobToNode returns a copy of node with job bound to it. +func bindJobToNode(priorityClasses map[string]configuration.PriorityClass, job interfaces.LegacySchedulerJob, node *Node) (*Node, error) { + node = node.UnsafeCopy() if err := bindJobToNodeInPlace(priorityClasses, job, node); err != nil { return nil, err } return node, nil } -// bindJobToNodeInPlace is like BindJobToNode, but doesn't make a copy of node. -func bindJobToNodeInPlace(priorityClasses map[string]configuration.PriorityClass, job interfaces.LegacySchedulerJob, node *schedulerobjects.Node) error { +// bindJobToNodeInPlace is like bindJobToNode, but doesn't make a copy of node. +func bindJobToNodeInPlace(priorityClasses map[string]configuration.PriorityClass, job interfaces.LegacySchedulerJob, node *Node) error { jobId := job.GetId() requests := job.GetResourceRequirements().Requests @@ -534,7 +684,7 @@ func bindJobToNodeInPlace(priorityClasses map[string]configuration.PriorityClass node.AllocatedByQueue[queue] = allocatedToQueue } - allocatable := schedulerobjects.AllocatableByPriorityAndResourceType(node.AllocatableByPriorityAndResource) + allocatable := node.AllocatableByPriority priority := priorityClasses[job.GetPriorityClassName()].Priority allocatable.MarkAllocatedV1ResourceList(priority, requests) if isEvicted { @@ -557,10 +707,10 @@ func EvictJobsFromNode( priorityClasses map[string]configuration.PriorityClass, jobFilter func(interfaces.LegacySchedulerJob) bool, jobs []interfaces.LegacySchedulerJob, - node *schedulerobjects.Node, -) ([]interfaces.LegacySchedulerJob, *schedulerobjects.Node, error) { + node *Node, +) ([]interfaces.LegacySchedulerJob, *Node, error) { evicted := make([]interfaces.LegacySchedulerJob, 0) - node = node.DeepCopy() + node = node.UnsafeCopy() for _, job := range jobs { if jobFilter != nil && !jobFilter(job) { continue @@ -574,7 +724,7 @@ func EvictJobsFromNode( } // evictJobFromNodeInPlace is the in-place operation backing EvictJobsFromNode. 
-func evictJobFromNodeInPlace(priorityClasses map[string]configuration.PriorityClass, job interfaces.LegacySchedulerJob, node *schedulerobjects.Node) error { +func evictJobFromNodeInPlace(priorityClasses map[string]configuration.PriorityClass, job interfaces.LegacySchedulerJob, node *Node) error { jobId := job.GetId() if _, ok := node.AllocatedByJobId[jobId]; !ok { return errors.Errorf("job %s has no resources allocated on node %s", jobId, node.Id) @@ -593,25 +743,7 @@ func evictJobFromNodeInPlace(priorityClasses map[string]configuration.PriorityCl } node.EvictedJobRunIds[jobId] = true - if _, ok := node.AllocatableByPriorityAndResource[evictedPriority]; !ok { - minimumPriority := int32(math.MaxInt32) - foundPriority := false - for p := range node.AllocatableByPriorityAndResource { - if p < minimumPriority { - minimumPriority = p - foundPriority = true - } - } - if foundPriority { - node.AllocatableByPriorityAndResource[evictedPriority] = node.AllocatableByPriorityAndResource[minimumPriority].DeepCopy() - } else { - // We do not expect to hit this branch; however, if we do, then we need - // to make sure that evictedPriority is in this map so that the call to - // MarkAllocatedV1ResourceList() below knows about it. - node.AllocatableByPriorityAndResource[evictedPriority] = schedulerobjects.ResourceList{} - } - } - allocatable := schedulerobjects.AllocatableByPriorityAndResourceType(node.AllocatableByPriorityAndResource) + allocatable := node.AllocatableByPriority priority := priorityClasses[job.GetPriorityClassName()].Priority requests := job.GetResourceRequirements().Requests allocatable.MarkAllocatableV1ResourceList(priority, requests) @@ -621,8 +753,8 @@ func evictJobFromNodeInPlace(priorityClasses map[string]configuration.PriorityCl } // UnbindJobsFromNode returns a node with all elements of jobs unbound from it. -func UnbindJobsFromNode(priorityClasses map[string]configuration.PriorityClass, jobs []interfaces.LegacySchedulerJob, node *schedulerobjects.Node) (*schedulerobjects.Node, error) { - node = node.DeepCopy() +func UnbindJobsFromNode(priorityClasses map[string]configuration.PriorityClass, jobs []interfaces.LegacySchedulerJob, node *Node) (*Node, error) { + node = node.UnsafeCopy() for _, job := range jobs { if err := unbindJobFromNodeInPlace(priorityClasses, job, node); err != nil { return nil, err @@ -632,8 +764,8 @@ func UnbindJobsFromNode(priorityClasses map[string]configuration.PriorityClass, } // UnbindJobFromNode returns a copy of node with job unbound from it. -func UnbindJobFromNode(priorityClasses map[string]configuration.PriorityClass, job interfaces.LegacySchedulerJob, node *schedulerobjects.Node) (*schedulerobjects.Node, error) { - node = node.DeepCopy() +func UnbindJobFromNode(priorityClasses map[string]configuration.PriorityClass, job interfaces.LegacySchedulerJob, node *Node) (*Node, error) { + node = node.UnsafeCopy() if err := unbindJobFromNodeInPlace(priorityClasses, job, node); err != nil { return nil, err } @@ -641,7 +773,7 @@ func UnbindJobFromNode(priorityClasses map[string]configuration.PriorityClass, j } // unbindPodFromNodeInPlace is like UnbindJobFromNode, but doesn't make a copy of node. 
-func unbindJobFromNodeInPlace(priorityClasses map[string]configuration.PriorityClass, job interfaces.LegacySchedulerJob, node *schedulerobjects.Node) error { +func unbindJobFromNodeInPlace(priorityClasses map[string]configuration.PriorityClass, job interfaces.LegacySchedulerJob, node *Node) error { jobId := job.GetId() requests := job.GetResourceRequirements().Requests @@ -664,7 +796,7 @@ func unbindJobFromNodeInPlace(priorityClasses map[string]configuration.PriorityC } } - allocatable := schedulerobjects.AllocatableByPriorityAndResourceType(node.AllocatableByPriorityAndResource) + allocatable := node.AllocatableByPriority priority := priorityClasses[job.GetPriorityClassName()].Priority if isEvicted { priority = evictedPriority @@ -696,7 +828,7 @@ func (nodeDb *NodeDb) NodeTypesMatchingPod(req *schedulerobjects.PodRequirements return selectedNodeTypes, numExcludedNodesByReason, nil } -func (nodeDb *NodeDb) UpsertMany(nodes []*schedulerobjects.Node) error { +func (nodeDb *NodeDb) UpsertMany(nodes []*Node) error { txn := nodeDb.db.Txn(true) defer txn.Abort() if err := nodeDb.UpsertManyWithTxn(txn, nodes); err != nil { @@ -706,7 +838,7 @@ func (nodeDb *NodeDb) UpsertMany(nodes []*schedulerobjects.Node) error { return nil } -func (nodeDb *NodeDb) UpsertManyWithTxn(txn *memdb.Txn, nodes []*schedulerobjects.Node) error { +func (nodeDb *NodeDb) UpsertManyWithTxn(txn *memdb.Txn, nodes []*Node) error { for _, node := range nodes { if err := nodeDb.UpsertWithTxn(txn, node); err != nil { return err @@ -715,7 +847,7 @@ func (nodeDb *NodeDb) UpsertManyWithTxn(txn *memdb.Txn, nodes []*schedulerobject return nil } -func (nodeDb *NodeDb) Upsert(node *schedulerobjects.Node) error { +func (nodeDb *NodeDb) Upsert(node *Node) error { txn := nodeDb.Txn(true) defer txn.Abort() if err := nodeDb.UpsertWithTxn(txn, node); err != nil { @@ -725,91 +857,16 @@ func (nodeDb *NodeDb) Upsert(node *schedulerobjects.Node) error { return nil } -func (nodeDb *NodeDb) UpsertWithTxn(txn *memdb.Txn, node *schedulerobjects.Node) error { - if len(node.AllocatableByPriorityAndResource) == 0 { - return errors.Errorf("can't upsert node with AllocatableByPriorityAndResource: %v", node.AllocatableByPriorityAndResource) - } - - // Mutating the node once inserted is forbidden. - // TODO: We shouldn't need a copy here. - node = node.DeepCopy() - - // Add an evictedPriority record to the node. - // TODO: We should make NodeDb responsible for creating new nodes and add this record at creation instead of upsert. - if len(node.EvictedJobRunIds) != 0 { - q := schedulerobjects.AllocatableByPriorityAndResourceType(node.AllocatableByPriorityAndResource).Get(evictedPriority, "cpu") - if q.Cmp(node.TotalResources.Get("cpu")) == 0 { - return errors.Errorf("inconsistent node accounting: node %s has evicted jobs but no evicted resources", node.Id) - } - } - - // Ensure we track allocated resources at evictedPriority. - if _, ok := node.AllocatableByPriorityAndResource[evictedPriority]; !ok { - pMin := int32(math.MaxInt32) - ok := false - for p := range node.AllocatableByPriorityAndResource { - if p < pMin { - pMin = p - ok = true - } - } - if ok { - node.AllocatableByPriorityAndResource[evictedPriority] = node.AllocatableByPriorityAndResource[pMin].DeepCopy() - } - } - - // Make sure nodes have a label containing the nodeId. 
- if node.Labels == nil { - node.Labels = map[string]string{schedulerconfig.NodeIdLabel: node.Id} - } else { - node.Labels[schedulerconfig.NodeIdLabel] = node.Id - } - - // Add a special taint to unschedulable nodes before inserting. - // Adding a corresponding toleration to evicted pods ensures they can be re-scheduled. - // To prevent scheduling new pods onto cordoned nodes, only evicted pods should have this toleration. - if node.Unschedulable { - node.Taints = append(node.Taints, UnschedulableTaint()) - } - - // Compute the node type of the node. - nodeType := schedulerobjects.NewNodeType( - node.GetTaints(), - node.GetLabels(), - nodeDb.indexedTaints, - nodeDb.indexedNodeLabels, - ) - node.NodeTypeId = nodeType.Id - node.NodeType = nodeType - - // Compute the keys necessary to efficiently iterate over nodes. - node.NodeDbKeys = make([][]byte, len(nodeDb.prioritiesToTryAssigningAt)) +func (nodeDb *NodeDb) UpsertWithTxn(txn *memdb.Txn, node *Node) error { + keys := make([][]byte, len(nodeDb.prioritiesToTryAssigningAt)) for i, p := range nodeDb.prioritiesToTryAssigningAt { - node.NodeDbKeys[i] = nodeDb.nodeDbKeyFromNode(node.NodeDbKeys[i], node, p) + keys[i] = nodeDb.nodeDbKey(keys[i], node.NodeTypeId, node.AllocatableByPriority[p]) } + node.Keys = keys - // Add the node to the db. - isNewNode := false - if existingNode, err := nodeDb.GetNodeWithTxn(txn, node.Id); err != nil { - return err - } else if existingNode == nil { - isNewNode = true - } if err := txn.Insert("nodes", node); err != nil { return errors.WithStack(err) } - - // If this is a new node, update overall statistics. - // Note that these are not rolled back on txn abort. - nodeDb.mu.Lock() - if isNewNode { - nodeDb.numNodes++ - nodeDb.numNodesByNodeType[nodeType.Id]++ - nodeDb.totalResources.Add(node.TotalResources) - } - nodeDb.nodeTypes[nodeType.Id] = nodeType - nodeDb.mu.Unlock() - return nil } @@ -821,10 +878,10 @@ func (nodeDb *NodeDb) ClearAllocated() error { if err != nil { return err } - newNodes := make([]*schedulerobjects.Node, 0) + newNodes := make([]*Node, 0) for node := it.NextNode(); node != nil; node = it.NextNode() { - node = node.DeepCopy() - node.AllocatableByPriorityAndResource = schedulerobjects.NewAllocatableByPriorityAndResourceType( + node = node.UnsafeCopy() + node.AllocatableByPriority = schedulerobjects.NewAllocatableByPriorityAndResourceType( nodeDb.prioritiesToTryAssigningAt, node.TotalResources, ) @@ -872,6 +929,8 @@ func nodeIndexName(keyIndex int) string { // using a cache to avoid allocating new strings when possible. func (nodeDb *NodeDb) stringFromPodRequirementsNotMetReason(reason schedulerobjects.PodRequirementsNotMetReason) string { h := reason.Sum64() + nodeDb.mu.Lock() + defer nodeDb.mu.Unlock() if s, ok := nodeDb.podRequirementsNotMetReasonStringCache[h]; ok { return s } else { @@ -881,15 +940,15 @@ func (nodeDb *NodeDb) stringFromPodRequirementsNotMetReason(reason schedulerobje } } -// nodeDbKeyFromNode returns the index key for a particular node and resource. +// nodeDbKey returns the index key for a particular node. // Allocatable resources are rounded down to the closest multiple of nodeDb.indexedResourceResolutionMillis. // This improves efficiency by reducing the number of distinct values in the index. 
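// For example, with a (hypothetical) resolution of 100 millicores for CPU, nodes with 7850m and 7890m
// allocatable CPU both round down to the 7800m bucket and therefore share an index key.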
-func (nodeDb *NodeDb) nodeDbKeyFromNode(out []byte, node *schedulerobjects.Node, priority int32) []byte { +func (nodeDb *NodeDb) nodeDbKey(out []byte, nodeTypeId uint64, allocatable schedulerobjects.ResourceList) []byte { return RoundedNodeIndexKeyFromResourceList( out, - node.NodeTypeId, + nodeTypeId, nodeDb.indexedResources, nodeDb.indexedResourceResolutionMillis, - node.AllocatableByPriorityAndResource[priority], + allocatable, ) } diff --git a/internal/scheduler/nodedb/nodedb_test.go b/internal/scheduler/nodedb/nodedb_test.go index 2bd63ef9712..54cd2773647 100644 --- a/internal/scheduler/nodedb/nodedb_test.go +++ b/internal/scheduler/nodedb/nodedb_test.go @@ -27,40 +27,38 @@ func TestNodeDbSchema(t *testing.T) { // Test the accounting of total resources across all nodes. func TestTotalResources(t *testing.T) { nodeDb, err := createNodeDb([]*schedulerobjects.Node{}) - if !assert.NoError(t, err) { - return - } + require.NoError(t, err) + expected := schedulerobjects.ResourceList{Resources: make(map[string]resource.Quantity)} - assert.True(t, expected.Equal(nodeDb.totalResources)) + assert.True(t, expected.Equal(nodeDb.TotalResources())) // Upserting nodes for the first time should increase the resource count. nodes := testfixtures.N32CpuNodes(2, testfixtures.TestPriorities) for _, node := range nodes { expected.Add(node.TotalResources) } - err = nodeDb.UpsertMany(nodes) - if !assert.NoError(t, err) { - return + txn := nodeDb.Txn(true) + for _, node := range nodes { + err := nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node) + require.NoError(t, err) } - assert.True(t, expected.Equal(nodeDb.totalResources)) + txn.Commit() - // Upserting the same nodes again should not affect total resource count. - err = nodeDb.UpsertMany(nodes) - if !assert.NoError(t, err) { - return - } - assert.True(t, expected.Equal(nodeDb.totalResources)) + assert.True(t, expected.Equal(nodeDb.TotalResources())) // Upserting new nodes should increase the resource count. 
nodes = testfixtures.N8GpuNodes(3, testfixtures.TestPriorities) for _, node := range nodes { expected.Add(node.TotalResources) } - err = nodeDb.UpsertMany(nodes) - if !assert.NoError(t, err) { - return + txn = nodeDb.Txn(true) + for _, node := range nodes { + err := nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node) + require.NoError(t, err) } - assert.True(t, expected.Equal(nodeDb.totalResources)) + txn.Commit() + + assert.True(t, expected.Equal(nodeDb.TotalResources())) } func TestSelectNodeForPod_NodeIdLabel_Success(t *testing.T) { @@ -76,15 +74,17 @@ func TestSelectNodeForPod_NodeIdLabel_Success(t *testing.T) { jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, jobs) for _, jctx := range jctxs { txn := db.Txn(false) - err := db.SelectNodeForJobWithTxn(txn, jctx) + node, err := db.SelectNodeForJobWithTxn(txn, jctx) txn.Abort() if !assert.NoError(t, err) { continue } pctx := jctx.PodSchedulingContext + require.NotNil(t, node) + assert.Equal(t, nodeId, node.Id) + require.NotNil(t, pctx) - require.NotNil(t, pctx.Node) - assert.Equal(t, nodeId, pctx.Node.Id) + assert.Equal(t, nodeId, pctx.NodeId) assert.Equal(t, 0, len(pctx.NumExcludedNodesByReason)) assert.Empty(t, pctx.NumExcludedNodesByReason) } @@ -103,26 +103,33 @@ func TestSelectNodeForPod_NodeIdLabel_Failure(t *testing.T) { jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, jobs) for _, jctx := range jctxs { txn := db.Txn(false) - err := db.SelectNodeForJobWithTxn(txn, jctx) + node, err := db.SelectNodeForJobWithTxn(txn, jctx) txn.Abort() if !assert.NoError(t, err) { continue } + assert.Nil(t, node) + pctx := jctx.PodSchedulingContext require.NotNil(t, pctx) - assert.Nil(t, pctx.Node) + assert.Equal(t, "", pctx.NodeId) assert.Equal(t, 1, len(pctx.NumExcludedNodesByReason)) } } func TestNodeBindingEvictionUnbinding(t *testing.T) { + node := testfixtures.Test8GpuNode(testfixtures.TestPriorities) + nodeDb, err := createNodeDb([]*schedulerobjects.Node{node}) + require.NoError(t, err) + entry, err := nodeDb.GetNode(node.Id) + require.NoError(t, err) + jobFilter := func(job interfaces.LegacySchedulerJob) bool { return true } - node := testfixtures.Test8GpuNode(append(testfixtures.TestPriorities, evictedPriority)) job := testfixtures.Test1GpuJob("A", testfixtures.PriorityClass0) request := schedulerobjects.ResourceListFromV1ResourceList(job.GetResourceRequirements().Requests) jobId := job.GetId() - boundNode, err := BindJobToNode(testfixtures.TestPriorityClasses, job, node) + boundNode, err := bindJobToNode(testfixtures.TestPriorityClasses, job, entry) require.NoError(t, err) unboundNode, err := UnbindJobFromNode(testfixtures.TestPriorityClasses, job, boundNode) @@ -138,23 +145,23 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) { evictedUnboundNode, err := UnbindJobFromNode(testfixtures.TestPriorityClasses, job, evictedNode) require.NoError(t, err) - evictedBoundNode, err := BindJobToNode(testfixtures.TestPriorityClasses, job, evictedNode) + evictedBoundNode, err := bindJobToNode(testfixtures.TestPriorityClasses, job, evictedNode) require.NoError(t, err) - _, _, err = EvictJobsFromNode(testfixtures.TestPriorityClasses, jobFilter, []interfaces.LegacySchedulerJob{job}, node) + _, _, err = EvictJobsFromNode(testfixtures.TestPriorityClasses, jobFilter, []interfaces.LegacySchedulerJob{job}, entry) require.Error(t, err) - _, err = UnbindJobFromNode(testfixtures.TestPriorityClasses, job, node) + _, err = UnbindJobFromNode(testfixtures.TestPriorityClasses, 
job, entry) require.Error(t, err) - _, err = BindJobToNode(testfixtures.TestPriorityClasses, job, boundNode) + _, err = bindJobToNode(testfixtures.TestPriorityClasses, job, boundNode) require.Error(t, err) _, _, err = EvictJobsFromNode(testfixtures.TestPriorityClasses, jobFilter, []interfaces.LegacySchedulerJob{job}, evictedNode) require.Error(t, err) - assertNodeAccountingEqual(t, node, unboundNode) - assertNodeAccountingEqual(t, node, evictedUnboundNode) + assertNodeAccountingEqual(t, entry, unboundNode) + assertNodeAccountingEqual(t, entry, evictedUnboundNode) assertNodeAccountingEqual(t, unboundNode, evictedUnboundNode) assertNodeAccountingEqual(t, boundNode, evictedBoundNode) assertNodeAccountingEqual(t, unboundNode, unboundMultipleNode) @@ -192,29 +199,24 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) { expectedAllocatable := boundNode.TotalResources.DeepCopy() expectedAllocatable.Sub(request) priority := testfixtures.TestPriorityClasses[job.GetPriorityClassName()].Priority - assert.True(t, expectedAllocatable.Equal(boundNode.AllocatableByPriorityAndResource[priority])) + assert.True(t, expectedAllocatable.Equal(boundNode.AllocatableByPriority[priority])) assert.Empty(t, unboundNode.AllocatedByJobId) assert.Empty(t, unboundNode.AllocatedByQueue) assert.Empty(t, unboundNode.EvictedJobRunIds) } -func assertNodeAccountingEqual(t *testing.T, node1, node2 *schedulerobjects.Node) bool { - rv := true - rv = rv && assert.True( +func assertNodeAccountingEqual(t *testing.T, node1, node2 *Node) { + allocatable1 := schedulerobjects.QuantityByTAndResourceType[int32](node1.AllocatableByPriority) + allocatable2 := schedulerobjects.QuantityByTAndResourceType[int32](node2.AllocatableByPriority) + assert.True( t, - schedulerobjects.QuantityByTAndResourceType[int32]( - node1.AllocatableByPriorityAndResource, - ).Equal( - schedulerobjects.QuantityByTAndResourceType[int32]( - node2.AllocatableByPriorityAndResource, - ), - ), + allocatable1.Equal(allocatable2), "expected %v, but got %v", - node1.AllocatableByPriorityAndResource, - node2.AllocatableByPriorityAndResource, + node1.AllocatableByPriority, + node2.AllocatableByPriority, ) - rv = rv && assert.True( + assert.True( t, armadamaps.DeepEqual( node1.AllocatedByJobId, @@ -224,7 +226,7 @@ func assertNodeAccountingEqual(t *testing.T, node1, node2 *schedulerobjects.Node node1.AllocatedByJobId, node2.AllocatedByJobId, ) - rv = rv && assert.True( + assert.True( t, armadamaps.DeepEqual( node1.AllocatedByQueue, @@ -234,7 +236,7 @@ func assertNodeAccountingEqual(t *testing.T, node1, node2 *schedulerobjects.Node node1.AllocatedByQueue, node2.AllocatedByQueue, ) - rv = rv && assert.True( + assert.True( t, maps.Equal( node1.EvictedJobRunIds, @@ -244,17 +246,6 @@ func assertNodeAccountingEqual(t *testing.T, node1, node2 *schedulerobjects.Node node1.EvictedJobRunIds, node2.EvictedJobRunIds, ) - rv = rv && assert.True( - t, - armadamaps.DeepEqual( - node1.NonArmadaAllocatedResources, - node2.NonArmadaAllocatedResources, - ), - "expected %v, but got %v", - node1.NonArmadaAllocatedResources, - node2.NonArmadaAllocatedResources, - ) - return rv } func TestEviction(t *testing.T) { @@ -285,18 +276,25 @@ func TestEviction(t *testing.T) { } for name, tc := range tests { t.Run(name, func(t *testing.T) { - node := testfixtures.Test32CpuNode(testfixtures.TestPriorities) - jobs := []interfaces.LegacySchedulerJob{ + nodeDb, err := createNodeDb([]*schedulerobjects.Node{}) + require.NoError(t, err) + txn := nodeDb.Txn(true) + jobs := []*jobdb.Job{ 
testfixtures.Test1Cpu4GiJob("queue-alice", testfixtures.PriorityClass0), testfixtures.Test1Cpu4GiJob("queue-alice", testfixtures.PriorityClass3), } - var err error - for _, job := range jobs { - node, err = BindJobToNode(testfixtures.TestPriorityClasses, job, node) - require.NoError(t, err) - } + node := testfixtures.Test32CpuNode(testfixtures.TestPriorities) + err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, jobs, node) + txn.Commit() + require.NoError(t, err) + entry, err := nodeDb.GetNode(node.Id) + require.NoError(t, err) - actualEvictions, _, err := EvictJobsFromNode(testfixtures.TestPriorityClasses, tc.jobFilter, jobs, node) + existingJobs := make([]interfaces.LegacySchedulerJob, len(jobs)) + for i, job := range jobs { + existingJobs[i] = job + } + actualEvictions, _, err := EvictJobsFromNode(testfixtures.TestPriorityClasses, tc.jobFilter, existingJobs, entry) require.NoError(t, err) expectedEvictions := make([]interfaces.LegacySchedulerJob, 0, len(tc.expectedEvictions)) for _, i := range tc.expectedEvictions { @@ -447,7 +445,7 @@ func TestScheduleIndividually(t *testing.T) { if !tc.ExpectSuccess[i] { assert.False(t, ok) if pctx != nil { - assert.Nil(t, pctx.Node) + assert.Equal(t, "", pctx.NodeId) } continue } @@ -455,14 +453,17 @@ func TestScheduleIndividually(t *testing.T) { assert.True(t, ok) require.NotNil(t, pctx) - node := pctx.Node + nodeId := pctx.NodeId if !tc.ExpectSuccess[i] { - assert.Nil(t, node) + assert.Equal(t, "", nodeId) continue } - require.NotNil(t, node) + require.NotEqual(t, "", nodeId) job := jctx.Job + node, err := nodeDb.GetNode(nodeId) + require.NoError(t, err) + require.NotNil(t, node) expected := schedulerobjects.ResourceListFromV1ResourceList(job.GetResourceRequirements().Requests) actual, ok := node.AllocatedByJobId[job.GetId()] require.True(t, ok) @@ -526,7 +527,7 @@ func TestScheduleMany(t *testing.T) { pctx := jctx.PodSchedulingContext require.NotNil(t, pctx) if tc.ExpectSuccess[i] { - assert.NotNil(t, pctx.Node) + assert.NotEqual(t, "", pctx.NodeId) } } } @@ -535,22 +536,28 @@ func TestScheduleMany(t *testing.T) { } func benchmarkUpsert(nodes []*schedulerobjects.Node, b *testing.B) { - db, err := NewNodeDb( + nodeDb, err := NewNodeDb( testfixtures.TestPriorityClasses, testfixtures.TestMaxExtraNodesToConsider, testfixtures.TestResources, testfixtures.TestIndexedTaints, testfixtures.TestIndexedNodeLabels, ) - if !assert.NoError(b, err) { - return + require.NoError(b, err) + txn := nodeDb.Txn(true) + entries := make([]*Node, len(nodes)) + for i, node := range nodes { + err := nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node) + require.NoError(b, err) + entry, err := nodeDb.GetNode(node.Id) + require.NoError(b, err) + entries[i] = entry } + txn.Commit() b.ResetTimer() for n := 0; n < b.N; n++ { - err := db.UpsertMany(nodes) - if !assert.NoError(b, err) { - return - } + err := nodeDb.UpsertMany(entries) + require.NoError(b, err) } } @@ -575,9 +582,12 @@ func benchmarkScheduleMany(b *testing.B, nodes []*schedulerobjects.Node, jobs [] testfixtures.TestIndexedNodeLabels, ) require.NoError(b, err) - - err = nodeDb.UpsertMany(nodes) - require.NoError(b, err) + txn := nodeDb.Txn(true) + for _, node := range nodes { + err := nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node) + require.NoError(b, err) + } + txn.Commit() b.ResetTimer() for n := 0; n < b.N; n++ { @@ -687,7 +697,7 @@ func BenchmarkScheduleManyResourceConstrained(b *testing.B) { } func createNodeDb(nodes []*schedulerobjects.Node) (*NodeDb, error) { - db, err := NewNodeDb( + 
nodeDb, err := NewNodeDb( testfixtures.TestPriorityClasses, testfixtures.TestMaxExtraNodesToConsider, testfixtures.TestResources, @@ -697,10 +707,14 @@ func createNodeDb(nodes []*schedulerobjects.Node) (*NodeDb, error) { if err != nil { return nil, err } - if err := db.UpsertMany(nodes); err != nil { - return nil, err + txn := nodeDb.Txn(true) + for _, node := range nodes { + if err := nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node); err != nil { + return nil, err + } } - return db, nil + txn.Commit() + return nodeDb, nil } func BenchmarkNodeDbStringFromPodRequirementsNotMetReason(b *testing.B) { diff --git a/internal/scheduler/nodedb/nodeiteration.go b/internal/scheduler/nodedb/nodeiteration.go index 3ae075edea4..fb2715c6676 100644 --- a/internal/scheduler/nodedb/nodeiteration.go +++ b/internal/scheduler/nodedb/nodeiteration.go @@ -9,12 +9,10 @@ import ( "github.com/pkg/errors" "golang.org/x/exp/slices" "k8s.io/apimachinery/pkg/api/resource" - - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) type NodeIterator interface { - NextNode() *schedulerobjects.Node + NextNode() *Node } // NodesIterator is an iterator over all nodes in the db. @@ -36,16 +34,12 @@ func (it *NodesIterator) WatchCh() <-chan struct{} { panic("not implemented") } -func (it *NodesIterator) NextNode() *schedulerobjects.Node { +func (it *NodesIterator) NextNode() *Node { obj := it.it.Next() if obj == nil { return nil } - node, ok := obj.(*schedulerobjects.Node) - if !ok { - panic(fmt.Sprintf("expected *Node, but got %T", obj)) - } - return node + return obj.(*Node) } func (it *NodesIterator) Next() interface{} { @@ -55,13 +49,13 @@ func (it *NodesIterator) Next() interface{} { type NodePairIterator struct { itA *NodesIterator itB *NodesIterator - nodeA *schedulerobjects.Node - nodeB *schedulerobjects.Node + nodeA *Node + nodeB *Node } type NodePairIteratorItem struct { - NodeA *schedulerobjects.Node - NodeB *schedulerobjects.Node + NodeA *Node + NodeB *Node } func NewNodePairIterator(txnA, txnB *memdb.Txn) (*NodePairIterator, error) { @@ -144,10 +138,10 @@ func (index *NodeIndex) FromArgs(args ...interface{}) ([]byte, error) { return args[0].([]byte), nil } -// FromObject extracts the index key from a *schedulerobjects.Node. +// FromObject extracts the index key from a *Node. func (index *NodeIndex) FromObject(raw interface{}) (bool, []byte, error) { - node := raw.(*schedulerobjects.Node) - return true, node.NodeDbKeys[index.KeyIndex], nil + node := raw.(*Node) + return true, node.Keys[index.KeyIndex], nil } // NodeTypesIterator is an iterator over all nodes of the given nodeTypes @@ -212,7 +206,7 @@ func (it *NodeTypesIterator) Next() interface{} { return v } -func (it *NodeTypesIterator) NextNode() (*schedulerobjects.Node, error) { +func (it *NodeTypesIterator) NextNode() (*Node, error) { if it.pq.Len() == 0 { return nil, nil } @@ -236,7 +230,7 @@ type nodeTypesIteratorPQ struct { } type nodeTypesIteratorPQItem struct { - node *schedulerobjects.Node + node *Node it *NodeTypeIterator // The index of the item in the heap. Maintained by the heap.Interface methods. 
index int @@ -248,9 +242,9 @@ func (pq *nodeTypesIteratorPQ) Less(i, j int) bool { return pq.less(pq.items[i].node, pq.items[j].node) } -func (it *nodeTypesIteratorPQ) less(a, b *schedulerobjects.Node) bool { - allocatableByPriorityA := a.AllocatableByPriorityAndResource[it.priority] - allocatableByPriorityB := b.AllocatableByPriorityAndResource[it.priority] +func (it *nodeTypesIteratorPQ) less(a, b *Node) bool { + allocatableByPriorityA := a.AllocatableByPriority[it.priority] + allocatableByPriorityB := b.AllocatableByPriority[it.priority] for _, t := range it.indexedResources { qa := allocatableByPriorityA.Get(t) qb := allocatableByPriorityB.Get(t) @@ -376,13 +370,13 @@ func (it *NodeTypeIterator) Next() interface{} { return v } -func (it *NodeTypeIterator) NextNode() (*schedulerobjects.Node, error) { +func (it *NodeTypeIterator) NextNode() (*Node, error) { for { v := it.memdbIterator.Next() if v == nil { return nil, nil } - node := v.(*schedulerobjects.Node) + node := v.(*Node) if node.Id == it.previousNodeId { panic(fmt.Sprintf("iterator received the same node twice consecutively: %s", node.Id)) } @@ -391,9 +385,9 @@ func (it *NodeTypeIterator) NextNode() (*schedulerobjects.Node, error) { // There are no more nodes of this nodeType. return nil, nil } - allocatableByPriority := node.AllocatableByPriorityAndResource[it.priority] + allocatableByPriority := node.AllocatableByPriority[it.priority] if len(allocatableByPriority.Resources) == 0 { - return nil, errors.Errorf("node %s has no resources registered at priority %d: %v", node.Id, it.priority, node.AllocatableByPriorityAndResource) + return nil, errors.Errorf("node %s has no resources registered at priority %d: %v", node.Id, it.priority, node.AllocatableByPriority) } for i, t := range it.indexedResources { nodeQuantity := allocatableByPriority.Get(t) diff --git a/internal/scheduler/nodedb/nodeiteration_test.go b/internal/scheduler/nodedb/nodeiteration_test.go index 7b5edaa270d..71d3ac0fac6 100644 --- a/internal/scheduler/nodedb/nodeiteration_test.go +++ b/internal/scheduler/nodedb/nodeiteration_test.go @@ -64,29 +64,36 @@ func TestNodesIterator(t *testing.T) { } func TestNodePairIterator(t *testing.T) { + nodeDb, err := createNodeDb(nil) + require.NoError(t, err) + nodes := testfixtures.TestCluster() for i, c := range []string{"A", "B", "C"} { nodes[i].Id = c } + entries := make([]*Node, len(nodes)) + for i, node := range nodes { + entry, err := nodeDb.create(node) + require.NoError(t, err) + entries[i] = entry + } - nodeDb, err := createNodeDb(nil) - require.NoError(t, err) - for _, node := range nodes { - node.NodeDbKeys = make([][]byte, len(nodeDb.prioritiesToTryAssigningAt)) + for _, node := range entries { + node.Keys = make([][]byte, len(nodeDb.prioritiesToTryAssigningAt)) for i, p := range nodeDb.prioritiesToTryAssigningAt { - node.NodeDbKeys[i] = nodeDb.nodeDbKeyFromNode(node.NodeDbKeys[i], node, p) + node.Keys[i] = nodeDb.nodeDbKey(node.Keys[i], node.NodeTypeId, node.AllocatableByPriority[p]) } } txn := nodeDb.Txn(true) - require.NoError(t, txn.Insert("nodes", nodes[0])) - require.NoError(t, txn.Insert("nodes", nodes[1])) + require.NoError(t, txn.Insert("nodes", entries[0])) + require.NoError(t, txn.Insert("nodes", entries[1])) txn.Commit() txnA := nodeDb.Txn(false) txn = nodeDb.Txn(true) - require.NoError(t, txn.Delete("nodes", nodes[0])) - require.NoError(t, txn.Insert("nodes", nodes[2])) + require.NoError(t, txn.Delete("nodes", entries[0])) + require.NoError(t, txn.Insert("nodes", entries[2])) txn.Commit() txnB := 
nodeDb.Txn(false) @@ -99,16 +106,16 @@ func TestNodePairIterator(t *testing.T) { } expected := []*NodePairIteratorItem{ { - NodeA: nodes[0], + NodeA: entries[0], NodeB: nil, }, { - NodeA: nodes[1], - NodeB: nodes[1], + NodeA: entries[1], + NodeB: entries[1], }, { NodeA: nil, - NodeB: nodes[2], + NodeB: entries[2], }, } assert.Equal(t, expected, actual) @@ -425,15 +432,23 @@ func TestNodeTypeIterator(t *testing.T) { indexByNodeId[node.Id] = i } + entries := make([]*Node, len(tc.nodes)) + for i, node := range tc.nodes { + entry, err := nodeDb.create(node) + require.NoError(t, err) + entry.NodeTypeId = node.NodeTypeId + entries[i] = entry + } + // Compute the keys necessary to efficiently iterate over nodes // and populate the database. We do this manually instead of using nodeDb.Upsert to control the nodeTypeId. - for _, node := range tc.nodes { - node.NodeDbKeys = make([][]byte, len(nodeDb.prioritiesToTryAssigningAt)) + for _, node := range entries { + node.Keys = make([][]byte, len(nodeDb.prioritiesToTryAssigningAt)) for i, p := range nodeDb.prioritiesToTryAssigningAt { - node.NodeDbKeys[i] = nodeDb.nodeDbKeyFromNode(node.NodeDbKeys[i], node, p) + node.Keys[i] = nodeDb.nodeDbKey(node.Keys[i], node.NodeTypeId, node.AllocatableByPriority[p]) } } - require.NoError(t, populateDatabase(nodeDb.db, tc.nodes)) + require.NoError(t, populateDatabase(nodeDb.db, entries)) // Create iterator. indexedResourceRequests := make([]resource.Quantity, len(testfixtures.TestResources)) @@ -811,15 +826,23 @@ func TestNodeTypesIterator(t *testing.T) { indexByNodeId[node.Id] = i } + entries := make([]*Node, len(tc.nodes)) + for i, node := range tc.nodes { + entry, err := nodeDb.create(node) + require.NoError(t, err) + entry.NodeTypeId = node.NodeTypeId + entries[i] = entry + } + // Compute the keys necessary to efficiently iterate over nodes // and populate the database. We do this manually instead of using nodeDb.Upsert to control the nodeTypeId. 
- for _, node := range tc.nodes { - node.NodeDbKeys = make([][]byte, len(nodeDb.prioritiesToTryAssigningAt)) + for _, node := range entries { + node.Keys = make([][]byte, len(nodeDb.prioritiesToTryAssigningAt)) for i, p := range nodeDb.prioritiesToTryAssigningAt { - node.NodeDbKeys[i] = nodeDb.nodeDbKeyFromNode(node.NodeDbKeys[i], node, p) + node.Keys[i] = nodeDb.nodeDbKey(node.Keys[i], node.NodeTypeId, node.AllocatableByPriority[p]) } } - require.NoError(t, populateDatabase(nodeDb.db, tc.nodes)) + require.NoError(t, populateDatabase(nodeDb.db, entries)) indexedResourceRequests := make([]resource.Quantity, len(testfixtures.TestResources)) for i, t := range testfixtures.TestResourceNames { @@ -858,11 +881,11 @@ func TestNodeTypesIterator(t *testing.T) { } } -func populateDatabase(db *memdb.MemDB, items []*schedulerobjects.Node) error { +func populateDatabase(db *memdb.MemDB, nodes []*Node) error { txn := db.Txn(true) defer txn.Abort() - for _, item := range items { - err := txn.Insert("nodes", item) + for _, node := range nodes { + err := txn.Insert("nodes", node) if err != nil { return errors.WithStack(err) } diff --git a/internal/scheduler/pool_assigner.go b/internal/scheduler/pool_assigner.go index 0387fa3d58c..29d16e4d957 100644 --- a/internal/scheduler/pool_assigner.go +++ b/internal/scheduler/pool_assigner.go @@ -137,13 +137,12 @@ func (p *DefaultPoolAssigner) AssignPool(j *jobdb.Job) (string, error) { Job: j, PodRequirements: j.GetPodRequirements(p.priorityClasses), } - err := nodeDb.SelectNodeForJobWithTxn(txn, jctx) + node, err := nodeDb.SelectNodeForJobWithTxn(txn, jctx) txn.Abort() if err != nil { return "", errors.WithMessagef(err, "error selecting node for job %s", j.Id()) } - pctx := jctx.PodSchedulingContext - if pctx != nil && pctx.Node != nil { + if node != nil { p.poolCache.Add(schedulingKey, pool) return pool, nil } @@ -164,10 +163,14 @@ func (p *DefaultPoolAssigner) constructNodeDb(nodes []*schedulerobjects.Node) (* if err != nil { return nil, err } - err = nodeDb.UpsertMany(nodes) - if err != nil { - return nil, err + txn := nodeDb.Txn(true) + defer txn.Abort() + for _, node := range nodes { + if err := nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node); err != nil { + return nil, err + } } + txn.Commit() err = nodeDb.ClearAllocated() if err != nil { return nil, err diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go index c534fc10266..7b58d355d6a 100644 --- a/internal/scheduler/preempting_queue_scheduler.go +++ b/internal/scheduler/preempting_queue_scheduler.go @@ -21,7 +21,6 @@ import ( schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/interfaces" "github.com/armadaproject/armada/internal/scheduler/nodedb" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) // PreemptingQueueScheduler is a scheduler that makes a unified decisions on which jobs to preempt and schedule. 
@@ -276,10 +275,6 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx context.Context) (*SchedulerRe } if sch.enableAssertions { err := sch.assertions( - ctxlogrus.ToContext( - ctx, - log.WithField("stage", "validate consistency"), - ), snapshot, preemptedJobsById, scheduledJobsById, @@ -450,7 +445,7 @@ func (sch *PreemptingQueueScheduler) setEvictedGangCardinality(evictedJobsById m return nil } -func (sch *PreemptingQueueScheduler) evictionAssertions(evictedJobsById map[string]interfaces.LegacySchedulerJob, affectedNodesById map[string]*schedulerobjects.Node) error { +func (sch *PreemptingQueueScheduler) evictionAssertions(evictedJobsById map[string]interfaces.LegacySchedulerJob, affectedNodesById map[string]*nodedb.Node) error { for _, qctx := range sch.schedulingContext.QueueSchedulingContexts { if !qctx.AllocatedByPriorityClass.IsStrictlyNonNegative() { return errors.Errorf("negative allocation for queue %s after eviction: %s", qctx.Queue, qctx.AllocatedByPriorityClass) @@ -590,7 +585,6 @@ func (sch *PreemptingQueueScheduler) updateGangAccounting(preemptedJobs, schedul // Compare the nodedb.NodeJobDiff with expected preempted/scheduled jobs to ensure NodeDb is consistent. // This is only to validate that nothing unexpected happened during scheduling. func (sch *PreemptingQueueScheduler) assertions( - ctx context.Context, snapshot *memdb.Txn, preemptedJobsById, scheduledJobsById map[string]interfaces.LegacySchedulerJob, @@ -659,16 +653,16 @@ func (sch *PreemptingQueueScheduler) assertions( type Evictor struct { jobRepo JobRepository priorityClasses map[string]configuration.PriorityClass - nodeFilter func(context.Context, *schedulerobjects.Node) bool + nodeFilter func(context.Context, *nodedb.Node) bool jobFilter func(context.Context, interfaces.LegacySchedulerJob) bool - postEvictFunc func(context.Context, interfaces.LegacySchedulerJob, *schedulerobjects.Node) + postEvictFunc func(context.Context, interfaces.LegacySchedulerJob, *nodedb.Node) } type EvictorResult struct { // Map from job id to job, containing all evicted jobs. EvictedJobsById map[string]interfaces.LegacySchedulerJob // Map from node id to node, containing all nodes on which at least one job was evicted. - AffectedNodesById map[string]*schedulerobjects.Node + AffectedNodesById map[string]*nodedb.Node // For each evicted job, maps the id of the job to the id of the node it was evicted from. 
NodeIdByJobId map[string]string } @@ -689,7 +683,7 @@ func NewNodeEvictor( return &Evictor{ jobRepo: jobRepo, priorityClasses: priorityClasses, - nodeFilter: func(_ context.Context, node *schedulerobjects.Node) bool { + nodeFilter: func(_ context.Context, node *nodedb.Node) bool { return len(node.AllocatedByJobId) > 0 && random.Float64() < perNodeEvictionProbability }, jobFilter: jobFilter, @@ -711,7 +705,7 @@ func NewFilteredEvictor( return &Evictor{ jobRepo: jobRepo, priorityClasses: priorityClasses, - nodeFilter: func(_ context.Context, node *schedulerobjects.Node) bool { + nodeFilter: func(_ context.Context, node *nodedb.Node) bool { shouldEvict := nodeIdsToEvict[node.Id] return shouldEvict }, @@ -746,9 +740,9 @@ func NewOversubscribedEvictor( return &Evictor{ jobRepo: jobRepo, priorityClasses: priorityClasses, - nodeFilter: func(_ context.Context, node *schedulerobjects.Node) bool { + nodeFilter: func(_ context.Context, node *nodedb.Node) bool { overSubscribedPriorities = make(map[int32]bool) - for p, rl := range node.AllocatableByPriorityAndResource { + for p, rl := range node.AllocatableByPriority { if p < 0 { // Negative priorities correspond to already evicted jobs. continue @@ -792,7 +786,7 @@ func (evi *Evictor) Evict(ctx context.Context, it nodedb.NodeIterator) (*Evictor jobFilter = func(job interfaces.LegacySchedulerJob) bool { return evi.jobFilter(ctx, job) } } evictedJobsById := make(map[string]interfaces.LegacySchedulerJob) - affectedNodesById := make(map[string]*schedulerobjects.Node) + affectedNodesById := make(map[string]*nodedb.Node) nodeIdByJobId := make(map[string]string) for node := it.NextNode(); node != nil; node = it.NextNode() { if evi.nodeFilter != nil && !evi.nodeFilter(ctx, node) { @@ -833,7 +827,7 @@ func (evi *Evictor) Evict(ctx context.Context, it nodedb.NodeIterator) (*Evictor // TODO: This is only necessary for jobs not scheduled in this cycle. // Since jobs scheduled in this cycle can be re-scheduled onto another node without triggering a preemption. -func defaultPostEvictFunc(ctx context.Context, job interfaces.LegacySchedulerJob, node *schedulerobjects.Node) { +func defaultPostEvictFunc(ctx context.Context, job interfaces.LegacySchedulerJob, node *nodedb.Node) { // Add annotation indicating to the scheduler this this job was evicted. 
annotations := job.GetAnnotations() if annotations == nil { diff --git a/internal/scheduler/preempting_queue_scheduler_test.go b/internal/scheduler/preempting_queue_scheduler_test.go index dd278d97e2d..59a82e899de 100644 --- a/internal/scheduler/preempting_queue_scheduler_test.go +++ b/internal/scheduler/preempting_queue_scheduler_test.go @@ -27,18 +27,18 @@ import ( ) func TestEvictOversubscribed(t *testing.T) { - nodes := testfixtures.N32CpuNodes(1, testfixtures.TestPriorities) - node := nodes[0] - var err error jobs := append( testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 20), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass1, 20)..., ) - for _, job := range jobs { - node, err = nodedb.BindJobToNode(testfixtures.TestPriorityClasses, job, node) - require.NoError(t, err) - } - nodes[0] = node + node := testfixtures.Test32CpuNode(testfixtures.TestPriorities) + nodeDb, err := NewNodeDb() + require.NoError(t, err) + txn := nodeDb.Txn(true) + err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, jobs, node) + require.NoError(t, err) + entry, err := nodeDb.GetNode(node.Id) + require.NoError(t, err) jobRepo := NewInMemoryJobRepository(testfixtures.TestPriorityClasses) for _, job := range jobs { @@ -51,7 +51,7 @@ func TestEvictOversubscribed(t *testing.T) { 1, nil, ) - it := NewInMemoryNodeIterator(nodes) + it := NewInMemoryNodeIterator([]*nodedb.Node{entry}) result, err := evictor.Evict(context.Background(), it) require.NoError(t, err) @@ -60,7 +60,7 @@ func TestEvictOversubscribed(t *testing.T) { slices.Sort(priorities) for nodeId, node := range result.AffectedNodesById { for _, p := range priorities { - for resourceType, q := range node.AllocatableByPriorityAndResource[p].Resources { + for resourceType, q := range node.AllocatableByPriority[p].Resources { assert.NotEqual(t, -1, q.Cmp(resource.Quantity{}), "resource %s oversubscribed by %s on node %s", resourceType, q.String(), nodeId) } } @@ -69,16 +69,16 @@ func TestEvictOversubscribed(t *testing.T) { type InMemoryNodeIterator struct { i int - nodes []*schedulerobjects.Node + nodes []*nodedb.Node } -func NewInMemoryNodeIterator(nodes []*schedulerobjects.Node) *InMemoryNodeIterator { +func NewInMemoryNodeIterator(nodes []*nodedb.Node) *InMemoryNodeIterator { return &InMemoryNodeIterator{ nodes: slices.Clone(nodes), } } -func (it *InMemoryNodeIterator) NextNode() *schedulerobjects.Node { +func (it *InMemoryNodeIterator) NextNode() *nodedb.Node { if it.i >= len(it.nodes) { return nil } @@ -1277,8 +1277,14 @@ func TestPreemptingQueueScheduler(t *testing.T) { } for name, tc := range tests { t.Run(name, func(t *testing.T) { - nodeDb, err := NewNodeDb(tc.Nodes) + nodeDb, err := NewNodeDb() require.NoError(t, err) + txn := nodeDb.Txn(true) + for _, node := range tc.Nodes { + err := nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node) + require.NoError(t, err) + } + txn.Commit() // Repo. for storing jobs to be queued. // The Redis job repo. doesn't order by pc, so we disable pc ordering here too. 
@@ -1338,8 +1344,8 @@ func TestPreemptingQueueScheduler(t *testing.T) { for _, j := range round.NodeIndicesToCordon { node, err := nodeDb.GetNode(tc.Nodes[j].Id) require.NoError(t, err) - node = node.DeepCopy() - node.Unschedulable = true + node = node.UnsafeCopy() + node.Taints = append(slices.Clone(node.Taints), nodedb.UnschedulableTaint()) err = nodeDb.Upsert(node) require.NoError(t, err) } @@ -1491,7 +1497,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { require.NoError(t, err) for node := it.NextNode(); node != nil; node = it.NextNode() { for _, p := range priorities { - for resourceType, q := range node.AllocatableByPriorityAndResource[p].Resources { + for resourceType, q := range node.AllocatableByPriority[p].Resources { assert.NotEqual(t, -1, q.Cmp(resource.Quantity{}), "resource %s oversubscribed by %s on node %s", resourceType, q.String(), node.Id) } } @@ -1603,8 +1609,15 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { priorityFactorByQueue[queue] = float64(rand.Intn(tc.MaxPriorityFactor-tc.MinPriorityFactor+1) + tc.MinPriorityFactor) } - nodeDb, err := NewNodeDb(tc.Nodes) + nodeDb, err := NewNodeDb() require.NoError(b, err) + txn := nodeDb.Txn(true) + for _, node := range tc.Nodes { + err := nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node) + require.NoError(b, err) + } + txn.Commit() + jobRepo := NewInMemoryJobRepository(testfixtures.TestPriorityClasses) jobs := make([]interfaces.LegacySchedulerJob, 0) @@ -1658,19 +1671,19 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { jobRepo.jobsByQueue[queue] = armadaslices.Filter(jobs, func(job interfaces.LegacySchedulerJob) bool { return scheduledJobs[job.GetId()] }) } - nodesById := make(map[string]*schedulerobjects.Node) - for _, node := range tc.Nodes { - nodesById[node.Id] = node - } - for _, job := range result.ScheduledJobs { + jobsByNodeId := make(map[string][]*jobdb.Job) + for _, job := range ScheduledJobsFromSchedulerResult[*jobdb.Job](result) { nodeId := result.NodeIdByJobId[job.GetId()] - node := nodesById[nodeId] - node, err = nodedb.BindJobToNode(tc.SchedulingConfig.Preemption.PriorityClasses, job, node) - require.NoError(b, err) - nodesById[nodeId] = node + jobsByNodeId[nodeId] = append(jobsByNodeId[nodeId], job) } - nodeDb, err = NewNodeDb(maps.Values(nodesById)) + nodeDb, err = NewNodeDb() require.NoError(b, err) + txn = nodeDb.Txn(true) + for _, node := range tc.Nodes { + err := nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, jobsByNodeId[node.Id], node) + require.NoError(b, err) + } + txn.Commit() allocatedByQueueAndPriorityClass := sctx.AllocatedByQueueAndPriority() diff --git a/internal/scheduler/queue_scheduler.go b/internal/scheduler/queue_scheduler.go index 067fc294d75..956232b38f4 100644 --- a/internal/scheduler/queue_scheduler.go +++ b/internal/scheduler/queue_scheduler.go @@ -111,8 +111,9 @@ func (sch *QueueScheduler) Schedule(ctx context.Context) (*SchedulerResult, erro } else if ok { for _, jctx := range gctx.JobSchedulingContexts { scheduledJobs = append(scheduledJobs, jctx.Job) - if jctx.PodSchedulingContext != nil && jctx.PodSchedulingContext.Node != nil { - nodeIdByJobId[jctx.JobId] = jctx.PodSchedulingContext.Node.Id + pctx := jctx.PodSchedulingContext + if pctx != nil && pctx.NodeId != "" { + nodeIdByJobId[jctx.JobId] = pctx.NodeId } } } else if schedulerconstraints.IsTerminalUnschedulableReason(unschedulableReason) { diff --git a/internal/scheduler/queue_scheduler_test.go b/internal/scheduler/queue_scheduler_test.go index 8939e8a0879..84d42dd3e36 100644 --- 
a/internal/scheduler/queue_scheduler_test.go +++ b/internal/scheduler/queue_scheduler_test.go @@ -424,8 +424,14 @@ func TestQueueScheduler(t *testing.T) { } for name, tc := range tests { t.Run(name, func(t *testing.T) { - nodeDb, err := NewNodeDb(tc.Nodes) + nodeDb, err := NewNodeDb() require.NoError(t, err) + txn := nodeDb.Txn(true) + for _, node := range tc.Nodes { + err := nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node) + require.NoError(t, err) + } + txn.Commit() if tc.TotalResources.Resources == nil { // Default to NodeDb total. tc.TotalResources = nodeDb.TotalResources() @@ -551,12 +557,11 @@ func TestQueueScheduler(t *testing.T) { for _, qctx := range sctx.QueueSchedulingContexts { for _, jctx := range qctx.SuccessfulJobSchedulingContexts { assert.NotNil(t, jctx.PodSchedulingContext) - assert.NotNil(t, jctx.PodSchedulingContext.Node) - assert.Equal(t, result.NodeIdByJobId[jctx.JobId], jctx.PodSchedulingContext.Node.Id) + assert.Equal(t, result.NodeIdByJobId[jctx.JobId], jctx.PodSchedulingContext.NodeId) } for _, jctx := range qctx.UnsuccessfulJobSchedulingContexts { if jctx.PodSchedulingContext != nil { - assert.Nil(t, jctx.PodSchedulingContext.Node) + assert.Equal(t, "", jctx.PodSchedulingContext.NodeId) } } } @@ -596,8 +601,8 @@ func TestQueueScheduler(t *testing.T) { } } -func NewNodeDb(nodes []*schedulerobjects.Node) (*nodedb.NodeDb, error) { - db, err := nodedb.NewNodeDb( +func NewNodeDb() (*nodedb.NodeDb, error) { + nodeDb, err := nodedb.NewNodeDb( testfixtures.TestPriorityClasses, testfixtures.TestMaxExtraNodesToConsider, testfixtures.TestResources, @@ -607,8 +612,5 @@ func NewNodeDb(nodes []*schedulerobjects.Node) (*nodedb.NodeDb, error) { if err != nil { return nil, err } - if err := db.UpsertMany(nodes); err != nil { - return nil, err - } - return db, nil + return nodeDb, nil } diff --git a/internal/scheduler/schedulerobjects/nodematching.go b/internal/scheduler/schedulerobjects/nodematching.go index a9e9668a5d4..6db3402526a 100644 --- a/internal/scheduler/schedulerobjects/nodematching.go +++ b/internal/scheduler/schedulerobjects/nodematching.go @@ -124,34 +124,34 @@ func (nodeType *NodeType) PodRequirementsMet(req *PodRequirements) (bool, PodReq // - 1: Pod can be scheduled without preempting any running pods. // If the requirements are not met, it returns the reason why. // If the requirements can't be parsed, an error is returned. -func (node *Node) PodRequirementsMet(priority int32, req *PodRequirements) (bool, int, PodRequirementsNotMetReason, error) { - matches, reason, err := node.StaticPodRequirementsMet(req) +func PodRequirementsMet(taints []v1.Taint, labels map[string]string, totalResources ResourceList, allocatableResources ResourceList, req *PodRequirements) (bool, int, PodRequirementsNotMetReason, error) { + matches, reason, err := StaticPodRequirementsMet(taints, labels, totalResources, req) if !matches || err != nil { return matches, 0, reason, err } - return node.DynamicPodRequirementsMet(priority, req) + return DynamicPodRequirementsMet(allocatableResources, req) } // StaticPodRequirementsMet checks if a pod can be scheduled onto this node, // accounting for taints, node selectors, node affinity, and total resources available on the node. 
-func (node *Node) StaticPodRequirementsMet(req *PodRequirements) (bool, PodRequirementsNotMetReason, error) { - matches, reason, err := podTolerationRequirementsMet(node.GetTaints(), req) +func StaticPodRequirementsMet(taints []v1.Taint, labels map[string]string, totalResources ResourceList, req *PodRequirements) (bool, PodRequirementsNotMetReason, error) { + matches, reason, err := podTolerationRequirementsMet(taints, req) if !matches || err != nil { return matches, reason, err } - matches, reason, err = podNodeSelectorRequirementsMet(node.GetLabels(), nil, req) + matches, reason, err = podNodeSelectorRequirementsMet(labels, nil, req) if !matches || err != nil { return matches, reason, err } - matches, reason, err = podNodeAffinityRequirementsMet(node.GetLabels(), req) + matches, reason, err = podNodeAffinityRequirementsMet(labels, req) if !matches || err != nil { return matches, reason, err } for resource, required := range req.ResourceRequirements.Requests { - available := node.TotalResources.Get(string(resource)) + available := totalResources.Get(string(resource)) if required.Cmp(available) == 1 { return false, &InsufficientResources{ Resource: string(resource), @@ -166,8 +166,8 @@ func (node *Node) StaticPodRequirementsMet(req *PodRequirements) (bool, PodRequi // DynamicPodRequirementsMet checks if a pod can be scheduled onto this node, // accounting for resources allocated to pods already assigned to this node. -func (node *Node) DynamicPodRequirementsMet(priority int32, req *PodRequirements) (bool, int, PodRequirementsNotMetReason, error) { - matches, reason, err := podResourceRequirementsMet(priority, node.AllocatableByPriorityAndResource, req) +func DynamicPodRequirementsMet(allocatableResources ResourceList, req *PodRequirements) (bool, int, PodRequirementsNotMetReason, error) { + matches, reason, err := podResourceRequirementsMet(allocatableResources, req) return matches, SchedulableScore, reason, err } @@ -235,9 +235,9 @@ func podNodeAffinityRequirementsMet(nodeLabels map[string]string, req *PodRequir return true, nil, nil } -func podResourceRequirementsMet(priority int32, allocatableResources AllocatableByPriorityAndResourceType, req *PodRequirements) (bool, PodRequirementsNotMetReason, error) { +func podResourceRequirementsMet(allocatableResources ResourceList, req *PodRequirements) (bool, PodRequirementsNotMetReason, error) { for resource, required := range req.ResourceRequirements.Requests { - available := allocatableResources.Get(priority, string(resource)) + available := allocatableResources.Get(string(resource)) if required.Cmp(available) == 1 { return false, &InsufficientResources{ Resource: string(resource), diff --git a/internal/scheduler/schedulerobjects/nodematching_test.go b/internal/scheduler/schedulerobjects/nodematching_test.go index a64db33a018..6e2d836fbec 100644 --- a/internal/scheduler/schedulerobjects/nodematching_test.go +++ b/internal/scheduler/schedulerobjects/nodematching_test.go @@ -375,7 +375,7 @@ func TestNodeSchedulingRequirementsMet(t *testing.T) { } for name, tc := range tests { t.Run(name, func(t *testing.T) { - matches, _, reason, err := tc.node.PodRequirementsMet(tc.req.Priority, tc.req) + matches, _, reason, err := PodRequirementsMet(tc.node.Taints, tc.node.Labels, tc.node.TotalResources, tc.node.AllocatableByPriorityAndResource[tc.req.Priority], tc.req) assert.NoError(t, err) if tc.expectSuccess { // TODO: Test score set correctly. 
assert.True(t, matches) diff --git a/internal/scheduler/schedulerobjects/nodetype.go b/internal/scheduler/schedulerobjects/nodetype.go index bc14d91cb51..d52d35e36fb 100644 --- a/internal/scheduler/schedulerobjects/nodetype.go +++ b/internal/scheduler/schedulerobjects/nodetype.go @@ -12,10 +12,6 @@ type ( labelsFilterFunc func(key, value string) bool ) -func NewNodeTypeFromNode(node *v1.Node, indexedTaints map[string]interface{}, indexedLabels map[string]interface{}) *NodeType { - return NewNodeType(node.Spec.Taints, node.GetLabels(), indexedTaints, indexedLabels) -} - func NewNodeType(taints []v1.Taint, labels map[string]string, indexedTaints map[string]interface{}, indexedLabels map[string]interface{}) *NodeType { if taints == nil { taints = make([]v1.Taint, 0) diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go index ce0742a8884..0ad7039b0ba 100644 --- a/internal/scheduler/scheduling_algo.go +++ b/internal/scheduler/scheduling_algo.go @@ -433,6 +433,18 @@ func (repo *schedulerJobRepositoryAdapter) GetExistingJobsByIds(ids []string) ([ // constructNodeDb constructs a node db with all jobs bound to it. func (l *FairSchedulingAlgo) constructNodeDb(priorityClasses map[string]configuration.PriorityClass, jobs []*jobdb.Job, nodes []*schedulerobjects.Node) (*nodedb.NodeDb, error) { + nodeDb, err := nodedb.NewNodeDb( + priorityClasses, + l.config.MaxExtraNodesToConsider, + l.indexedResources, + l.config.IndexedTaints, + l.config.IndexedNodeLabels, + ) + if err != nil { + return nil, err + } + txn := nodeDb.Txn(true) + defer txn.Abort() nodesByName := make(map[string]*schedulerobjects.Node, len(nodes)) for _, node := range nodes { nodesByName[node.Name] = node @@ -452,26 +464,12 @@ func (l *FairSchedulingAlgo) constructNodeDb(priorityClasses map[string]configur } jobsByNodeName[nodeName] = append(jobsByNodeName[nodeName], job) } - for nodeName, jobsOnNode := range jobsByNodeName { - node, err := nodedb.BindJobsToNode(priorityClasses, jobsOnNode, nodesByName[nodeName]) - if err != nil { + for _, node := range nodes { + if err := nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, jobsByNodeName[node.Name], node); err != nil { return nil, err } - nodesByName[nodeName] = node - } - nodeDb, err := nodedb.NewNodeDb( - priorityClasses, - l.config.MaxExtraNodesToConsider, - l.indexedResources, - l.config.IndexedTaints, - l.config.IndexedNodeLabels, - ) - if err != nil { - return nil, err - } - if err := nodeDb.UpsertMany(maps.Values(nodesByName)); err != nil { - return nil, err } + txn.Commit() return nodeDb, nil } diff --git a/internal/scheduler/scheduling_algo_test.go b/internal/scheduler/scheduling_algo_test.go index 6ced0a5f041..b27a4756f3f 100644 --- a/internal/scheduler/scheduling_algo_test.go +++ b/internal/scheduler/scheduling_algo_test.go @@ -18,7 +18,6 @@ import ( "github.com/armadaproject/armada/internal/scheduler/database" "github.com/armadaproject/armada/internal/scheduler/jobdb" schedulermocks "github.com/armadaproject/armada/internal/scheduler/mocks" - "github.com/armadaproject/armada/internal/scheduler/nodedb" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" "github.com/armadaproject/armada/internal/scheduler/testfixtures" ) @@ -369,18 +368,12 @@ func TestLegacySchedulingAlgo_TestSchedule(t *testing.T) { for executorId, jobsByNodeName := range tc.existingRunningIndices { for nodeName, jobIndices := range jobsByNodeName { node := nodes[executorId][nodeName] - for _, i := range jobIndices { job := 
tc.existingJobs[i].WithQueued(false).WithNewRun(executorId, nodeName) jobsToUpsert = append(jobsToUpsert, job) run := job.LatestRun() node.StateByJobRunId[run.Id().String()] = schedulerobjects.JobRunState_RUNNING - - node, err = nodedb.BindJobToNode(tc.schedulingConfig.Preemption.PriorityClasses, job, node) - require.NoError(t, err) } - - nodes[executorId][nodeName] = node } } diff --git a/internal/scheduler/submitcheck.go b/internal/scheduler/submitcheck.go index c80d6a17c85..716d0d16720 100644 --- a/internal/scheduler/submitcheck.go +++ b/internal/scheduler/submitcheck.go @@ -228,7 +228,7 @@ func (srv *SubmitChecker) getSchedulingResult(jctxs []*schedulercontext.JobSched numSuccessfullyScheduled := 0 for _, jctx := range jctxs { pctx := jctx.PodSchedulingContext - if pctx != nil && pctx.Node != nil { + if pctx != nil && pctx.NodeId != "" { numSuccessfullyScheduled++ } } @@ -278,7 +278,14 @@ func (srv *SubmitChecker) constructNodeDb(nodes []*schedulerobjects.Node) (*node if err != nil { return nil, err } - err = nodeDb.UpsertMany(nodes) + txn := nodeDb.Txn(true) + defer txn.Abort() + for _, node := range nodes { + if err := nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node); err != nil { + return nil, err + } + } + txn.Commit() if err != nil { return nil, err }
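
The hunks above all follow the same migration: instead of mutating `*schedulerobjects.Node` objects with `nodedb.BindJobToNode`/`BindJobsToNode` and then calling `nodeDb.UpsertMany(nodes)`, callers now create node entries and insert them, together with any jobs already running on them, inside a single write transaction. The following is a minimal sketch of that new call sequence, not part of the patch; `schedulingConfig`, `nodes`, and `jobsByNodeName` are assumed to already be in scope in the caller.

nodeDb, err := nodedb.NewNodeDb(
	schedulingConfig.Preemption.PriorityClasses,
	schedulingConfig.MaxExtraNodesToConsider,
	schedulingConfig.IndexedResources,
	schedulingConfig.IndexedTaints,
	schedulingConfig.IndexedNodeLabels,
)
if err != nil {
	return nil, err
}
txn := nodeDb.Txn(true) // Single write transaction covering all inserts.
defer txn.Abort()       // Releases the transaction if we return early on error.
for _, node := range nodes {
	// Jobs already bound to the node are accounted for as part of the insert,
	// so allocated resources are reflected in the entry when the transaction commits.
	if err := nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, jobsByNodeName[node.Name], node); err != nil {
		return nil, err
	}
}
txn.Commit()

The intent of committing once at the end, rather than upserting nodes individually, is that readers of the node database never observe a partially populated snapshot; the per-cycle schedulers above rely on this when they take a transaction over the fully constructed NodeDb.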