Skip to content

Commit

Permalink
Add nodedb.Node type (#2625)
Browse files Browse the repository at this point in the history
  • Loading branch information
zuqq committed Jul 1, 2023
1 parent 60944e7 commit 24fb052
Show file tree
Hide file tree
Showing 18 changed files with 551 additions and 448 deletions.
37 changes: 20 additions & 17 deletions internal/armada/server/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,12 +308,26 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL

// Nodes to be considered by the scheduler.
lastSeen := q.clock.Now()
nodes := make([]*schedulerobjects.Node, 0, len(req.Nodes))

nodeDb, err := nodedb.NewNodeDb(
q.schedulingConfig.Preemption.PriorityClasses,
q.schedulingConfig.MaxExtraNodesToConsider,
q.schedulingConfig.IndexedResources,
q.schedulingConfig.IndexedTaints,
q.schedulingConfig.IndexedNodeLabels,
)
if err != nil {
return nil, err
}
txn := nodeDb.Txn(true)
defer txn.Abort()

allocatedByQueueAndPriorityClassForCluster := make(map[string]schedulerobjects.QuantityByTAndResourceType[string], len(queues))
jobIdsByGangId := make(map[string]map[string]bool)
gangIdByJobId := make(map[string]string)
nodeIdByJobId := make(map[string]string)
for _, nodeInfo := range req.Nodes {
nodes := make([]*schedulerobjects.Node, len(req.Nodes))
for i, nodeInfo := range req.Nodes {
node, err := api.NewNodeFromNodeInfo(
&nodeInfo,
req.ClusterId,
Expand All @@ -326,6 +340,7 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
)
continue
}
nodes[i] = node

jobIds := make([]string, 0, len(nodeInfo.RunIdsByState))
for jobId, jobState := range nodeInfo.RunIdsByState {
Expand Down Expand Up @@ -378,34 +393,22 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
}

// Bind pods to nodes, thus ensuring resources are marked as allocated on the node.
if node, err = nodedb.BindJobsToNode(q.schedulingConfig.Preemption.PriorityClasses, jobs, node); err != nil {
if err := nodeDb.CreateAndInsertWithApiJobsWithTxn(txn, jobs, node); err != nil {
return nil, err
}

// Record which node each job is scheduled on. Necessary for gang preemption.
for _, job := range jobs {
nodeIdByJobId[job.Id] = node.Id
}
nodes = append(nodes, node)

// Record which queues have jobs running. Necessary to omit inactive queues.
for _, job := range jobs {
isActiveByQueueName[job.Queue] = true
}
}
nodeDb, err := nodedb.NewNodeDb(
q.schedulingConfig.Preemption.PriorityClasses,
q.schedulingConfig.MaxExtraNodesToConsider,
q.schedulingConfig.IndexedResources,
q.schedulingConfig.IndexedTaints,
q.schedulingConfig.IndexedNodeLabels,
)
if err != nil {
return nil, err
}
if err := nodeDb.UpsertMany(nodes); err != nil {
return nil, err
}

txn.Commit()

// Load allocation reports for all executors from Redis.
reportsByExecutor, err := q.usageRepository.GetClusterQueueResourceUsage()
Expand Down
9 changes: 4 additions & 5 deletions internal/scheduler/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,9 +657,8 @@ func JobSchedulingContextsFromJobs[J interfaces.LegacySchedulerJob](priorityClas
type PodSchedulingContext struct {
// Time at which this context was created.
Created time.Time
// Node the pod was assigned to.
// If nil, the pod could not be assigned to any node.
Node *schedulerobjects.Node
// ID of the node that the pod was assigned to, or empty.
NodeId string
// Score indicates how well the pod fits on the selected node.
Score int
// Node types on which this pod could be scheduled.
Expand All @@ -673,8 +672,8 @@ type PodSchedulingContext struct {
func (pctx *PodSchedulingContext) String() string {
var sb strings.Builder
w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
if pctx.Node != nil {
fmt.Fprintf(w, "Node:\t%s\n", pctx.Node.Id)
if pctx.NodeId != "" {
fmt.Fprintf(w, "Node:\t%s\n", pctx.NodeId)
} else {
fmt.Fprint(w, "Node:\tnone\n")
}
Expand Down
8 changes: 6 additions & 2 deletions internal/scheduler/gang_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,12 @@ func TestGangScheduler(t *testing.T) {
testfixtures.TestIndexedNodeLabels,
)
require.NoError(t, err)
err = nodeDb.UpsertMany(tc.Nodes)
require.NoError(t, err)
txn := nodeDb.Txn(true)
for _, node := range tc.Nodes {
err := nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node)
require.NoError(t, err)
}
txn.Commit()
if tc.TotalResources.Resources == nil {
// Default to NodeDb total.
tc.TotalResources = nodeDb.TotalResources()
Expand Down
Loading

0 comments on commit 24fb052

Please sign in to comment.