Skip to content

Commit

Permalink
Merge branch 'master' into zuqq/operate-on-jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
zuqq authored Jun 27, 2023
2 parents 144f087 + fbff206 commit c950ffc
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 16 deletions.
1 change: 1 addition & 0 deletions internal/executor/reporter/job_event_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func (eventReporter *JobEventReporter) reportStatusUpdate(old *v1.Pod, new *v1.P
// Don't report status change for pods Armada is deleting
// This prevents reporting JobFailed when we delete a pod - for example due to cancellation
if util.IsMarkedForDeletion(new) {
log.Infof("not sending event to report pod %s moving into phase %s as pod is marked for deletion", new.Name, new.Status.Phase)
return
}
eventReporter.reportCurrentStatus(new)
Expand Down
39 changes: 23 additions & 16 deletions internal/executor/service/pod_issue_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
StuckStartingUp
StuckTerminating
ExternallyDeleted
ErrorDuringIssueHandling
)

type podIssue struct {
Expand Down Expand Up @@ -303,9 +304,7 @@ func (p *IssueHandler) handleNonRetryableJobIssue(issue *issue) {
// - Report JobUnableToScheduleEvent
// - Report JobReturnLeaseEvent
//
// Special consideration must be taken that most of these pods are somewhat "stuck" in pending.
// So can transition to Running/Completed/Failed in the middle of this
// We must not return the lease if the pod state changes - as likely it has become "unstuck"
// If the pod becomes Running/Completed/Failed in the middle of being deleted - swap this issue to a nonRetryableIssue where it will be Failed
func (p *IssueHandler) handleRetryableJobIssue(issue *issue) {
if !issue.RunIssue.Reported {
log.Infof("Retryable issue detected for job %s run %s - %s", issue.RunIssue.JobId, issue.RunIssue.RunId, issue.RunIssue.PodIssue.Message)
Expand All @@ -321,7 +320,25 @@ func (p *IssueHandler) handleRetryableJobIssue(issue *issue) {
}

if issue.CurrentPodState != nil {
// TODO consider moving this to a synchronous call - but long termination periods would need to be handled
if issue.CurrentPodState.Status.Phase != v1.PodPending {
p.markIssuesResolved(issue.RunIssue)
if issue.RunIssue.PodIssue.DeletionRequested {
p.registerIssue(&runIssue{
JobId: issue.RunIssue.JobId,
RunId: issue.RunIssue.RunId,
PodIssue: &podIssue{
OriginalPodState: issue.RunIssue.PodIssue.OriginalPodState,
Message: "Pod unexpectedly started up after delete was called",
Retryable: false,
DeletionRequested: false,
Type: ErrorDuringIssueHandling,
Cause: api.Cause_Error,
},
})
}
return
}

err := p.clusterContext.DeletePodWithCondition(issue.CurrentPodState, func(pod *v1.Pod) bool {
return pod.Status.Phase == v1.PodPending
}, true)
Expand Down Expand Up @@ -359,20 +376,10 @@ func hasPodIssueSelfResolved(issue *issue) bool {
return false
}

// Pod has completed - no need to report any issues
if util.IsInTerminalState(issue.CurrentPodState) {
return true
}

// Pod has started running, and we haven't requested deletion - let it continue
if issue.CurrentPodState.Status.Phase == v1.PodRunning && !issue.RunIssue.PodIssue.DeletionRequested {
// Pod has started up and we haven't tried to delete the pod yet - so resolve the issue
if issue.CurrentPodState.Status.Phase != v1.PodPending && !issue.RunIssue.PodIssue.DeletionRequested {
return true
}
// TODO There is an edge case here where the pod has started running but we have requested deletion
// Without a proper state model, we can't easily handle this correctly
// Ideally we'd see if it completes or deletes first and report it accordingly
// If it completes first - do nothing
// If it deletes first - report JobFailed (as we accidentally deleted it during the run)
}

return false
Expand Down
31 changes: 31 additions & 0 deletions internal/executor/service/pod_issue_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,37 @@ func TestPodIssueService_DeletesPodAndReportsLeaseReturned_IfRetryableStuckPod(t
assert.True(t, ok)
}

func TestPodIssueService_DeletesPodAndReportsFailed_IfRetryableStuckPodStartsUpAfterDeletionCalled(t *testing.T) {
podIssueService, _, fakeClusterContext, eventsReporter := setupTestComponents([]*job.RunState{})
retryableStuckPod := makeRetryableStuckPod(false)
addPod(t, fakeClusterContext, retryableStuckPod)

podIssueService.HandlePodIssues()

// Reports UnableToSchedule
assert.Len(t, eventsReporter.ReceivedEvents, 1)
_, ok := eventsReporter.ReceivedEvents[0].Event.(*api.JobUnableToScheduleEvent)
assert.True(t, ok)

// Reset events, and add pod back as running
eventsReporter.ReceivedEvents = []reporter.EventMessage{}
retryableStuckPod.Status.Phase = v1.PodRunning
addPod(t, fakeClusterContext, retryableStuckPod)

// Detects pod is now unexpectedly running and marks it non-retryable
podIssueService.HandlePodIssues()
assert.Len(t, eventsReporter.ReceivedEvents, 0)
assert.Len(t, getActivePods(t, fakeClusterContext), 1)

// Now processes the issue as non-retryable and fails the pod
podIssueService.HandlePodIssues()
assert.Len(t, getActivePods(t, fakeClusterContext), 0)

assert.Len(t, eventsReporter.ReceivedEvents, 1)
_, ok = eventsReporter.ReceivedEvents[0].Event.(*api.JobFailedEvent)
assert.True(t, ok)
}

func TestPodIssueService_ReportsFailed_IfDeletedExternally(t *testing.T) {
podIssueService, _, fakeClusterContext, eventsReporter := setupTestComponents([]*job.RunState{})
runningPod := makeRunningPod(false)
Expand Down

0 comments on commit c950ffc

Please sign in to comment.