Skip to content

Commit

Permalink
Merge branch 'master' into severinson/per-pc-limits
Browse files (browse the repository at this point in the history)
  • Loading branch information
severinson authored Jun 22, 2023
2 parents f11417a + 476bc55 commit 7e1ce11
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 15 deletions.
6 changes: 4 additions & 2 deletions config/scheduler/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,10 @@ scheduling:
resourceScarcity:
cpu: 1.0
indexedResources:
- cpu
- memory
- name: "cpu"
resolution: "100m"
- name: "memory"
resolution: "1Mi"
gangIdAnnotation: armadaproject.io/gangId
gangCardinalityAnnotation: armadaproject.io/gangCardinality

18 changes: 6 additions & 12 deletions internal/executor/service/pod_issue_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ func (p *PodIssueService) detectPodIssues(allManagedPods []*v1.Pod) {
}

p.registerIssue(issue)
break
} else if pod.Status.Phase == v1.PodUnknown || pod.Status.Phase == v1.PodPending {

podEvents, err := p.clusterContext.GetPodEvents(pod)
Expand Down Expand Up @@ -177,15 +176,14 @@ func (p *PodIssueService) detectPodIssues(allManagedPods []*v1.Pod) {
Type: podIssueType,
}
p.registerIssue(issue)
break
}
}
}
}

func (p *PodIssueService) handleKnownPodIssues(ctx context.Context, allManagedPods []*v1.Pod) {
issues := createIssues(allManagedPods, p.knownPodIssues)
// Make issues from pods + issues
issues := createIssues(allManagedPods, p.knownPodIssues)
util.ProcessItemsWithThreadPool(ctx, 20, issues, p.handlePodIssue)
}

Expand All @@ -212,13 +210,9 @@ func createIssues(managedPods []*v1.Pod, podIssues map[string]*podIssue) []*issu
}

func (p *PodIssueService) handlePodIssue(issue *issue) {
// Skip jobs with no issues
if issue == nil {
return
}

hasSelfResolved := hasPodIssueSelfResolved(issue)
if hasSelfResolved {
log.Infof("Issue for job %s run %s has self resolved", issue.Issue.JobId, issue.Issue.RunId)
p.markIssuesResolved(issue.Issue)
return
}
Expand All @@ -237,6 +231,7 @@ func (p *PodIssueService) handlePodIssue(issue *issue) {
// Once that is done we are free to clean up the pod
func (p *PodIssueService) handleNonRetryableJobIssue(issue *issue) {
if !issue.Issue.Reported {
log.Infof("Non-retryable issue detected for job %s run %s - %s", issue.Issue.JobId, issue.Issue.RunId, issue.Issue.Message)
message := issue.Issue.Message

events := make([]reporter.EventMessage, 0, 2)
Expand All @@ -252,7 +247,6 @@ func (p *PodIssueService) handleNonRetryableJobIssue(issue *issue) {
log.Errorf("Failed to report failed event for job %s because %s", issue.Issue.JobId, err)
return
}
log.Infof("Non-retryable issue detected for job %s run %s - %s", issue.Issue.JobId, issue.Issue.RunId, issue.Issue.Message)
p.markIssueReported(issue.Issue)
}

Expand All @@ -273,6 +267,7 @@ func (p *PodIssueService) handleNonRetryableJobIssue(issue *issue) {
// We must not return the lease if the pod state changes, as it has likely become "unstuck"
func (p *PodIssueService) handleRetryableJobIssue(issue *issue) {
if !issue.Issue.Reported {
log.Infof("Retryable issue detected for job %s run %s - %s", issue.Issue.JobId, issue.Issue.RunId, issue.Issue.Message)
if issue.Issue.Type == StuckStartingUp || issue.Issue.Type == UnableToSchedule {
event := reporter.CreateJobUnableToScheduleEvent(issue.Issue.OriginalPodState, issue.Issue.Message, p.clusterContext.GetClusterId())
err := p.eventReporter.Report([]reporter.EventMessage{{Event: event, JobRunId: issue.Issue.RunId}})
Expand All @@ -281,7 +276,6 @@ func (p *PodIssueService) handleRetryableJobIssue(issue *issue) {
return
}
}
log.Infof("Retryable issue detected for job %s run %s - %s", issue.Issue.JobId, issue.Issue.RunId, issue.Issue.Message)
p.markIssueReported(issue.Issue)
}

Expand Down Expand Up @@ -312,7 +306,7 @@ func (p *PodIssueService) handleRetryableJobIssue(issue *issue) {
}

func hasPodIssueSelfResolved(issue *issue) bool {
if issue == nil {
if issue == nil || issue.Issue == nil {
return true
}

Expand Down Expand Up @@ -359,7 +353,7 @@ func (p *PodIssueService) handleDeletedPod(pod *v1.Pod) {
OriginalPodState: pod,
JobId: jobId,
RunId: util.ExtractJobRunId(pod),
Message: "Pod was unexpected deleted",
Message: "Pod was unexpectedly deleted",
Retryable: false,
Reported: false,
Type: ExternallyDeleted,
Expand Down
2 changes: 1 addition & 1 deletion third_party/airflow/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ license = { text = "Apache Software License" }
readme = "README.md"

[project.optional-dependencies]
format = ["black==23.3.0", "flake8==6.0.0", "pylint==2.17.3"]
format = ["black==23.3.0", "flake8==6.0.0", "pylint==2.17.4"]
test = ["pytest==7.3.1", "coverage>=6.5.0", "pytest-asyncio==0.21.0"]
docs = ["sphinx", "sphinx-jekyll-builder"]

Expand Down

0 comments on commit 7e1ce11

Please sign in to comment.