Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic reconciliation between executor RunState and kubernetes #2604

Merged
merged 3 commits into from
Jun 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config/executor/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ kubernetes:
fatalPodSubmissionErrors:
- "admission webhook"
- "namespaces \".*\" not found"
stateChecks:
deadlineForSubmittedPodConsideredMissing: 15m
deadlineForActivePodConsideredMissing: 5m
pendingPodChecks:
deadlineForUpdates: 10m
deadlineForNodeAssignment: 5m
Expand Down
4 changes: 3 additions & 1 deletion internal/executor/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,11 @@ func setupExecutorApiComponents(
jobRunState,
submitter,
etcdHealthMonitor)
podIssueService := service.NewPodIssueService(
podIssueService := service.NewIssueHandler(
jobRunState,
clusterContext,
eventReporter,
config.Kubernetes.StateChecks,
pendingPodChecker,
config.Kubernetes.StuckTerminatingPodExpiry)

Expand Down
12 changes: 12 additions & 0 deletions internal/executor/configuration/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ type PodDefaults struct {
Ingress *IngressConfiguration
}

// StateChecksConfiguration holds the deadlines after which a run that has no backing pod is considered missing.
type StateChecksConfiguration struct {
// Once a pod is submitted to kubernetes, this is how long we'll wait for it to appear in the kubernetes informer state.
// If the pod hasn't appeared after this duration, it is considered missing.
DeadlineForSubmittedPodConsideredMissing time.Duration
// Once the executor has seen a pod appear on the cluster, it considers that run Active.
// If we get into a state where there is no longer a pod backing that Active run, this is how long we'll wait before we consider the pod missing.
// The most likely cause of this is actually a bug in the executor's processing of the kubernetes state.
// However, without this deadline, runs can get indefinitely stuck as Active with no backing pod.
DeadlineForActivePodConsideredMissing time.Duration
}

type IngressConfiguration struct {
HostnameSuffix string
CertNameSuffix string
Expand Down Expand Up @@ -54,6 +65,7 @@ type KubernetesConfiguration struct {
MaxTerminatedPods int
MinimumJobSize armadaresource.ComputeResources
PodDefaults *PodDefaults
StateChecks StateChecksConfiguration
PendingPodChecks *podchecks.Checks
FatalPodSubmissionErrors []string
// Minimum amount of resources marked as allocated to non-Armada pods on each node.
Expand Down
12 changes: 8 additions & 4 deletions internal/executor/job/job_run_state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@ func NewJobRunStateStore(clusterContext context.ClusterContext) *JobRunStateStor
return
}

stateStore.reportRunActive(pod)
if !util.IsPodFinishedAndReported(pod) {
stateStore.reportRunActive(pod)
}
},
})

// On start up, make sure our state matches current k8s state
err := stateStore.reconcileStateWithKubernetes()
err := stateStore.initialiseStateFromKubernetes()
if err != nil {
panic(err)
}
Expand All @@ -75,7 +77,7 @@ func NewJobRunStateStoreWithInitialState(initialJobRuns []*RunState) *JobRunStat
return stateStore
}

func (stateStore *JobRunStateStore) reconcileStateWithKubernetes() error {
func (stateStore *JobRunStateStore) initialiseStateFromKubernetes() error {
pods, err := stateStore.clusterContext.GetAllPods()
if err != nil {
return err
Expand All @@ -84,7 +86,9 @@ func (stateStore *JobRunStateStore) reconcileStateWithKubernetes() error {
return !util.IsLegacyManagedPod(pod)
})
for _, pod := range pods {
stateStore.reportRunActive(pod)
if !util.IsPodFinishedAndReported(pod) {
stateStore.reportRunActive(pod)
}
}

return nil
Expand Down
15 changes: 14 additions & 1 deletion internal/executor/job/job_run_state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"sort"
"testing"
"time"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
Expand All @@ -23,7 +24,7 @@ var defaultRunInfoMeta = &RunMeta{
JobSet: "job-set-1",
}

func TestOnStartUp_ReconcilesWithKubernetes(t *testing.T) {
func TestOnStartUp_ReconcilesWithKubernetes_ActivePod(t *testing.T) {
existingPod := createPod()

jobRunStateManager, _ := setup(t, []*v1.Pod{existingPod})
Expand All @@ -38,6 +39,18 @@ func TestOnStartUp_ReconcilesWithKubernetes(t *testing.T) {
assert.Equal(t, allKnownJobRuns[0].Phase, Active)
}

// TestOnStartUp_ReconcilesWithKubernetes_IgnoresDonePods checks that a pod which is already
// finished and marked done is not loaded into the run state when the state store starts up.
func TestOnStartUp_ReconcilesWithKubernetes_IgnoresDonePods(t *testing.T) {
	// Build a pod that has succeeded, carries the job-done annotation, and has an
	// annotation keyed by its phase (presumably what the "finished and reported"
	// check looks for - confirm against util.IsPodFinishedAndReported).
	donePod := createPod()
	donePod.Status.Phase = v1.PodSucceeded
	donePod.Annotations[domain.JobDoneAnnotation] = "true"
	// time.Time implements fmt.Stringer, so call String() directly rather than
	// routing through fmt.Sprintf("%s", ...); the resulting string is the same.
	donePod.Annotations[string(donePod.Status.Phase)] = time.Now().String()

	jobRunStateManager, _ := setup(t, []*v1.Pod{donePod})
	allKnownJobRuns := jobRunStateManager.GetAll()

	// The done pod must not have produced a run entry.
	assert.Len(t, allKnownJobRuns, 0)
}

func TestReportRunLeased(t *testing.T) {
job := &SubmitJob{
Meta: SubmitJobMeta{
Expand Down
Loading