Skip to content

Commit

Permalink
Add basic reconciliation between executor RunState and kubernetes
Browse files Browse the repository at this point in the history
We can get into the state where the executor RunState thinks there is an active run - but there is no pod backing the run

The result of this is that the run can never finish (as there is no pod) and will stay in Pending/Running forever

This PR just adds some basic reconciliation so if there is no pod backing the RunState then an action will be taken
  • Loading branch information
JamesMurkin committed Jun 23, 2023
1 parent cbb35d5 commit 9ace09a
Show file tree
Hide file tree
Showing 7 changed files with 355 additions and 98 deletions.
3 changes: 3 additions & 0 deletions config/executor/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ kubernetes:
fatalPodSubmissionErrors:
- "admission webhook"
- "namespaces \".*\" not found"
stateChecks:
deadlineForSubmittedPodConsideredMissing: 15m
deadlineForActivePodConsideredMissing: 5m
pendingPodChecks:
deadlineForUpdates: 10m
deadlineForNodeAssignment: 5m
Expand Down
4 changes: 3 additions & 1 deletion internal/executor/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,11 @@ func setupExecutorApiComponents(
jobRunState,
submitter,
etcdHealthMonitor)
podIssueService := service.NewPodIssueService(
podIssueService := service.NewIssueHandler(
jobRunState,
clusterContext,
eventReporter,
config.Kubernetes.StateChecks,
pendingPodChecker,
config.Kubernetes.StuckTerminatingPodExpiry)

Expand Down
12 changes: 12 additions & 0 deletions internal/executor/configuration/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ type PodDefaults struct {
Ingress *IngressConfiguration
}

type StateChecksConfiguration struct {
// Once a pod is submitted to kubernetes, this is how long we'll wait for it to appear in the kubernetes informer state
// If the pod hasn't appeared after this duration, it is considered missing
DeadlineForSubmittedPodConsideredMissing time.Duration
// Once the executor has seen a pod appear on the cluster, it considers that run Active
// If we get into a state where there is no longer a pod backing that Active run, this is how long we'll wait before we consider the pod missing
// The most likely cause of this is actually a bug in the executors processing of the kubernetes state
// However without it - we can have runs get indefinitely stuck as Active with no backing pod
DeadlineForActivePodConsideredMissing time.Duration
}

type IngressConfiguration struct {
HostnameSuffix string
CertNameSuffix string
Expand Down Expand Up @@ -54,6 +65,7 @@ type KubernetesConfiguration struct {
MaxTerminatedPods int
MinimumJobSize armadaresource.ComputeResources
PodDefaults *PodDefaults
StateChecks StateChecksConfiguration
PendingPodChecks *podchecks.Checks
FatalPodSubmissionErrors []string
// Minimum amount of resources marked as allocated to non-Armada pods on each node.
Expand Down
12 changes: 8 additions & 4 deletions internal/executor/job/job_run_state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@ func NewJobRunStateStore(clusterContext context.ClusterContext) *JobRunStateStor
return
}

stateStore.reportRunActive(pod)
if !util.IsPodFinishedAndReported(pod) {
stateStore.reportRunActive(pod)
}
},
})

// On start up, make sure our state matches current k8s state
err := stateStore.reconcileStateWithKubernetes()
err := stateStore.initialiseStateFromKubernetes()
if err != nil {
panic(err)
}
Expand All @@ -75,7 +77,7 @@ func NewJobRunStateStoreWithInitialState(initialJobRuns []*RunState) *JobRunStat
return stateStore
}

func (stateStore *JobRunStateStore) reconcileStateWithKubernetes() error {
func (stateStore *JobRunStateStore) initialiseStateFromKubernetes() error {
pods, err := stateStore.clusterContext.GetAllPods()
if err != nil {
return err
Expand All @@ -84,7 +86,9 @@ func (stateStore *JobRunStateStore) reconcileStateWithKubernetes() error {
return !util.IsLegacyManagedPod(pod)
})
for _, pod := range pods {
stateStore.reportRunActive(pod)
if !util.IsPodFinishedAndReported(pod) {
stateStore.reportRunActive(pod)
}
}

return nil
Expand Down
15 changes: 14 additions & 1 deletion internal/executor/job/job_run_state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"sort"
"testing"
"time"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
Expand All @@ -23,7 +24,7 @@ var defaultRunInfoMeta = &RunMeta{
JobSet: "job-set-1",
}

func TestOnStartUp_ReconcilesWithKubernetes(t *testing.T) {
func TestOnStartUp_ReconcilesWithKubernetes_ActivePod(t *testing.T) {
existingPod := createPod()

jobRunStateManager, _ := setup(t, []*v1.Pod{existingPod})
Expand All @@ -38,6 +39,18 @@ func TestOnStartUp_ReconcilesWithKubernetes(t *testing.T) {
assert.Equal(t, allKnownJobRuns[0].Phase, Active)
}

func TestOnStartUp_ReconcilesWithKubernetes_IgnoresDonePods(t *testing.T) {
donePod := createPod()
donePod.Status.Phase = v1.PodSucceeded
donePod.Annotations[domain.JobDoneAnnotation] = "true"
donePod.Annotations[string(donePod.Status.Phase)] = fmt.Sprintf("%s", time.Now())

jobRunStateManager, _ := setup(t, []*v1.Pod{donePod})
allKnownJobRuns := jobRunStateManager.GetAll()

assert.Len(t, allKnownJobRuns, 0)
}

func TestReportRunLeased(t *testing.T) {
job := &SubmitJob{
Meta: SubmitJobMeta{
Expand Down
Loading

0 comments on commit 9ace09a

Please sign in to comment.