Merge branch 'master' into feat/test-workflow
dejanzele committed Jun 28, 2023
2 parents b6560b5 + d6aef28 commit 325580b
Showing 16 changed files with 283 additions and 125 deletions.
1 change: 1 addition & 0 deletions config/armada/config.yaml
@@ -34,6 +34,7 @@ scheduling:
preemption:
nodeEvictionProbability: 1.0
nodeOversubscriptionEvictionProbability: 1.0
protectedFractionOfFairShare: 1.0
setNodeIdSelector: true
nodeIdLabel: kubernetes.io/hostname
setNodeName: false
2 changes: 2 additions & 0 deletions internal/armada/configuration/types.go
@@ -209,6 +209,8 @@ type PreemptionConfig struct {
// the probability of evicting jobs on oversubscribed nodes, i.e.,
// nodes on which the total resource requests are greater than the available resources.
NodeOversubscriptionEvictionProbability float64
// Only queues allocated more than this fraction of their fair share are considered for preemption.
ProtectedFractionOfFairShare float64
// If true, the Armada scheduler will add to scheduled pods a node selector
// NodeIdLabel: <value of label on node selected by scheduler>.
// If true, NodeIdLabel must be non-empty.
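The new ProtectedFractionOfFairShare field gates which queues can lose jobs to preemption: only queues allocated more than this fraction of their fair share are candidates. A minimal, self-contained sketch of that check (the helper name and plain-float inputs are illustrative; the scheduler itself uses QueueSchedulingContext.FractionOfFairShare, added later in this commit):

```go
package main

import "fmt"

// isProtectedFromPreemption reports whether a queue's jobs should be shielded
// from eviction: only queues allocated more than protectedFractionOfFairShare
// of their fair share are considered for preemption.
func isProtectedFromPreemption(fractionOfFairShare, protectedFractionOfFairShare float64) bool {
	return fractionOfFairShare <= protectedFractionOfFairShare
}

func main() {
	// With the configured value of 1.0, a queue at 80% of its fair share is protected...
	fmt.Println(isProtectedFromPreemption(0.8, 1.0)) // true
	// ...while a queue at 150% of its fair share can have jobs preempted.
	fmt.Println(isProtectedFromPreemption(1.5, 1.0)) // false
}
```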
7 changes: 6 additions & 1 deletion internal/armada/server/lease.go
@@ -469,7 +469,11 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
schedulerobjects.ResourceList{Resources: totalCapacity},
)
for queue, priorityFactor := range priorityFactorByQueue {
if err := sctx.AddQueueSchedulingContext(queue, priorityFactor, allocatedByQueueAndPriorityClassForPool[queue]); err != nil {
var weight float64 = 1
if priorityFactor > 0 {
weight = 1 / priorityFactor
}
if err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClassForPool[queue]); err != nil {
return nil, err
}
}
@@ -484,6 +488,7 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
constraints,
q.schedulingConfig.Preemption.NodeEvictionProbability,
q.schedulingConfig.Preemption.NodeOversubscriptionEvictionProbability,
q.schedulingConfig.Preemption.ProtectedFractionOfFairShare,
&SchedulerJobRepositoryAdapter{
r: q.jobRepository,
},
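The lease call now converts each queue's priority factor into a fair-share weight before registering the queue's scheduling context: the factor is inverted, so a higher priority factor yields a lower weight, with non-positive factors falling back to a weight of 1. A standalone sketch of the same conversion:

```go
package main

import "fmt"

// priorityFactorToWeight mirrors the conversion in getJobs: invert the
// queue's priority factor, guarding against non-positive values.
func priorityFactorToWeight(priorityFactor float64) float64 {
	var weight float64 = 1
	if priorityFactor > 0 {
		weight = 1 / priorityFactor
	}
	return weight
}

func main() {
	fmt.Println(priorityFactorToWeight(2))   // 0.5: a higher factor means a smaller weight
	fmt.Println(priorityFactorToWeight(0.5)) // 2
	fmt.Println(priorityFactorToWeight(0))   // 1: fallback for non-positive factors
}
```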
1 change: 1 addition & 0 deletions internal/executor/reporter/job_event_reporter.go
@@ -169,6 +169,7 @@ func (eventReporter *JobEventReporter) reportStatusUpdate(old *v1.Pod, new *v1.P
// Don't report status change for pods Armada is deleting
// This prevents reporting JobFailed when we delete a pod - for example due to cancellation
if util.IsMarkedForDeletion(new) {
log.Infof("not sending event to report pod %s moving into phase %s as pod is marked for deletion", new.Name, new.Status.Phase)
return
}
eventReporter.reportCurrentStatus(new)
39 changes: 23 additions & 16 deletions internal/executor/service/pod_issue_handler.go
@@ -27,6 +27,7 @@ const (
StuckStartingUp
StuckTerminating
ExternallyDeleted
ErrorDuringIssueHandling
)

type podIssue struct {
@@ -303,9 +304,7 @@ func (p *IssueHandler) handleNonRetryableJobIssue(issue *issue) {
// - Report JobUnableToScheduleEvent
// - Report JobReturnLeaseEvent
//
// Special consideration must be taken that most of these pods are somewhat "stuck" in pending.
// So can transition to Running/Completed/Failed in the middle of this
// We must not return the lease if the pod state changes - as likely it has become "unstuck"
// If the pod becomes Running/Completed/Failed while it is being deleted, swap this issue for a non-retryable one so the run is reported as Failed
func (p *IssueHandler) handleRetryableJobIssue(issue *issue) {
if !issue.RunIssue.Reported {
log.Infof("Retryable issue detected for job %s run %s - %s", issue.RunIssue.JobId, issue.RunIssue.RunId, issue.RunIssue.PodIssue.Message)
@@ -321,7 +320,25 @@ func (p *IssueHandler) handleRetryableJobIssue(issue *issue) {
}

if issue.CurrentPodState != nil {
// TODO consider moving this to a synchronous call - but long termination periods would need to be handled
if issue.CurrentPodState.Status.Phase != v1.PodPending {
p.markIssuesResolved(issue.RunIssue)
if issue.RunIssue.PodIssue.DeletionRequested {
p.registerIssue(&runIssue{
JobId: issue.RunIssue.JobId,
RunId: issue.RunIssue.RunId,
PodIssue: &podIssue{
OriginalPodState: issue.RunIssue.PodIssue.OriginalPodState,
Message: "Pod unexpectedly started up after delete was called",
Retryable: false,
DeletionRequested: false,
Type: ErrorDuringIssueHandling,
Cause: api.Cause_Error,
},
})
}
return
}

err := p.clusterContext.DeletePodWithCondition(issue.CurrentPodState, func(pod *v1.Pod) bool {
return pod.Status.Phase == v1.PodPending
}, true)
@@ -359,20 +376,10 @@ func hasPodIssueSelfResolved(issue *issue) bool {
return false
}

// Pod has completed - no need to report any issues
if util.IsInTerminalState(issue.CurrentPodState) {
return true
}

// Pod has started running, and we haven't requested deletion - let it continue
if issue.CurrentPodState.Status.Phase == v1.PodRunning && !issue.RunIssue.PodIssue.DeletionRequested {
// Pod has started up and we haven't tried to delete the pod yet - so resolve the issue
if issue.CurrentPodState.Status.Phase != v1.PodPending && !issue.RunIssue.PodIssue.DeletionRequested {
return true
}
// TODO There is an edge case here where the pod has started running but we have requested deletion
// Without a proper state model, we can't easily handle this correctly
// Ideally we'd see if it completes or deletes first and report it accordingly
// If it completes first - do nothing
// If it deletes first - report JobFailed (as we accidentally deleted it during the run)
}

return false
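The retryable-issue path now resolves an issue as soon as the pod leaves Pending, unless deletion has already been requested, in which case the issue is re-registered as non-retryable and the run is failed. A small sketch of that decision, with the pod phase reduced to a string and the helper name being illustrative rather than taken from the codebase:

```go
package main

import "fmt"

// issueSelfResolved mirrors the updated hasPodIssueSelfResolved check:
// the issue is dropped once the pod has left Pending, provided deletion
// has not already been requested.
func issueSelfResolved(phase string, deletionRequested bool) bool {
	return phase != "Pending" && !deletionRequested
}

func main() {
	fmt.Println(issueSelfResolved("Running", false)) // true: the pod got unstuck on its own
	fmt.Println(issueSelfResolved("Running", true))  // false: deletion was requested, so the
	// retryable issue is swapped for a non-retryable one and the run is failed
	fmt.Println(issueSelfResolved("Pending", false)) // false: still stuck
}
```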
31 changes: 31 additions & 0 deletions internal/executor/service/pod_issue_handler_test.go
@@ -97,6 +97,37 @@ func TestPodIssueService_DeletesPodAndReportsLeaseReturned_IfRetryableStuckPod(t
assert.True(t, ok)
}

func TestPodIssueService_DeletesPodAndReportsFailed_IfRetryableStuckPodStartsUpAfterDeletionCalled(t *testing.T) {
podIssueService, _, fakeClusterContext, eventsReporter := setupTestComponents([]*job.RunState{})
retryableStuckPod := makeRetryableStuckPod(false)
addPod(t, fakeClusterContext, retryableStuckPod)

podIssueService.HandlePodIssues()

// Reports UnableToSchedule
assert.Len(t, eventsReporter.ReceivedEvents, 1)
_, ok := eventsReporter.ReceivedEvents[0].Event.(*api.JobUnableToScheduleEvent)
assert.True(t, ok)

// Reset events, and add pod back as running
eventsReporter.ReceivedEvents = []reporter.EventMessage{}
retryableStuckPod.Status.Phase = v1.PodRunning
addPod(t, fakeClusterContext, retryableStuckPod)

// Detects pod is now unexpectedly running and marks it non-retryable
podIssueService.HandlePodIssues()
assert.Len(t, eventsReporter.ReceivedEvents, 0)
assert.Len(t, getActivePods(t, fakeClusterContext), 1)

// Now processes the issue as non-retryable and fails the pod
podIssueService.HandlePodIssues()
assert.Len(t, getActivePods(t, fakeClusterContext), 0)

assert.Len(t, eventsReporter.ReceivedEvents, 1)
_, ok = eventsReporter.ReceivedEvents[0].Event.(*api.JobFailedEvent)
assert.True(t, ok)
}

func TestPodIssueService_ReportsFailed_IfDeletedExternally(t *testing.T) {
podIssueService, _, fakeClusterContext, eventsReporter := setupTestComponents([]*job.RunState{})
runningPod := makeRunningPod(false)
12 changes: 0 additions & 12 deletions internal/scheduler/common.go
@@ -2,7 +2,6 @@ package scheduler

import (
"fmt"
"math"
"strconv"
"time"

@@ -170,17 +169,6 @@ func GangIdAndCardinalityFromAnnotations(annotations map[string]string) (string,
return gangId, gangCardinality, true, nil
}

// ResourceListAsWeightedMillis returns the linear combination of the milli values in rl with given weights.
// This function overflows for values that exceed MaxInt64. E.g., 1Pi is fine but not 10Pi.
func ResourceListAsWeightedMillis(weights map[string]float64, rl schedulerobjects.ResourceList) int64 {
var rv int64
for t, f := range weights {
q := rl.Get(t)
rv += int64(math.Round(float64(q.MilliValue()) * f))
}
return rv
}

func PodRequirementsFromLegacySchedulerJobs[S ~[]E, E interfaces.LegacySchedulerJob](jobs S, priorityClasses map[string]configuration.PriorityClass) []*schedulerobjects.PodRequirements {
rv := make([]*schedulerobjects.PodRequirements, len(jobs))
for i, job := range jobs {
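ResourceListAsWeightedMillis is removed here in favour of a method on schedulerobjects.ResourceList (see the test changes below, which now call rl.AsWeightedMillis(weights)). A self-contained sketch of the same computation over plain Kubernetes quantities, assuming the method keeps the removed function's semantics:

```go
package main

import (
	"fmt"
	"math"

	"k8s.io/apimachinery/pkg/api/resource"
)

// asWeightedMillis reproduces the removed helper: the linear combination of
// each resource's milli value with its weight. As the original comment noted,
// it can overflow once a quantity's milli value exceeds MaxInt64.
func asWeightedMillis(weights map[string]float64, rl map[string]resource.Quantity) int64 {
	var rv int64
	for t, f := range weights {
		q := rl[t]
		rv += int64(math.Round(float64(q.MilliValue()) * f))
	}
	return rv
}

func main() {
	rl := map[string]resource.Quantity{
		"cpu":    resource.MustParse("2"),
		"memory": resource.MustParse("4Gi"),
	}
	// Only weighted resources contribute: 2 CPU -> 2000 millis * 1.0 = 2000.
	fmt.Println(asWeightedMillis(map[string]float64{"cpu": 1.0}, rl))
}
```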
4 changes: 2 additions & 2 deletions internal/scheduler/common_test.go
@@ -134,7 +134,7 @@ func TestResourceListAsWeightedMillis(t *testing.T) {
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
assert.Equal(t, tc.expected, ResourceListAsWeightedMillis(tc.weights, tc.rl))
assert.Equal(t, tc.expected, tc.rl.AsWeightedMillis(tc.weights))
})
}
}
@@ -151,6 +151,6 @@ func BenchmarkResourceListAsWeightedMillis(b *testing.B) {
}
b.ResetTimer()
for n := 0; n < b.N; n++ {
ResourceListAsWeightedMillis(weights, rl)
rl.AsWeightedMillis(weights)
}
}
27 changes: 23 additions & 4 deletions internal/scheduler/context/context.go
@@ -38,8 +38,12 @@ type SchedulingContext struct {
ResourceScarcity map[string]float64
// Per-queue scheduling contexts.
QueueSchedulingContexts map[string]*QueueSchedulingContext
// Sum of weights across all queues.
WeightSum float64
// Total resources across all clusters available at the start of the scheduling cycle.
TotalResources schedulerobjects.ResourceList
// = TotalResources.AsWeightedMillis(ResourceScarcity).
TotalResourcesAsWeightedMillis int64
// Resources assigned across all queues during this scheduling cycle.
ScheduledResources schedulerobjects.ResourceList
ScheduledResourcesByPriorityClass schedulerobjects.QuantityByTAndResourceType[string]
@@ -80,6 +84,7 @@ func NewSchedulingContext(
ResourceScarcity: resourceScarcity,
QueueSchedulingContexts: make(map[string]*QueueSchedulingContext),
TotalResources: totalResources.DeepCopy(),
TotalResourcesAsWeightedMillis: totalResources.AsWeightedMillis(resourceScarcity),
ScheduledResources: schedulerobjects.NewResourceListWithDefaultSize(),
ScheduledResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]),
EvictedResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]),
@@ -106,7 +111,7 @@ func (sctx *SchedulingContext) ClearUnfeasibleSchedulingKeys() {
sctx.UnfeasibleSchedulingKeys = make(map[schedulerobjects.SchedulingKey]*JobSchedulingContext)
}

func (sctx *SchedulingContext) AddQueueSchedulingContext(queue string, priorityFactor float64, initialAllocatedByPriorityClass schedulerobjects.QuantityByTAndResourceType[string]) error {
func (sctx *SchedulingContext) AddQueueSchedulingContext(queue string, weight float64, initialAllocatedByPriorityClass schedulerobjects.QuantityByTAndResourceType[string]) error {
if _, ok := sctx.QueueSchedulingContexts[queue]; ok {
return errors.WithStack(&armadaerrors.ErrInvalidArgument{
Name: "queue",
@@ -123,12 +128,13 @@ func (sctx *SchedulingContext) AddQueueSchedulingContext(queue string, priorityF
for _, rl := range initialAllocatedByPriorityClass {
allocated.Add(rl)
}
sctx.WeightSum += weight
qctx := &QueueSchedulingContext{
SchedulingContext: sctx,
Created: time.Now(),
ExecutorId: sctx.ExecutorId,
Queue: queue,
PriorityFactor: priorityFactor,
Weight: weight,
Allocated: allocated,
AllocatedByPriorityClass: initialAllocatedByPriorityClass,
ScheduledResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]),
@@ -313,8 +319,8 @@ type QueueSchedulingContext struct {
ExecutorId string
// Queue name.
Queue string
// These factors influence the fraction of resources assigned to each queue.
PriorityFactor float64
// Determines the fair share of this queue relative to other queues.
Weight float64
// Total resources assigned to the queue across all clusters by priority class priority.
// Includes jobs scheduled during this invocation of the scheduler.
Allocated schedulerobjects.ResourceList
@@ -490,6 +496,19 @@ func (qctx *QueueSchedulingContext) ClearJobSpecs() {
}
}

// FractionOfFairShare returns this queue's allocation as a fraction of its fair share; values above 1 mean the queue is allocated more than its fair share.
func (qctx *QueueSchedulingContext) FractionOfFairShare() float64 {
return qctx.FractionOfFairShareWithAllocation(qctx.Allocated)
}

// FractionOfFairShareWithAllocation returns this queue's allocation as a fraction of its fair share,
// assuming the queue's total allocation is given by allocated. Values above 1 mean the queue is over its fair share.
func (qctx *QueueSchedulingContext) FractionOfFairShareWithAllocation(allocated schedulerobjects.ResourceList) float64 {
fairShare := qctx.Weight / qctx.SchedulingContext.WeightSum
allocatedAsWeightedMillis := allocated.AsWeightedMillis(qctx.SchedulingContext.ResourceScarcity)
return (float64(allocatedAsWeightedMillis) / float64(qctx.SchedulingContext.TotalResourcesAsWeightedMillis)) / fairShare
}

type GangSchedulingContext struct {
Created time.Time
Queue string
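FractionOfFairShare combines the new per-queue Weight with the scheduling context's WeightSum: the queue's share of total weighted-milli allocation is divided by its fair share (its weight over the sum of all weights). A worked, standalone example of that arithmetic (the plain-float helper below is illustrative; the real method reads these values from the scheduling context):

```go
package main

import "fmt"

// fractionOfFairShare mirrors FractionOfFairShareWithAllocation:
// (allocated / total, both as weighted millis) divided by the queue's
// fair share, which is its weight over the sum of all queue weights.
func fractionOfFairShare(weight, weightSum float64, allocatedMillis, totalMillis int64) float64 {
	fairShare := weight / weightSum
	return (float64(allocatedMillis) / float64(totalMillis)) / fairShare
}

func main() {
	// Two queues of equal weight each have a fair share of 0.5.
	// A queue holding 3000 of 10000 weighted millis sits at 30% of the pool,
	// i.e. 0.6 of its fair share - below a protectedFractionOfFairShare of 1.0,
	// so its jobs would not be considered for preemption.
	fmt.Println(fractionOfFairShare(1, 2, 3000, 10000)) // 0.6
	// A queue holding 8000 of 10000 is at 1.6 of its fair share and can lose jobs.
	fmt.Println(fractionOfFairShare(1, 2, 8000, 10000)) // 1.6
}
```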