Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POC] Add a cause field to JobPreemptedEvent #3881

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions internal/scheduler/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package scheduler

import (
"fmt"
"strings"

"golang.org/x/exp/maps"

Expand Down Expand Up @@ -70,3 +71,36 @@ func PrintJobSummary(ctx *armadacontext.Context, prefix string, jctxs []*schedul
ctx.Infof("%s %s", prefix, summary)
ctx.Debugf("%s %s", prefix, verbose)
}

func PopulatePreemptionDescriptions(preemptedJobs []*schedulercontext.JobSchedulingContext, scheduledJobs []*schedulercontext.JobSchedulingContext) {
jobsScheduledWithUrgencyBasedPreemptionByNode := map[string][]*schedulercontext.JobSchedulingContext{}
for _, schedJob := range scheduledJobs {
if schedJob.PodSchedulingContext.SchedulingMethod != schedulercontext.ScheduledWithUrgencyBasedPreemption {
continue
}

nodeId := schedJob.PodSchedulingContext.NodeId
if _, ok := jobsScheduledWithUrgencyBasedPreemptionByNode[nodeId]; !ok {
jobsScheduledWithUrgencyBasedPreemptionByNode[nodeId] = []*schedulercontext.JobSchedulingContext{}
}
jobsScheduledWithUrgencyBasedPreemptionByNode[nodeId] = append(jobsScheduledWithUrgencyBasedPreemptionByNode[nodeId], schedJob)
}
for _, job := range preemptedJobs {
if job.PreemptingJobId == "" {
potentialPreemptingJobs := jobsScheduledWithUrgencyBasedPreemptionByNode[job.GetAssignedNodeId()]

if len(potentialPreemptingJobs) == 0 {
job.PreemptionDescription = fmt.Sprintf("Preempted by scheduler using urgency preemption - unknown preempting job")
} else if len(potentialPreemptingJobs) == 1 {
job.PreemptionDescription = fmt.Sprintf("Preempted by scheduler using urgency preemption - preempting job %s", potentialPreemptingJobs[0].JobId)
} else {
jobIds := armadaslices.Map(potentialPreemptingJobs, func(job *schedulercontext.JobSchedulingContext) string {
return job.JobId
})
job.PreemptionDescription = fmt.Sprintf("Preempted by scheduler using urgency preemption - preemption caused by one of the following jobs %s", strings.Join(jobIds, ","))
}
} else {
job.PreemptionDescription = fmt.Sprintf("Preempted by scheduler using fair share preemption - preempting job %s", job.PreemptingJobId)
}
}
}
5 changes: 5 additions & 0 deletions internal/scheduler/context/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ type JobSchedulingContext struct {
// This is the node the pod is assigned to.
// This is only set for evicted jobs and is set alongside adding an additionalNodeSelector for the node
AssignedNodeId string
// Id of job that preempted this pod
PreemptingJobId string
// Description of the cause of preemption
PreemptionDescription string
}

func (jctx *JobSchedulingContext) String() string {
Expand Down Expand Up @@ -93,6 +97,7 @@ func (jctx *JobSchedulingContext) Fail(unschedulableReason string) {
jctx.UnschedulableReason = unschedulableReason
if pctx := jctx.PodSchedulingContext; pctx != nil {
pctx.NodeId = ""
pctx.SchedulingMethod = None
}
}

Expand Down
13 changes: 13 additions & 0 deletions internal/scheduler/context/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ import (
"time"
)

type SchedulingType int

const (
None SchedulingType = iota
Rescheduled
ScheduledWithoutPreemption
ScheduledWithFairSharePreemption
ScheduledWithUrgencyBasedPreemption
ScheduledAsAwayJob
)

// PodSchedulingContext is returned by SelectAndBindNodeToPod and
// contains detailed information on the scheduling decision made for this pod.
type PodSchedulingContext struct {
Expand All @@ -25,6 +36,8 @@ type PodSchedulingContext struct {
NumNodes int
// Number of nodes excluded by reason.
NumExcludedNodesByReason map[string]int
// The method of scheduling that was used to schedule this job
SchedulingMethod SchedulingType
}

func (pctx *PodSchedulingContext) IsSuccessful() bool {
Expand Down
6 changes: 6 additions & 0 deletions internal/scheduler/nodedb/nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,7 @@ func (nodeDb *NodeDb) SelectNodeForJobWithTxn(txn *memdb.Txn, jctx *schedulercon
if node, err := nodeDb.selectNodeForPodWithItAtPriority(it, jctx, priority, true); err != nil {
return nil, err
} else {
jctx.PodSchedulingContext.SchedulingMethod = schedulercontext.Rescheduled
return node, nil
}
}
Expand All @@ -548,6 +549,7 @@ func (nodeDb *NodeDb) SelectNodeForJobWithTxn(txn *memdb.Txn, jctx *schedulercon
}
if node != nil {
pctx.WellKnownNodeTypeName = awayNodeType.WellKnownNodeTypeName
pctx.SchedulingMethod = schedulercontext.ScheduledAsAwayJob
return node, nil
}
}
Expand Down Expand Up @@ -606,6 +608,7 @@ func (nodeDb *NodeDb) selectNodeForJobWithTxnAtPriority(
} else if err := assertPodSchedulingContextNode(pctx, node); err != nil {
return nil, err
} else if node != nil {
pctx.SchedulingMethod = schedulercontext.ScheduledWithoutPreemption
return node, nil
}

Expand All @@ -629,6 +632,7 @@ func (nodeDb *NodeDb) selectNodeForJobWithTxnAtPriority(
} else if err := assertPodSchedulingContextNode(pctx, node); err != nil {
return nil, err
} else if node != nil {
pctx.SchedulingMethod = schedulercontext.ScheduledWithFairSharePreemption
return node, nil
}

Expand All @@ -642,6 +646,7 @@ func (nodeDb *NodeDb) selectNodeForJobWithTxnAtPriority(
} else if err := assertPodSchedulingContextNode(pctx, node); err != nil {
return nil, err
} else if node != nil {
pctx.SchedulingMethod = schedulercontext.ScheduledWithUrgencyBasedPreemption
return node, nil
}

Expand Down Expand Up @@ -867,6 +872,7 @@ func (nodeDb *NodeDb) selectNodeForJobWithFairPreemption(txn *memdb.Txn, jctx *s
if priority > maxPriority {
maxPriority = priority
}
job.JobSchedulingContext.PreemptingJobId = jctx.JobId
}

selectedNode = nodeCopy
Expand Down
1 change: 1 addition & 0 deletions internal/scheduler/preempting_queue_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche
}
ctx.WithField("stage", "scheduling-algo").Infof("Finished unbinding preempted and evicted jobs")

PopulatePreemptionDescriptions(preemptedJobs, scheduledJobs)
PrintJobSummary(ctx, "Preempting running jobs;", preemptedJobs)
PrintJobSummary(ctx, "Scheduling new jobs;", scheduledJobs)
// TODO: Show failed jobs.
Expand Down
23 changes: 12 additions & 11 deletions internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ func (s *Scheduler) eventsFromSchedulerResult(result *SchedulerResult) ([]*armad
// EventsFromSchedulerResult generates necessary EventSequences from the provided SchedulerResult.
func EventsFromSchedulerResult(result *SchedulerResult, time time.Time) ([]*armadaevents.EventSequence, error) {
eventSequences := make([]*armadaevents.EventSequence, 0, len(result.PreemptedJobs)+len(result.ScheduledJobs))
eventSequences, err := AppendEventSequencesFromPreemptedJobs(eventSequences, PreemptedJobsFromSchedulerResult(result), time)
eventSequences, err := AppendEventSequencesFromPreemptedJobs(eventSequences, result.PreemptedJobs, time)
if err != nil {
return nil, err
}
Expand All @@ -492,26 +492,26 @@ func EventsFromSchedulerResult(result *SchedulerResult, time time.Time) ([]*arma
return eventSequences, nil
}

func AppendEventSequencesFromPreemptedJobs(eventSequences []*armadaevents.EventSequence, jobs []*jobdb.Job, time time.Time) ([]*armadaevents.EventSequence, error) {
for _, job := range jobs {
jobId, err := armadaevents.ProtoUuidFromUlidString(job.Id())
func AppendEventSequencesFromPreemptedJobs(eventSequences []*armadaevents.EventSequence, jctxs []*schedulercontext.JobSchedulingContext, time time.Time) ([]*armadaevents.EventSequence, error) {
for _, jctx := range jctxs {
jobId, err := armadaevents.ProtoUuidFromUlidString(jctx.JobId)
if err != nil {
return nil, err
}
run := job.LatestRun()
run := jctx.Job.LatestRun()
if run == nil {
return nil, errors.Errorf("attempting to generate preempted eventSequences for job %s with no associated runs", job.Id())
return nil, errors.Errorf("attempting to generate preempted eventSequences for job %s with no associated runs", jctx.JobId)
}
eventSequences = append(eventSequences, &armadaevents.EventSequence{
Queue: job.Queue(),
JobSetName: job.Jobset(),
Events: createEventsForPreemptedJob(jobId, armadaevents.ProtoUuidFromUuid(run.Id()), time),
Queue: jctx.Job.Queue(),
JobSetName: jctx.Job.Jobset(),
Events: createEventsForPreemptedJob(jobId, armadaevents.ProtoUuidFromUuid(run.Id()), jctx.PreemptionDescription, time),
})
}
return eventSequences, nil
}

func createEventsForPreemptedJob(jobId *armadaevents.Uuid, runId *armadaevents.Uuid, time time.Time) []*armadaevents.EventSequence_Event {
func createEventsForPreemptedJob(jobId *armadaevents.Uuid, runId *armadaevents.Uuid, cause string, time time.Time) []*armadaevents.EventSequence_Event {
return []*armadaevents.EventSequence_Event{
{
Created: protoutil.ToTimestamp(time),
Expand All @@ -521,6 +521,7 @@ func createEventsForPreemptedJob(jobId *armadaevents.Uuid, runId *armadaevents.U
PreemptedRunIdStr: armadaevents.MustUuidStringFromProtoUuid(runId),
PreemptedJobId: jobId,
PreemptedJobIdStr: armadaevents.MustUlidStringFromProtoUuid(jobId),
Cause: cause,
},
},
},
Expand Down Expand Up @@ -814,7 +815,7 @@ func (s *Scheduler) generateUpdateMessagesFromJob(ctx *armadacontext.Context, jo
}
} else if lastRun.PreemptRequested() && job.PriorityClass().Preemptible {
job = job.WithQueued(false).WithFailed(true).WithUpdatedRun(lastRun.WithoutTerminal().WithFailed(true))
events = append(events, createEventsForPreemptedJob(jobId, armadaevents.ProtoUuidFromUuid(lastRun.Id()), s.clock.Now())...)
events = append(events, createEventsForPreemptedJob(jobId, armadaevents.ProtoUuidFromUuid(lastRun.Id()), "Preemption requested via API", s.clock.Now())...)
}
}

Expand Down
1 change: 1 addition & 0 deletions internal/server/event/conversion/conversions.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ func FromInternalJobRunPreempted(queueName string, jobSetName string, time time.
RunId: runId,
PreemptiveJobId: preemptiveJobId,
PreemptiveRunId: preemptiveRunId,
Cause: e.Cause,
}

return []*api.EventMessage{
Expand Down
3 changes: 3 additions & 0 deletions pkg/api/api.swagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -1243,6 +1243,9 @@ func SwaggerJsonTemplate() string {
" \"apiJobPreemptedEvent\": {\n" +
" \"type\": \"object\",\n" +
" \"properties\": {\n" +
" \"cause\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"clusterId\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
Expand Down
3 changes: 3 additions & 0 deletions pkg/api/api.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1232,6 +1232,9 @@
"apiJobPreemptedEvent": {
"type": "object",
"properties": {
"cause": {
"type": "string"
},
"clusterId": {
"type": "string"
},
Expand Down
Loading
Loading