Make the node database operate on jobs instead of pods #2612

Merged: 7 commits, Jun 28, 2023
8 changes: 3 additions & 5 deletions internal/armada/server/lease.go
@@ -370,11 +370,9 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
// Bind pods to nodes, thus ensuring resources are marked as allocated on the node.
skipNode := false
for _, job := range jobs {
node, err = nodedb.BindPodToNode(
scheduler.PodRequirementFromLegacySchedulerJob(
job,
q.schedulingConfig.Preemption.PriorityClasses,
),
node, err = nodedb.BindJobToNode(
q.schedulingConfig.Preemption.PriorityClasses,
job,
node,
)
if err != nil {
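For orientation, here is a hedged sketch of the call pattern introduced by the hunk above: nodedb.BindJobToNode takes the priority classes and the job directly, replacing the old BindPodToNode plus PodRequirementFromLegacySchedulerJob pair. The helper function and the node type below are assumptions for illustration, not code from the PR.

```go
// Illustrative helper, not code from the PR: bind a set of jobs to a node so
// that their resources are marked as allocated, using the new BindJobToNode.
func bindJobsToNode(
	priorityClasses map[string]configuration.PriorityClass,
	jobs []interfaces.LegacySchedulerJob,
	node *schedulerobjects.Node, // node type assumed from the surrounding code
) (*schedulerobjects.Node, error) {
	var err error
	for _, job := range jobs {
		// BindJobToNode takes the priority classes and the job itself,
		// rather than a pre-computed PodRequirements.
		node, err = nodedb.BindJobToNode(priorityClasses, job, node)
		if err != nil {
			return nil, err
		}
	}
	return node, nil
}
```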
60 changes: 0 additions & 60 deletions internal/scheduler/common.go
@@ -3,7 +3,6 @@ package scheduler
import (
"fmt"
"strconv"
"time"

"github.com/pkg/errors"
"golang.org/x/exp/maps"
@@ -12,7 +11,6 @@ import (
armadamaps "github.com/armadaproject/armada/internal/common/maps"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
"github.com/armadaproject/armada/internal/scheduler/interfaces"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)
@@ -111,27 +109,6 @@ func JobsSummary(jobs []interfaces.LegacySchedulerJob) string {
)
}

func jobSchedulingContextsFromJobs[T interfaces.LegacySchedulerJob](jobs []T, executorId string, priorityClasses map[string]configuration.PriorityClass) []*schedulercontext.JobSchedulingContext {
if jobs == nil {
return nil
}
if len(jobs) == 0 {
return make([]*schedulercontext.JobSchedulingContext, 0)
}
jctxs := make([]*schedulercontext.JobSchedulingContext, len(jobs))
timestamp := time.Now()
for i, job := range jobs {
jctxs[i] = &schedulercontext.JobSchedulingContext{
Created: timestamp,
ExecutorId: executorId,
JobId: job.GetId(),
Job: job,
Req: PodRequirementFromLegacySchedulerJob(job, priorityClasses),
}
}
return jctxs
}

func isEvictedJob(job interfaces.LegacySchedulerJob) bool {
return job.GetAnnotations()[schedulerconfig.IsEvictedAnnotation] == "true"
}
@@ -168,40 +145,3 @@ func GangIdAndCardinalityFromAnnotations(annotations map[string]string) (string,
}
return gangId, gangCardinality, true, nil
}

func PodRequirementsFromLegacySchedulerJobs[S ~[]E, E interfaces.LegacySchedulerJob](jobs S, priorityClasses map[string]configuration.PriorityClass) []*schedulerobjects.PodRequirements {
rv := make([]*schedulerobjects.PodRequirements, len(jobs))
for i, job := range jobs {
rv[i] = PodRequirementFromLegacySchedulerJob(job, priorityClasses)
}
return rv
}

func PodRequirementFromLegacySchedulerJob[E interfaces.LegacySchedulerJob](job E, priorityClasses map[string]configuration.PriorityClass) *schedulerobjects.PodRequirements {
annotations := make(map[string]string, len(configuration.ArmadaManagedAnnotations)+len(schedulerconfig.ArmadaSchedulerManagedAnnotations))
for _, key := range configuration.ArmadaManagedAnnotations {
if value, ok := job.GetAnnotations()[key]; ok {
annotations[key] = value
}
}
for _, key := range schedulerconfig.ArmadaSchedulerManagedAnnotations {
if value, ok := job.GetAnnotations()[key]; ok {
annotations[key] = value
}
}
annotations[schedulerconfig.JobIdAnnotation] = job.GetId()
annotations[schedulerconfig.QueueAnnotation] = job.GetQueue()
info := job.GetJobSchedulingInfo(priorityClasses)
req := PodRequirementFromJobSchedulingInfo(info)
req.Annotations = annotations
return req
}

func PodRequirementFromJobSchedulingInfo(info *schedulerobjects.JobSchedulingInfo) *schedulerobjects.PodRequirements {
for _, oreq := range info.ObjectRequirements {
if preq := oreq.GetPodRequirements(); preq != nil {
return preq
}
}
return nil
}
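The helpers deleted above are superseded by the GetPodRequirements method added to the LegacySchedulerJob interface later in this diff. As a hedged sketch, a caller that previously relied on the removed slice helper might now look like the illustrative generic function below; it is not part of the PR.

```go
// Illustrative replacement for the removed PodRequirementsFromLegacySchedulerJobs:
// each job now produces its own pod requirements.
func podRequirementsFromJobs[S ~[]E, E interfaces.LegacySchedulerJob](
	jobs S,
	priorityClasses map[string]configuration.PriorityClass,
) []*schedulerobjects.PodRequirements {
	rv := make([]*schedulerobjects.PodRequirements, 0, len(jobs))
	for _, job := range jobs {
		// GetPodRequirements replaces the old free function
		// PodRequirementFromLegacySchedulerJob(job, priorityClasses).
		rv = append(rv, job.GetPodRequirements(priorityClasses))
	}
	return rv
}
```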
8 changes: 3 additions & 5 deletions internal/scheduler/common_test.go
@@ -10,12 +10,11 @@ import (

"github.com/armadaproject/armada/internal/armada/configuration"
"github.com/armadaproject/armada/internal/common/util"
schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/pkg/api"
)

func TestPodRequirementFromLegacySchedulerJob(t *testing.T) {
func TestGetPodRequirements(t *testing.T) {
resourceLimit := v1.ResourceList{
"cpu": resource.MustParse("1"),
"memory": resource.MustParse("128Mi"),
@@ -64,13 +63,12 @@ func TestPodRequirementFromLegacySchedulerJob(t *testing.T) {
PreemptionPolicy: string(v1.PreemptLowerPriority),
ResourceRequirements: requirements,
Annotations: map[string]string{
"something": "test",
configuration.GangIdAnnotation: "gang-id",
configuration.GangCardinalityAnnotation: "1",
schedulerconfig.JobIdAnnotation: j.Id,
schedulerconfig.QueueAnnotation: j.Queue,
},
}
actual := PodRequirementFromLegacySchedulerJob(j, map[string]configuration.PriorityClass{"armada-default": {Priority: int32(1)}})
actual := j.GetPodRequirements(map[string]configuration.PriorityClass{"armada-default": {Priority: int32(1)}})
assert.Equal(t, expected, actual)
}

17 changes: 4 additions & 13 deletions internal/scheduler/configuration/configuration.go
@@ -10,23 +10,14 @@ import (
)

const (
// IsEvictedAnnotation, indicates a pod was evicted in this round and is currently running.
// Used by the scheduler to differentiate between pods from running and queued jobs.
// IsEvictedAnnotation is set on evicted jobs; the scheduler uses it to differentiate between
// already-running and queued jobs.
IsEvictedAnnotation = "armadaproject.io/isEvicted"
// JobIdAnnotation if set on a pod, indicates which job this pod is part of.
JobIdAnnotation = "armadaproject.io/jobId"
// QueueAnnotation if set on a pod, indicates which queue this pod is part of.
QueueAnnotation = "armadaproject.io/queue"
// IdNodeLabel is automatically added to nodes in the NodeDb.
// NodeIdLabel is set on evicted jobs, so that the scheduler only tries to schedule them on the
// nodes that they are already running on; nodedb is responsible for labelling its Node objects.
NodeIdLabel = "armadaproject.io/nodeId"
)

var ArmadaSchedulerManagedAnnotations = []string{
IsEvictedAnnotation,
JobIdAnnotation,
QueueAnnotation,
}

type Configuration struct {
// Database configuration
Postgres configuration.PostgresConfig
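To make the NodeIdLabel comment above concrete, here is an assumed illustration of how an evicted job could be re-pinned to the node it is already running on. The helper, and the assumption that PodRequirements exposes Annotations and a NodeSelector map, are for illustration only and are not shown in this diff.

```go
// Assumed illustration, not code from this PR: restrict an evicted job to the
// node it already runs on by selecting on the nodedb-managed NodeIdLabel.
func pinEvictedJobToNode(req *schedulerobjects.PodRequirements, nodeId string) {
	if req.Annotations == nil {
		req.Annotations = map[string]string{}
	}
	req.Annotations[IsEvictedAnnotation] = "true"
	if req.NodeSelector == nil {
		req.NodeSelector = map[string]string{}
	}
	// NodeIdLabel is added to nodes by the nodedb, so selecting on it limits
	// scheduling to the single node carrying that id.
	req.NodeSelector[NodeIdLabel] = nodeId
}
```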
50 changes: 26 additions & 24 deletions internal/scheduler/context/context.go
@@ -228,12 +228,12 @@ func (sctx *SchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingContex
}
if jctx.IsSuccessful() {
if evictedInThisRound {
sctx.EvictedResources.SubV1ResourceList(jctx.Req.ResourceRequirements.Requests)
sctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.Req.ResourceRequirements.Requests)
sctx.EvictedResources.SubV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests)
sctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)
sctx.NumEvictedJobs--
} else {
sctx.ScheduledResources.AddV1ResourceList(jctx.Req.ResourceRequirements.Requests)
sctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.Req.ResourceRequirements.Requests)
sctx.ScheduledResources.AddV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests)
sctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)
sctx.NumScheduledJobs++
}
}
@@ -440,23 +440,23 @@ func (qctx *QueueSchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingC
}
_, evictedInThisRound := qctx.EvictedJobsById[jctx.JobId]
if jctx.IsSuccessful() {
if jctx.Req == nil {
if jctx.PodRequirements == nil {
return false, errors.Errorf("failed adding job %s to queue: job requirements are missing", jctx.JobId)
}

// Always update ResourcesByPriority.
// Since ResourcesByPriority is used to order queues by fraction of fair share.
qctx.Allocated.AddV1ResourceList(jctx.Req.ResourceRequirements.Requests)
qctx.AllocatedByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.Req.ResourceRequirements.Requests)
qctx.Allocated.AddV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests)
qctx.AllocatedByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)

// Only if the job is not evicted, update ScheduledResourcesByPriority.
// Since ScheduledResourcesByPriority is used to control per-round scheduling constraints.
if evictedInThisRound {
delete(qctx.EvictedJobsById, jctx.JobId)
qctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.Req.ResourceRequirements.Requests)
qctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)
} else {
qctx.SuccessfulJobSchedulingContexts[jctx.JobId] = jctx
qctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.Req.ResourceRequirements.Requests)
qctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)
}
} else {
qctx.UnsuccessfulJobSchedulingContexts[jctx.JobId] = jctx
@@ -531,7 +531,7 @@ func NewGangSchedulingContext(jctxs []*JobSchedulingContext) *GangSchedulingCont
totalResourceRequests := schedulerobjects.NewResourceList(4)
for _, jctx := range jctxs {
allJobsEvicted = allJobsEvicted && isEvictedJob(jctx.Job)
totalResourceRequests.AddV1ResourceList(jctx.Req.ResourceRequirements.Requests)
totalResourceRequests.AddV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests)
}
return &GangSchedulingContext{
Created: time.Now(),
@@ -543,14 +543,6 @@
}
}

func (gctx GangSchedulingContext) PodRequirements() []*schedulerobjects.PodRequirements {
rv := make([]*schedulerobjects.PodRequirements, len(gctx.JobSchedulingContexts))
for i, jctx := range gctx.JobSchedulingContexts {
rv[i] = jctx.Req
}
return rv
}

func isEvictedJob(job interfaces.LegacySchedulerJob) bool {
return job.GetAnnotations()[schedulerconfig.IsEvictedAnnotation] == "true"
}
@@ -560,17 +552,13 @@ func isEvictedJob(job interfaces.LegacySchedulerJob) bool {
type JobSchedulingContext struct {
// Time at which this context was created.
Created time.Time
// Executor this job was attempted to be assigned to.
ExecutorId string
// Total number of nodes in the cluster when trying to schedule.
NumNodes int
// Id of the job this pod corresponds to.
JobId string
// Job spec.
Job interfaces.LegacySchedulerJob
// Scheduling requirements of this job.
// We currently require that each job contains exactly one pod spec.
Req *schedulerobjects.PodRequirements
PodRequirements *schedulerobjects.PodRequirements
// Reason for why the job could not be scheduled.
// Empty if the job was scheduled successfully.
UnschedulableReason string
@@ -583,7 +571,6 @@ func (jctx *JobSchedulingContext) String() string {
w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
fmt.Fprintf(w, "Time:\t%s\n", jctx.Created)
fmt.Fprintf(w, "Job ID:\t%s\n", jctx.JobId)
fmt.Fprintf(w, "Number of nodes in cluster:\t%d\n", jctx.NumNodes)
if jctx.UnschedulableReason != "" {
fmt.Fprintf(w, "UnschedulableReason:\t%s\n", jctx.UnschedulableReason)
} else {
@@ -600,6 +587,20 @@ func (jctx *JobSchedulingContext) IsSuccessful() bool {
return jctx.UnschedulableReason == ""
}

func JobSchedulingContextsFromJobs[T interfaces.LegacySchedulerJob](priorityClasses map[string]configuration.PriorityClass, jobs []T) []*JobSchedulingContext {
jctxs := make([]*JobSchedulingContext, len(jobs))
timestamp := time.Now()
for i, job := range jobs {
jctxs[i] = &JobSchedulingContext{
Created: timestamp,
JobId: job.GetId(),
Job: job,
PodRequirements: job.GetPodRequirements(priorityClasses),
}
}
return jctxs
}

// PodSchedulingContext is returned by SelectAndBindNodeToPod and
// contains detailed information on the scheduling decision made for this pod.
type PodSchedulingContext struct {
@@ -626,6 +627,7 @@ func (pctx *PodSchedulingContext) String() string {
} else {
fmt.Fprint(w, "Node:\tnone\n")
}
fmt.Fprintf(w, "Number of nodes in cluster:\t%d\n", pctx.NumNodes)
if len(pctx.NumExcludedNodesByReason) == 0 {
fmt.Fprint(w, "Excluded nodes:\tnone\n")
} else {
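Taken together, the renamed PodRequirements field and the new JobSchedulingContextsFromJobs constructor give a compact path from jobs to a gang context. A hedged usage sketch from a caller's perspective, with jobs and priorityClasses assumed to be in scope:

```go
// Sketch: build per-job scheduling contexts and a gang context from a slice of
// jobs. PodRequirements (formerly Req) is filled in via job.GetPodRequirements.
jctxs := schedulercontext.JobSchedulingContextsFromJobs(priorityClasses, jobs)
gctx := schedulercontext.NewGangSchedulingContext(jctxs)
for _, jctx := range jctxs {
	// Per-job resource requests now hang off jctx.PodRequirements.
	requests := jctx.PodRequirements.ResourceRequirements.Requests
	_ = requests
}
_ = gctx
```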
8 changes: 3 additions & 5 deletions internal/scheduler/context/context_test.go
@@ -84,10 +84,8 @@ func testNSmallCpuJobSchedulingContext(queue, priorityClassName string, n int) [
func testSmallCpuJobSchedulingContext(queue, priorityClassName string) *JobSchedulingContext {
job := testfixtures.Test1CpuJob(queue, priorityClassName)
return &JobSchedulingContext{
ExecutorId: "executor",
NumNodes: 1,
JobId: job.GetId(),
Job: job,
Req: job.GetJobSchedulingInfo(nil).ObjectRequirements[0].GetPodRequirements(),
JobId: job.GetId(),
Job: job,
PodRequirements: job.GetPodRequirements(testfixtures.TestPriorityClasses),
}
}
18 changes: 3 additions & 15 deletions internal/scheduler/gang_scheduler.go
@@ -4,8 +4,6 @@ import (
"context"
"fmt"

"github.com/pkg/errors"

"github.com/armadaproject/armada/internal/common/util"
schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/constraints"
schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
@@ -94,7 +92,7 @@ func (sch *GangScheduler) Schedule(ctx context.Context, gctx *schedulercontext.G
// Check that the job is large enough for this executor.
// This check needs to be here, since it relates to a specific job.
// Only perform limit checks for new jobs to avoid preempting jobs if, e.g., MinimumJobSize changes.
if ok, unschedulableReason = requestIsLargeEnough(gctx.TotalResourceRequests, sch.constraints.MinimumJobSize); !ok {
if ok, unschedulableReason = requestsAreLargeEnough(gctx.TotalResourceRequests, sch.constraints.MinimumJobSize); !ok {
return
}
if ok, unschedulableReason, err = sch.constraints.CheckPerQueueAndPriorityClassConstraints(
@@ -112,20 +110,10 @@ func (sch *GangScheduler) Schedule(ctx context.Context, gctx *schedulercontext.G
}

func (sch *GangScheduler) trySchedule(ctx context.Context, gctx *schedulercontext.GangSchedulingContext) (bool, string, error) {
pctxs, ok, err := sch.nodeDb.ScheduleMany(gctx.PodRequirements())
ok, err := sch.nodeDb.ScheduleMany(gctx.JobSchedulingContexts)
if err != nil {
return false, "", err
}
if len(pctxs) > len(gctx.JobSchedulingContexts) {
return false, "", errors.Errorf(
"received %d pod scheduling context(s), but gang has cardinality %d",
len(pctxs), len(gctx.JobSchedulingContexts),
)
}
for i, pctx := range pctxs {
gctx.JobSchedulingContexts[i].PodSchedulingContext = pctx
gctx.JobSchedulingContexts[i].NumNodes = pctx.NumNodes
}
if !ok {
unschedulableReason := ""
if len(gctx.JobSchedulingContexts) > 1 {
@@ -138,7 +126,7 @@ return true, "", nil
return true, "", nil
}

func requestIsLargeEnough(totalResourceRequests, minRequest schedulerobjects.ResourceList) (bool, string) {
func requestsAreLargeEnough(totalResourceRequests, minRequest schedulerobjects.ResourceList) (bool, string) {
if len(minRequest.Resources) == 0 {
return true, ""
}
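With ScheduleMany now taking the JobSchedulingContexts directly, trySchedule no longer copies PodSchedulingContexts back by index; the node database is assumed to attach them to each jctx itself. A hedged sketch of the resulting method, where the unschedulable-reason strings are placeholders since the real messages are elided from the diff:

```go
// Sketch of the simplified trySchedule under the new ScheduleMany signature;
// receiver and types are as in gang_scheduler.go, messages are placeholders.
func (sch *GangScheduler) trySchedule(ctx context.Context, gctx *schedulercontext.GangSchedulingContext) (bool, string, error) {
	ok, err := sch.nodeDb.ScheduleMany(gctx.JobSchedulingContexts)
	if err != nil {
		return false, "", err
	}
	if !ok {
		// Placeholder messages; the actual strings are not shown in this diff.
		unschedulableReason := "job does not fit on any node"
		if len(gctx.JobSchedulingContexts) > 1 {
			unschedulableReason = "at least one job in the gang does not fit on any node"
		}
		return false, unschedulableReason, nil
	}
	return true, "", nil
}
```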
2 changes: 1 addition & 1 deletion internal/scheduler/gang_scheduler_test.go
@@ -256,7 +256,7 @@ func TestGangScheduler(t *testing.T) {

var actualScheduledIndices []int
for i, gang := range tc.Gangs {
jctxs := jobSchedulingContextsFromJobs(gang, "", testfixtures.TestPriorityClasses)
jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, gang)
gctx := schedulercontext.NewGangSchedulingContext(jctxs)
ok, reason, err := sch.Schedule(context.Background(), gctx)
require.NoError(t, err)
2 changes: 1 addition & 1 deletion internal/scheduler/interfaces/interfaces.go
@@ -17,7 +17,7 @@ type LegacySchedulerJob interface {
GetPerQueuePriority() uint32
GetSubmitTime() time.Time
GetAnnotations() map[string]string
GetJobSchedulingInfo(map[string]configuration.PriorityClass) *schedulerobjects.JobSchedulingInfo
GetPodRequirements(priorityClasses map[string]configuration.PriorityClass) *schedulerobjects.PodRequirements
GetPriorityClassName() string
GetNodeSelector() map[string]string
GetAffinity() *v1.Affinity
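The interface change above swaps GetJobSchedulingInfo for GetPodRequirements, so every implementer now returns pod requirements directly. A hypothetical minimal implementer, only to show the new shape; the type and field names below are invented for illustration.

```go
// exampleJob is a hypothetical implementer used only to illustrate the new
// method; real jobs in Armada derive their requirements from scheduling info.
type exampleJob struct {
	podRequirements *schedulerobjects.PodRequirements
}

func (j *exampleJob) GetPodRequirements(
	priorityClasses map[string]configuration.PriorityClass,
) *schedulerobjects.PodRequirements {
	// A full implementation would resolve the priority class and copy managed
	// annotations here, as the removed common.go helper used to do.
	return j.podRequirements
}
```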