Make the node database operate on jobs instead of pods
zuqq committed Jun 27, 2023
1 parent 7542cd0 commit 144f087
Showing 29 changed files with 448 additions and 626 deletions.
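The thrust of the change: callers no longer pre-compute pod requirements with PodRequirementFromLegacySchedulerJob before handing work to the node database. Instead, the LegacySchedulerJob interface gains GetPodRequirements, and node-database entry points such as nodedb.BindJobToNode take the priority-class configuration plus the job itself, deriving requirements internally. A minimal sketch of the pattern, using simplified stand-in types rather than the real Armada packages:

package main

import "fmt"

// Stand-in for schedulerobjects.PodRequirements.
type PodRequirements struct {
	Priority    int32
	Annotations map[string]string
}

// Stand-in for configuration.PriorityClass.
type PriorityClass struct {
	Priority int32
}

// Stand-in for interfaces.LegacySchedulerJob: jobs now derive their own pod
// requirements from the priority-class configuration.
type Job interface {
	GetId() string
	GetPodRequirements(priorityClasses map[string]PriorityClass) *PodRequirements
}

type exampleJob struct {
	id            string
	priorityClass string
}

func (j exampleJob) GetId() string { return j.id }

func (j exampleJob) GetPodRequirements(priorityClasses map[string]PriorityClass) *PodRequirements {
	return &PodRequirements{
		Priority:    priorityClasses[j.priorityClass].Priority,
		Annotations: map[string]string{"armadaproject.io/jobId": j.id},
	}
}

// Stand-in for nodedb.BindJobToNode: requirement extraction happens inside the
// node database instead of at every call site.
func bindJobToNode(priorityClasses map[string]PriorityClass, job Job, node string) error {
	req := job.GetPodRequirements(priorityClasses)
	fmt.Printf("bound job %s to node %s at priority %d\n", job.GetId(), node, req.Priority)
	return nil
}

func main() {
	priorityClasses := map[string]PriorityClass{"armada-default": {Priority: 1}}
	job := exampleJob{id: "job-1", priorityClass: "armada-default"}
	if err := bindJobToNode(priorityClasses, job, "node-1"); err != nil {
		fmt.Println(err)
	}
}

The lease.go hunk below shows the corresponding call-site simplification.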
8 changes: 3 additions & 5 deletions internal/armada/server/lease.go
@@ -370,11 +370,9 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
// Bind pods to nodes, thus ensuring resources are marked as allocated on the node.
skipNode := false
for _, job := range jobs {
node, err = nodedb.BindPodToNode(
scheduler.PodRequirementFromLegacySchedulerJob(
job,
q.schedulingConfig.Preemption.PriorityClasses,
),
node, err = nodedb.BindJobToNode(
q.schedulingConfig.Preemption.PriorityClasses,
job,
node,
)
if err != nil {
60 changes: 0 additions & 60 deletions internal/scheduler/common.go
@@ -4,7 +4,6 @@ import (
"fmt"
"math"
"strconv"
"time"

"github.com/pkg/errors"
"golang.org/x/exp/maps"
@@ -13,7 +12,6 @@ import (
armadamaps "github.com/armadaproject/armada/internal/common/maps"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
"github.com/armadaproject/armada/internal/scheduler/interfaces"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)
@@ -112,27 +110,6 @@ func JobsSummary(jobs []interfaces.LegacySchedulerJob) string {
)
}

func jobSchedulingContextsFromJobs[T interfaces.LegacySchedulerJob](jobs []T, executorId string, priorityClasses map[string]configuration.PriorityClass) []*schedulercontext.JobSchedulingContext {
if jobs == nil {
return nil
}
if len(jobs) == 0 {
return make([]*schedulercontext.JobSchedulingContext, 0)
}
jctxs := make([]*schedulercontext.JobSchedulingContext, len(jobs))
timestamp := time.Now()
for i, job := range jobs {
jctxs[i] = &schedulercontext.JobSchedulingContext{
Created: timestamp,
ExecutorId: executorId,
JobId: job.GetId(),
Job: job,
Req: PodRequirementFromLegacySchedulerJob(job, priorityClasses),
}
}
return jctxs
}

func isEvictedJob(job interfaces.LegacySchedulerJob) bool {
return job.GetAnnotations()[schedulerconfig.IsEvictedAnnotation] == "true"
}
@@ -180,40 +157,3 @@ func ResourceListAsWeightedMillis(weights map[string]float64, rl schedulerobject
}
return rv
}

func PodRequirementsFromLegacySchedulerJobs[S ~[]E, E interfaces.LegacySchedulerJob](jobs S, priorityClasses map[string]configuration.PriorityClass) []*schedulerobjects.PodRequirements {
rv := make([]*schedulerobjects.PodRequirements, len(jobs))
for i, job := range jobs {
rv[i] = PodRequirementFromLegacySchedulerJob(job, priorityClasses)
}
return rv
}

func PodRequirementFromLegacySchedulerJob[E interfaces.LegacySchedulerJob](job E, priorityClasses map[string]configuration.PriorityClass) *schedulerobjects.PodRequirements {
annotations := make(map[string]string, len(configuration.ArmadaManagedAnnotations)+len(schedulerconfig.ArmadaSchedulerManagedAnnotations))
for _, key := range configuration.ArmadaManagedAnnotations {
if value, ok := job.GetAnnotations()[key]; ok {
annotations[key] = value
}
}
for _, key := range schedulerconfig.ArmadaSchedulerManagedAnnotations {
if value, ok := job.GetAnnotations()[key]; ok {
annotations[key] = value
}
}
annotations[schedulerconfig.JobIdAnnotation] = job.GetId()
annotations[schedulerconfig.QueueAnnotation] = job.GetQueue()
info := job.GetJobSchedulingInfo(priorityClasses)
req := PodRequirementFromJobSchedulingInfo(info)
req.Annotations = annotations
return req
}

func PodRequirementFromJobSchedulingInfo(info *schedulerobjects.JobSchedulingInfo) *schedulerobjects.PodRequirements {
for _, oreq := range info.ObjectRequirements {
if preq := oreq.GetPodRequirements(); preq != nil {
return preq
}
}
return nil
}
8 changes: 3 additions & 5 deletions internal/scheduler/common_test.go
@@ -10,12 +10,11 @@ import (

"github.com/armadaproject/armada/internal/armada/configuration"
"github.com/armadaproject/armada/internal/common/util"
schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/pkg/api"
)

func TestPodRequirementFromLegacySchedulerJob(t *testing.T) {
func TestGetPodRequirements(t *testing.T) {
resourceLimit := v1.ResourceList{
"cpu": resource.MustParse("1"),
"memory": resource.MustParse("128Mi"),
@@ -64,13 +63,12 @@ func TestPodRequirementFromLegacySchedulerJob(t *testing.T) {
PreemptionPolicy: string(v1.PreemptLowerPriority),
ResourceRequirements: requirements,
Annotations: map[string]string{
"something": "test",
configuration.GangIdAnnotation: "gang-id",
configuration.GangCardinalityAnnotation: "1",
schedulerconfig.JobIdAnnotation: j.Id,
schedulerconfig.QueueAnnotation: j.Queue,
},
}
actual := PodRequirementFromLegacySchedulerJob(j, map[string]configuration.PriorityClass{"armada-default": {Priority: int32(1)}})
actual := j.GetPodRequirements(map[string]configuration.PriorityClass{"armada-default": {Priority: int32(1)}})
assert.Equal(t, expected, actual)
}

17 changes: 4 additions & 13 deletions internal/scheduler/configuration/configuration.go
@@ -10,23 +10,14 @@ import (
)

const (
// IsEvictedAnnotation, indicates a pod was evicted in this round and is currently running.
// Used by the scheduler to differentiate between pods from running and queued jobs.
// IsEvictedAnnotation is set on evicted jobs; the scheduler uses it to differentiate between
// already-running and queued jobs.
IsEvictedAnnotation = "armadaproject.io/isEvicted"
// JobIdAnnotation if set on a pod, indicates which job this pod is part of.
JobIdAnnotation = "armadaproject.io/jobId"
// QueueAnnotation if set on a pod, indicates which queue this pod is part of.
QueueAnnotation = "armadaproject.io/queue"
// IdNodeLabel is automatically added to nodes in the NodeDb.
// NodeIdLabel is set on evicted jobs, so that the scheduler only tries to schedule them on the
// nodes that they are already running on; nodedb is responsible for labelling its Node objects.
NodeIdLabel = "armadaproject.io/nodeId"
)

var ArmadaSchedulerManagedAnnotations = []string{
IsEvictedAnnotation,
JobIdAnnotation,
QueueAnnotation,
}

type Configuration struct {
// Database configuration
Postgres configuration.PostgresConfig
50 changes: 26 additions & 24 deletions internal/scheduler/context/context.go
@@ -222,12 +222,12 @@ func (sctx *SchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingContex
}
if jctx.IsSuccessful() {
if evictedInThisRound {
sctx.EvictedResources.SubV1ResourceList(jctx.Req.ResourceRequirements.Requests)
sctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.Req.ResourceRequirements.Requests)
sctx.EvictedResources.SubV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests)
sctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)
sctx.NumEvictedJobs--
} else {
sctx.ScheduledResources.AddV1ResourceList(jctx.Req.ResourceRequirements.Requests)
sctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.Req.ResourceRequirements.Requests)
sctx.ScheduledResources.AddV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests)
sctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)
sctx.NumScheduledJobs++
}
}
Expand Down Expand Up @@ -434,23 +434,23 @@ func (qctx *QueueSchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingC
}
_, evictedInThisRound := qctx.EvictedJobsById[jctx.JobId]
if jctx.IsSuccessful() {
if jctx.Req == nil {
if jctx.PodRequirements == nil {
return false, errors.Errorf("failed adding job %s to queue: job requirements are missing", jctx.JobId)
}

// Always update ResourcesByPriority.
// Since ResourcesByPriority is used to order queues by fraction of fair share.
qctx.Allocated.AddV1ResourceList(jctx.Req.ResourceRequirements.Requests)
qctx.AllocatedByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.Req.ResourceRequirements.Requests)
qctx.Allocated.AddV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests)
qctx.AllocatedByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)

// Only if the job is not evicted, update ScheduledResourcesByPriority.
// Since ScheduledResourcesByPriority is used to control per-round scheduling constraints.
if evictedInThisRound {
delete(qctx.EvictedJobsById, jctx.JobId)
qctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.Req.ResourceRequirements.Requests)
qctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)
} else {
qctx.SuccessfulJobSchedulingContexts[jctx.JobId] = jctx
qctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.Req.ResourceRequirements.Requests)
qctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)
}
} else {
qctx.UnsuccessfulJobSchedulingContexts[jctx.JobId] = jctx
@@ -512,7 +512,7 @@ func NewGangSchedulingContext(jctxs []*JobSchedulingContext) *GangSchedulingCont
totalResourceRequests := schedulerobjects.NewResourceList(4)
for _, jctx := range jctxs {
allJobsEvicted = allJobsEvicted && isEvictedJob(jctx.Job)
totalResourceRequests.AddV1ResourceList(jctx.Req.ResourceRequirements.Requests)
totalResourceRequests.AddV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests)
}
return &GangSchedulingContext{
Created: time.Now(),
@@ -524,14 +524,6 @@
}
}

func (gctx GangSchedulingContext) PodRequirements() []*schedulerobjects.PodRequirements {
rv := make([]*schedulerobjects.PodRequirements, len(gctx.JobSchedulingContexts))
for i, jctx := range gctx.JobSchedulingContexts {
rv[i] = jctx.Req
}
return rv
}

func isEvictedJob(job interfaces.LegacySchedulerJob) bool {
return job.GetAnnotations()[schedulerconfig.IsEvictedAnnotation] == "true"
}
@@ -541,17 +533,13 @@ func isEvictedJob(job interfaces.LegacySchedulerJob) bool {
type JobSchedulingContext struct {
// Time at which this context was created.
Created time.Time
// Executor this job was attempted to be assigned to.
ExecutorId string
// Total number of nodes in the cluster when trying to schedule.
NumNodes int
// Id of the job this pod corresponds to.
JobId string
// Job spec.
Job interfaces.LegacySchedulerJob
// Scheduling requirements of this job.
// We currently require that each job contains exactly one pod spec.
Req *schedulerobjects.PodRequirements
PodRequirements *schedulerobjects.PodRequirements
// Reason for why the job could not be scheduled.
// Empty if the job was scheduled successfully.
UnschedulableReason string
@@ -564,7 +552,6 @@ func (jctx *JobSchedulingContext) String() string {
w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
fmt.Fprintf(w, "Time:\t%s\n", jctx.Created)
fmt.Fprintf(w, "Job ID:\t%s\n", jctx.JobId)
fmt.Fprintf(w, "Number of nodes in cluster:\t%d\n", jctx.NumNodes)
if jctx.UnschedulableReason != "" {
fmt.Fprintf(w, "UnschedulableReason:\t%s\n", jctx.UnschedulableReason)
} else {
@@ -581,6 +568,20 @@ func (jctx *JobSchedulingContext) IsSuccessful() bool {
return jctx.UnschedulableReason == ""
}

func JobSchedulingContextsFromJobs[T interfaces.LegacySchedulerJob](priorityClasses map[string]configuration.PriorityClass, jobs []T) []*JobSchedulingContext {
jctxs := make([]*JobSchedulingContext, len(jobs))
timestamp := time.Now()
for i, job := range jobs {
jctxs[i] = &JobSchedulingContext{
Created: timestamp,
JobId: job.GetId(),
Job: job,
PodRequirements: job.GetPodRequirements(priorityClasses),
}
}
return jctxs
}

// PodSchedulingContext is returned by SelectAndBindNodeToPod and
// contains detailed information on the scheduling decision made for this pod.
type PodSchedulingContext struct {
@@ -607,6 +608,7 @@ func (pctx *PodSchedulingContext) String() string {
} else {
fmt.Fprint(w, "Node:\tnone\n")
}
fmt.Fprintf(w, "Number of nodes in cluster:\t%d\n", pctx.NumNodes)
if len(pctx.NumExcludedNodesByReason) == 0 {
fmt.Fprint(w, "Excluded nodes:\tnone\n")
} else {
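Alongside the rename of JobSchedulingContext.Req to PodRequirements, the exported JobSchedulingContextsFromJobs constructor above replaces the private helper deleted from internal/scheduler/common.go. A rough, self-contained sketch of its shape, with stand-in types only (the real constructor works with interfaces.LegacySchedulerJob and configuration.PriorityClass):

package main

import (
	"fmt"
	"time"
)

type PodRequirements struct{ Priority int32 }

type PriorityClass struct{ Priority int32 }

// Stand-in for interfaces.LegacySchedulerJob.
type Job interface {
	GetId() string
	GetPodRequirements(priorityClasses map[string]PriorityClass) *PodRequirements
}

// Stand-in for schedulercontext.JobSchedulingContext after the rename.
type JobSchedulingContext struct {
	Created         time.Time
	JobId           string
	Job             Job
	PodRequirements *PodRequirements
}

// Stand-in for schedulercontext.JobSchedulingContextsFromJobs: one context per
// job, all sharing a single creation timestamp.
func JobSchedulingContextsFromJobs[T Job](priorityClasses map[string]PriorityClass, jobs []T) []*JobSchedulingContext {
	jctxs := make([]*JobSchedulingContext, len(jobs))
	timestamp := time.Now()
	for i, job := range jobs {
		jctxs[i] = &JobSchedulingContext{
			Created:         timestamp,
			JobId:           job.GetId(),
			Job:             job,
			PodRequirements: job.GetPodRequirements(priorityClasses),
		}
	}
	return jctxs
}

type exampleJob struct{ id string }

func (j exampleJob) GetId() string { return j.id }

func (j exampleJob) GetPodRequirements(priorityClasses map[string]PriorityClass) *PodRequirements {
	return &PodRequirements{Priority: priorityClasses["armada-default"].Priority}
}

func main() {
	priorityClasses := map[string]PriorityClass{"armada-default": {Priority: 1}}
	jctxs := JobSchedulingContextsFromJobs(priorityClasses, []exampleJob{{id: "job-1"}, {id: "job-2"}})
	fmt.Printf("built %d job scheduling contexts\n", len(jctxs))
}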
8 changes: 3 additions & 5 deletions internal/scheduler/context/context_test.go
@@ -84,10 +84,8 @@ func testNSmallCpuJobSchedulingContext(queue, priorityClassName string, n int) [
func testSmallCpuJobSchedulingContext(queue, priorityClassName string) *JobSchedulingContext {
job := testfixtures.Test1CpuJob(queue, priorityClassName)
return &JobSchedulingContext{
ExecutorId: "executor",
NumNodes: 1,
JobId: job.GetId(),
Job: job,
Req: job.GetJobSchedulingInfo(nil).ObjectRequirements[0].GetPodRequirements(),
JobId: job.GetId(),
Job: job,
PodRequirements: job.GetPodRequirements(testfixtures.TestPriorityClasses),
}
}
18 changes: 3 additions & 15 deletions internal/scheduler/gang_scheduler.go
@@ -4,8 +4,6 @@ import (
"context"
"fmt"

"github.com/pkg/errors"

"github.com/armadaproject/armada/internal/common/util"
schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/constraints"
schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
@@ -94,7 +92,7 @@ func (sch *GangScheduler) Schedule(ctx context.Context, gctx *schedulercontext.G
// Check that the job is large enough for this executor.
// This check needs to be here, since it relates to a specific job.
// Only perform limit checks for new jobs to avoid preempting jobs if, e.g., MinimumJobSize changes.
if ok, unschedulableReason = requestIsLargeEnough(gctx.TotalResourceRequests, sch.constraints.MinimumJobSize); !ok {
if ok, unschedulableReason = requestsAreLargeEnough(gctx.TotalResourceRequests, sch.constraints.MinimumJobSize); !ok {
return
}
if ok, unschedulableReason, err = sch.constraints.CheckPerQueueAndPriorityClassConstraints(
@@ -112,20 +110,10 @@ func (sch *GangScheduler) Schedule(ctx context.Context, gctx *schedulercontext.G
}

func (sch *GangScheduler) trySchedule(ctx context.Context, gctx *schedulercontext.GangSchedulingContext) (bool, string, error) {
pctxs, ok, err := sch.nodeDb.ScheduleMany(gctx.PodRequirements())
ok, err := sch.nodeDb.ScheduleMany(gctx.JobSchedulingContexts)
if err != nil {
return false, "", err
}
if len(pctxs) > len(gctx.JobSchedulingContexts) {
return false, "", errors.Errorf(
"received %d pod scheduling context(s), but gang has cardinality %d",
len(pctxs), len(gctx.JobSchedulingContexts),
)
}
for i, pctx := range pctxs {
gctx.JobSchedulingContexts[i].PodSchedulingContext = pctx
gctx.JobSchedulingContexts[i].NumNodes = pctx.NumNodes
}
if !ok {
unschedulableReason := ""
if len(gctx.JobSchedulingContexts) > 1 {
@@ -138,7 +126,7 @@ return true, "", nil
return true, "", nil
}

func requestIsLargeEnough(totalResourceRequests, minRequest schedulerobjects.ResourceList) (bool, string) {
func requestsAreLargeEnough(totalResourceRequests, minRequest schedulerobjects.ResourceList) (bool, string) {
if len(minRequest.Resources) == 0 {
return true, ""
}
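With GangSchedulingContext.PodRequirements() removed, trySchedule now hands the JobSchedulingContexts straight to NodeDb.ScheduleMany, which returns only (bool, error); the per-pod results are presumably recorded on each context by the node database itself rather than copied back by index in the caller. A simplified sketch of that contract (stand-in types, not the real NodeDb):

package main

import "fmt"

type PodRequirements struct{ Priority int32 }

// Stand-in for schedulercontext.PodSchedulingContext.
type PodSchedulingContext struct {
	NodeId   string
	NumNodes int
}

// Stand-in for schedulercontext.JobSchedulingContext.
type JobSchedulingContext struct {
	JobId                string
	PodRequirements      *PodRequirements
	PodSchedulingContext *PodSchedulingContext
}

// Stand-in for the node database: ScheduleMany takes the job scheduling
// contexts directly and annotates them in place, so callers no longer zip a
// parallel slice of pod scheduling contexts back together.
type NodeDb struct {
	numNodes int
}

func (db *NodeDb) ScheduleMany(jctxs []*JobSchedulingContext) (bool, error) {
	for _, jctx := range jctxs {
		// Pretend every pod fits on node-1; the real implementation searches the node database.
		jctx.PodSchedulingContext = &PodSchedulingContext{NodeId: "node-1", NumNodes: db.numNodes}
	}
	return true, nil
}

func main() {
	db := &NodeDb{numNodes: 3}
	jctxs := []*JobSchedulingContext{
		{JobId: "job-1", PodRequirements: &PodRequirements{Priority: 1}},
		{JobId: "job-2", PodRequirements: &PodRequirements{Priority: 1}},
	}
	ok, err := db.ScheduleMany(jctxs)
	fmt.Println(ok, err, jctxs[0].PodSchedulingContext.NodeId)
}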
2 changes: 1 addition & 1 deletion internal/scheduler/gang_scheduler_test.go
@@ -256,7 +256,7 @@ func TestGangScheduler(t *testing.T) {

var actualScheduledIndices []int
for i, gang := range tc.Gangs {
jctxs := jobSchedulingContextsFromJobs(gang, "", testfixtures.TestPriorityClasses)
jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, gang)
gctx := schedulercontext.NewGangSchedulingContext(jctxs)
ok, reason, err := sch.Schedule(context.Background(), gctx)
require.NoError(t, err)
2 changes: 1 addition & 1 deletion internal/scheduler/interfaces/interfaces.go
@@ -17,7 +17,7 @@ type LegacySchedulerJob interface {
GetPerQueuePriority() uint32
GetSubmitTime() time.Time
GetAnnotations() map[string]string
GetJobSchedulingInfo(map[string]configuration.PriorityClass) *schedulerobjects.JobSchedulingInfo
GetPodRequirements(priorityClasses map[string]configuration.PriorityClass) *schedulerobjects.PodRequirements
GetPriorityClassName() string
GetNodeSelector() map[string]string
GetAffinity() *v1.Affinity