Avoid copying nodes where possible #2621

Merged · 9 commits · Jun 29, 2023

Changes from all commits
20 changes: 2 additions & 18 deletions internal/armada/server/lease.go
@@ -378,24 +378,8 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
}

// Bind pods to nodes, thus ensuring resources are marked as allocated on the node.
skipNode := false
for _, job := range jobs {
node, err = nodedb.BindJobToNode(
q.schedulingConfig.Preemption.PriorityClasses,
job,
node,
)
if err != nil {
logging.WithStacktrace(log, err).Warnf(
"skipping node %s from executor %s: failed to bind job %s to node",
nodeInfo.GetName(), req.GetClusterId(), job.Id,
)
skipNode = true
break
}
}
if skipNode {
continue
if node, err = nodedb.BindJobsToNode(q.schedulingConfig.Preemption.PriorityClasses, jobs, node); err != nil {
return nil, err
Collaborator (Author): This behavior is now the same as in the new scheduler.

}

// Record which node each job is scheduled on. Necessary for gang preemption.
2 changes: 1 addition & 1 deletion internal/scheduler/context/context.go
@@ -638,7 +638,7 @@ func (jctx *JobSchedulingContext) IsSuccessful() bool {
return jctx.UnschedulableReason == ""
}

func JobSchedulingContextsFromJobs[T interfaces.LegacySchedulerJob](priorityClasses map[string]configuration.PriorityClass, jobs []T) []*JobSchedulingContext {
func JobSchedulingContextsFromJobs[J interfaces.LegacySchedulerJob](priorityClasses map[string]configuration.PriorityClass, jobs []J) []*JobSchedulingContext {
jctxs := make([]*JobSchedulingContext, len(jobs))
timestamp := time.Now()
for i, job := range jobs {
114 changes: 79 additions & 35 deletions internal/scheduler/nodedb/nodedb.go
@@ -486,13 +486,31 @@ func (nodeDb *NodeDb) selectNodeForPodWithIt(
return selectedNode, nil
}

// BindJobsToNode returns a copy of node with all elements of jobs bound to it.
func BindJobsToNode[J interfaces.LegacySchedulerJob](priorityClasses map[string]configuration.PriorityClass, jobs []J, node *schedulerobjects.Node) (*schedulerobjects.Node, error) {
node = node.DeepCopy()
for _, job := range jobs {
if err := bindJobToNodeInPlace(priorityClasses, job, node); err != nil {
return nil, err
}
}
return node, nil
}

// BindJobToNode returns a copy of node with job bound to it.
func BindJobToNode(priorityClasses map[string]configuration.PriorityClass, job interfaces.LegacySchedulerJob, node *schedulerobjects.Node) (*schedulerobjects.Node, error) {
node = node.DeepCopy()
if err := bindJobToNodeInPlace(priorityClasses, job, node); err != nil {
return nil, err
}
return node, nil
}

// bindJobToNodeInPlace is like BindJobToNode, but doesn't make a copy of node.
func bindJobToNodeInPlace(priorityClasses map[string]configuration.PriorityClass, job interfaces.LegacySchedulerJob, node *schedulerobjects.Node) error {
jobId := job.GetId()
requests := job.GetResourceRequirements().Requests

node = node.DeepCopy()

_, isEvicted := node.EvictedJobRunIds[jobId]
delete(node.EvictedJobRunIds, jobId)

@@ -501,7 +519,7 @@ func BindJobToNode(priorityClasses map[string]configuration.PriorityClass, job i
node.AllocatedByJobId = make(map[string]schedulerobjects.ResourceList)
}
if allocatedToJob, ok := node.AllocatedByJobId[jobId]; ok {
return nil, errors.Errorf("job %s already has resources allocated on node %s", jobId, node.Id)
return errors.Errorf("job %s already has resources allocated on node %s", jobId, node.Id)
} else {
allocatedToJob.AddV1ResourceList(requests)
node.AllocatedByJobId[jobId] = allocatedToJob
@@ -523,60 +541,86 @@ func BindJobToNode(priorityClasses map[string]configuration.PriorityClass, job i
allocatable.MarkAllocatableV1ResourceList(evictedPriority, requests)
}

return node, nil
return nil
}

// EvictJobFromNode returns a copy of node with job evicted from it. Specifically:
// EvictJobsFromNode returns a copy of node with all elements of jobs for which jobFilter returns
// true evicted from it, together with a slice containing exactly those jobs.
//
// - The job is marked as evicted on the node.
// - AllocatedByJobId and AllocatedByQueue are not updated.
// - Resources requested by the evicted pod are marked as allocated at priority evictedPriority.
func EvictJobFromNode(priorityClasses map[string]configuration.PriorityClass, job interfaces.LegacySchedulerJob, node *schedulerobjects.Node) (*schedulerobjects.Node, error) {
jobId := job.GetId()
queue := job.GetQueue()
requests := job.GetResourceRequirements().Requests

// Specifically:
//
// - The jobs that jobFilter returns true for are marked as evicted on the node.
// - Within AllocatableByPriorityAndResource, the resources allocated to these jobs are moved from
// the jobs' priorities to evictedPriority; they are not subtracted from AllocatedByJobId and
// AllocatedByQueue.
func EvictJobsFromNode(
Contributor: We could avoid some copies here by passing in a slice to which the evicted jobs are appended.

Collaborator (Author): That's a good idea; will follow up.

priorityClasses map[string]configuration.PriorityClass,
jobFilter func(interfaces.LegacySchedulerJob) bool,
jobs []interfaces.LegacySchedulerJob,
node *schedulerobjects.Node,
) ([]interfaces.LegacySchedulerJob, *schedulerobjects.Node, error) {
evicted := make([]interfaces.LegacySchedulerJob, 0)
node = node.DeepCopy()

// Ensure we track allocated resources at evictedPriority.
if _, ok := node.AllocatableByPriorityAndResource[evictedPriority]; !ok {
pMin := int32(math.MaxInt32)
ok := false
for p := range node.AllocatableByPriorityAndResource {
if p < pMin {
pMin = p
ok = true
}
for _, job := range jobs {
if jobFilter != nil && !jobFilter(job) {
continue
}
if ok {
node.AllocatableByPriorityAndResource[evictedPriority] = node.AllocatableByPriorityAndResource[pMin].DeepCopy()
evicted = append(evicted, job)
if err := evictJobFromNodeInPlace(priorityClasses, job, node); err != nil {
return nil, nil, err
}
}
return evicted, node, nil
}
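
A minimal sketch of the follow-up suggested in the thread above (hypothetical; not part of this PR): a variant that appends into a caller-provided slice, so a caller visiting many nodes can reuse one backing array instead of allocating a fresh evicted slice per call. The name evictJobsFromNodeInto is an assumption.

// Hypothetical sketch: append evicted jobs into a caller-owned slice to
// amortize allocations across calls; otherwise identical to EvictJobsFromNode.
func evictJobsFromNodeInto(
	priorityClasses map[string]configuration.PriorityClass,
	jobFilter func(interfaces.LegacySchedulerJob) bool,
	jobs []interfaces.LegacySchedulerJob,
	node *schedulerobjects.Node,
	evicted []interfaces.LegacySchedulerJob,
) ([]interfaces.LegacySchedulerJob, *schedulerobjects.Node, error) {
	node = node.DeepCopy()
	for _, job := range jobs {
		if jobFilter != nil && !jobFilter(job) {
			continue
		}
		evicted = append(evicted, job)
		if err := evictJobFromNodeInPlace(priorityClasses, job, node); err != nil {
			return nil, nil, err
		}
	}
	return evicted, node, nil
}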

// evictJobFromNodeInPlace is the in-place operation backing EvictJobsFromNode.
func evictJobFromNodeInPlace(priorityClasses map[string]configuration.PriorityClass, job interfaces.LegacySchedulerJob, node *schedulerobjects.Node) error {
jobId := job.GetId()
if _, ok := node.AllocatedByJobId[jobId]; !ok {
return nil, errors.Errorf("job %s has no resources allocated on node %s", jobId, node.Id)
return errors.Errorf("job %s has no resources allocated on node %s", jobId, node.Id)
}

queue := job.GetQueue()
if _, ok := node.AllocatedByQueue[queue]; !ok {
return nil, errors.Errorf("queue %s has no resources allocated on node %s", queue, node.Id)
return errors.Errorf("queue %s has no resources allocated on node %s", queue, node.Id)
}

if node.EvictedJobRunIds == nil {
node.EvictedJobRunIds = make(map[string]bool)
}
if _, ok := node.EvictedJobRunIds[jobId]; ok {
// TODO: We're using run ids instead of job ids for now.
return nil, errors.Errorf("job %s is already evicted from node %s", jobId, node.Id)
} else {
node.EvictedJobRunIds[jobId] = true
return errors.Errorf("job %s is already evicted from node %s", jobId, node.Id)
}
node.EvictedJobRunIds[jobId] = true

if _, ok := node.AllocatableByPriorityAndResource[evictedPriority]; !ok {
Collaborator (Author): We can get rid of this check once we stop storing Node values in the node database.

minimumPriority := int32(math.MaxInt32)
foundPriority := false
for p := range node.AllocatableByPriorityAndResource {
if p < minimumPriority {
minimumPriority = p
foundPriority = true
}
}
if foundPriority {
node.AllocatableByPriorityAndResource[evictedPriority] = node.AllocatableByPriorityAndResource[minimumPriority].DeepCopy()
} else {
// We do not expect to hit this branch; however, if we do, then we need
// to make sure that evictedPriority is in this map so that the call to
// MarkAllocatedV1ResourceList() below knows about it.
node.AllocatableByPriorityAndResource[evictedPriority] = schedulerobjects.ResourceList{}
}
}
allocatable := schedulerobjects.AllocatableByPriorityAndResourceType(node.AllocatableByPriorityAndResource)
priority := priorityClasses[job.GetPriorityClassName()].Priority
requests := job.GetResourceRequirements().Requests
allocatable.MarkAllocatableV1ResourceList(priority, requests)
allocatable.MarkAllocatedV1ResourceList(evictedPriority, requests)
return node, nil

return nil
}

// UnbindJobsFromNode returns a node with all reqs unbound from it.
// UnbindJobsFromNode returns a node with all elements of jobs unbound from it.
func UnbindJobsFromNode(priorityClasses map[string]configuration.PriorityClass, jobs []interfaces.LegacySchedulerJob, node *schedulerobjects.Node) (*schedulerobjects.Node, error) {
node = node.DeepCopy()
for _, job := range jobs {
@@ -587,7 +631,7 @@ func UnbindJobsFromNode(priorityClasses map[string]configuration.PriorityClass,
return node, nil
}

// UnbindJobFromNode returns a copy of node with req unbound from it.
// UnbindJobFromNode returns a copy of node with job unbound from it.
func UnbindJobFromNode(priorityClasses map[string]configuration.PriorityClass, job interfaces.LegacySchedulerJob, node *schedulerobjects.Node) (*schedulerobjects.Node, error) {
node = node.DeepCopy()
if err := unbindJobFromNodeInPlace(priorityClasses, job, node); err != nil {
@@ -596,7 +640,7 @@ func UnbindJobFromNode(priorityClasses map[string]configuration.PriorityClass, j
return node, nil
}

// unbindPodFromNodeInPlace is like UnbindJobFromNode, but doesn't make a copy of the node.
// unbindPodFromNodeInPlace is like UnbindJobFromNode, but doesn't make a copy of node.
func unbindJobFromNodeInPlace(priorityClasses map[string]configuration.PriorityClass, job interfaces.LegacySchedulerJob, node *schedulerobjects.Node) error {
jobId := job.GetId()
requests := job.GetResourceRequirements().Requests
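Taken together, the new batch helpers copy the node once per call rather than once per job, which is the point of this PR. A minimal usage sketch (hypothetical caller, not code from this PR; it assumes the packages used in the diff above):

// Hypothetical caller: bind a batch of jobs, then evict the preemptible ones.
// Each helper makes exactly one DeepCopy of the node, regardless of len(jobs).
func bindAndEvictPreemptible(
	priorityClasses map[string]configuration.PriorityClass,
	jobs []interfaces.LegacySchedulerJob,
	node *schedulerobjects.Node,
) ([]interfaces.LegacySchedulerJob, *schedulerobjects.Node, error) {
	boundNode, err := nodedb.BindJobsToNode(priorityClasses, jobs, node)
	if err != nil {
		return nil, nil, err
	}
	isPreemptible := func(job interfaces.LegacySchedulerJob) bool {
		return priorityClasses[job.GetPriorityClassName()].Preemptible
	}
	// Evicted jobs stay in AllocatedByJobId and AllocatedByQueue; within
	// AllocatableByPriorityAndResource their resources move to evictedPriority.
	return nodedb.EvictJobsFromNode(priorityClasses, isPreemptible, jobs, boundNode)
}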
58 changes: 55 additions & 3 deletions internal/scheduler/nodedb/nodedb_test.go
@@ -116,6 +116,7 @@ func TestSelectNodeForPod_NodeIdLabel_Failure(t *testing.T) {
}

func TestNodeBindingEvictionUnbinding(t *testing.T) {
jobFilter := func(job interfaces.LegacySchedulerJob) bool { return true }
node := testfixtures.Test8GpuNode(append(testfixtures.TestPriorities, evictedPriority))
job := testfixtures.Test1GpuJob("A", testfixtures.PriorityClass0)
request := schedulerobjects.ResourceListFromV1ResourceList(job.GetResourceRequirements().Requests)
@@ -130,16 +131,17 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) {
unboundMultipleNode, err := UnbindJobsFromNode(testfixtures.TestPriorityClasses, []interfaces.LegacySchedulerJob{job}, boundNode)
require.NoError(t, err)

evictedNode, err := EvictJobFromNode(testfixtures.TestPriorityClasses, job, boundNode)
evictedJobs, evictedNode, err := EvictJobsFromNode(testfixtures.TestPriorityClasses, jobFilter, []interfaces.LegacySchedulerJob{job}, boundNode)
require.NoError(t, err)
assert.Equal(t, []interfaces.LegacySchedulerJob{job}, evictedJobs)

evictedUnboundNode, err := UnbindJobFromNode(testfixtures.TestPriorityClasses, job, evictedNode)
require.NoError(t, err)

evictedBoundNode, err := BindJobToNode(testfixtures.TestPriorityClasses, job, evictedNode)
require.NoError(t, err)

_, err = EvictJobFromNode(testfixtures.TestPriorityClasses, job, node)
_, _, err = EvictJobsFromNode(testfixtures.TestPriorityClasses, jobFilter, []interfaces.LegacySchedulerJob{job}, node)
require.Error(t, err)

_, err = UnbindJobFromNode(testfixtures.TestPriorityClasses, job, node)
Expand All @@ -148,7 +150,7 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) {
_, err = BindJobToNode(testfixtures.TestPriorityClasses, job, boundNode)
require.Error(t, err)

_, err = EvictJobFromNode(testfixtures.TestPriorityClasses, job, evictedNode)
_, _, err = EvictJobsFromNode(testfixtures.TestPriorityClasses, jobFilter, []interfaces.LegacySchedulerJob{job}, evictedNode)
require.Error(t, err)

assertNodeAccountingEqual(t, node, unboundNode)
@@ -255,6 +257,56 @@ func assertNodeAccountingEqual(t *testing.T, node1, node2 *schedulerobjects.Node
return rv
}

func TestEviction(t *testing.T) {
tests := map[string]struct {
jobFilter func(interfaces.LegacySchedulerJob) bool
expectedEvictions []int32
}{
"jobFilter always returns false": {
jobFilter: func(_ interfaces.LegacySchedulerJob) bool { return false },
expectedEvictions: []int32{},
},
"jobFilter always returns true": {
jobFilter: func(_ interfaces.LegacySchedulerJob) bool { return true },
expectedEvictions: []int32{0, 1},
},
"jobFilter returns true for preemptible jobs": {
jobFilter: func(job interfaces.LegacySchedulerJob) bool {
priorityClassName := job.GetPriorityClassName()
priorityClass := testfixtures.TestPriorityClasses[priorityClassName]
return priorityClass.Preemptible
},
expectedEvictions: []int32{0},
},
"jobFilter nil": {
jobFilter: nil,
expectedEvictions: []int32{0, 1},
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
node := testfixtures.Test32CpuNode(testfixtures.TestPriorities)
jobs := []interfaces.LegacySchedulerJob{
testfixtures.Test1Cpu4GiJob("queue-alice", testfixtures.PriorityClass0),
testfixtures.Test1Cpu4GiJob("queue-alice", testfixtures.PriorityClass3),
}
var err error
for _, job := range jobs {
node, err = BindJobToNode(testfixtures.TestPriorityClasses, job, node)
require.NoError(t, err)
}

actualEvictions, _, err := EvictJobsFromNode(testfixtures.TestPriorityClasses, tc.jobFilter, jobs, node)
require.NoError(t, err)
expectedEvictions := make([]interfaces.LegacySchedulerJob, 0, len(tc.expectedEvictions))
for _, i := range tc.expectedEvictions {
expectedEvictions = append(expectedEvictions, jobs[i])
}
assert.Equal(t, expectedEvictions, actualEvictions)
})
}
}

func TestScheduleIndividually(t *testing.T) {
tests := map[string]struct {
Nodes []*schedulerobjects.Node
45 changes: 23 additions & 22 deletions internal/scheduler/preempting_queue_scheduler.go
@@ -16,7 +16,6 @@ import (
"github.com/armadaproject/armada/internal/armada/configuration"
armadamaps "github.com/armadaproject/armada/internal/common/maps"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/common/util"
schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/constraints"
schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
@@ -788,46 +787,48 @@ func NewOversubscribedEvictor(
// Any job for which jobFilter returns true is evicted (if the node was not skipped).
// If a job was evicted from a node, postEvictFunc is called with the corresponding job and node.
func (evi *Evictor) Evict(ctx context.Context, it nodedb.NodeIterator) (*EvictorResult, error) {
var jobFilter func(job interfaces.LegacySchedulerJob) bool
if evi.jobFilter != nil {
jobFilter = func(job interfaces.LegacySchedulerJob) bool { return evi.jobFilter(ctx, job) }
}
evictedJobsById := make(map[string]interfaces.LegacySchedulerJob)
affectedNodesById := make(map[string]*schedulerobjects.Node)
nodeIdByJobId := make(map[string]string)
for node := it.NextNode(); node != nil; node = it.NextNode() {
if evi.nodeFilter != nil && !evi.nodeFilter(ctx, node) {
continue
}
jobIds := util.Filter(
maps.Keys(node.AllocatedByJobId),
func(jobId string) bool {
_, ok := node.EvictedJobRunIds[jobId]
return !ok
},
)
jobIds := make([]string, 0)
for jobId := range node.AllocatedByJobId {
if _, ok := node.EvictedJobRunIds[jobId]; !ok {
jobIds = append(jobIds, jobId)
}
}
jobs, err := evi.jobRepo.GetExistingJobsByIds(jobIds)
if err != nil {
return nil, err
}
for _, job := range jobs {
if evi.jobFilter != nil && !evi.jobFilter(ctx, job) {
continue
}
node, err = nodedb.EvictJobFromNode(evi.priorityClasses, job, node)
if err != nil {
return nil, err
}
evictedJobs, node, err := nodedb.EvictJobsFromNode(evi.priorityClasses, jobFilter, jobs, node)
if err != nil {
return nil, err
}
for _, job := range evictedJobs {
evictedJobsById[job.GetId()] = job
nodeIdByJobId[job.GetId()] = node.Id
if evi.postEvictFunc != nil {
evi.postEvictFunc(ctx, job, node)
}

evictedJobsById[job.GetId()] = job
nodeIdByJobId[job.GetId()] = node.Id
}
affectedNodesById[node.Id] = node
if len(evictedJobs) > 0 {
affectedNodesById[node.Id] = node
}
}
return &EvictorResult{
result := &EvictorResult{
EvictedJobsById: evictedJobsById,
AffectedNodesById: affectedNodesById,
NodeIdByJobId: nodeIdByJobId,
}, nil
}
return result, nil
}

// TODO: This is only necessary for jobs not scheduled in this cycle.