Skip to content

Commit

Permalink
Fix: When workflow already executed. After controller restart, health…
Browse files Browse the repository at this point in the history
…check could never be scheduled (#236)

Signed-off-by: Yu Jiang <yu_jiang@intuit.com>
  • Loading branch information
carlyjiang authored Sep 16, 2024
1 parent 52b6ad0 commit ad374d2
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 25 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*.so
*.dylib
testbin/*
active-monitor-controller

# Temporary or metadata files
*.yaml-e
Expand Down
6 changes: 6 additions & 0 deletions Dockerfile-local
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Runtime-only image: nothing is compiled here. A prebuilt binary named
# `active-monitor-controller` must already be present in the build context.
# distroless/static is used as the minimal base image for the manager binary;
# see https://github.com/GoogleContainerTools/distroless for more details.
FROM gcr.io/distroless/static:latest
# Place the binary at the image root, then make the root the working directory.
COPY active-monitor-controller /active-monitor-controller
WORKDIR /
ENTRYPOINT ["/active-monitor-controller"]
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ test: manifests generate fmt vet envtest ## Run tests.
build: manifests generate fmt vet ## Build manager binary.
go build -o bin/manager cmd/main.go

# Cross-compile a CGO-free linux/amd64 controller binary into the repository
# root as `active-monitor-controller`, the file that Dockerfile-local copies
# into the image. The rule name must match the .PHONY declaration; naming it
# `build` would override the recipe of the regular `build` target.
.PHONY: build-amd64
build-amd64: manifests generate fmt vet ## Build a static linux/amd64 manager binary for Dockerfile-local.
	CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -a -o active-monitor-controller cmd/main.go

.PHONY: run
run: manifests generate fmt vet ## Run a controller from your host.
go run ./cmd/main.go
Expand All @@ -83,6 +87,10 @@ run: manifests generate fmt vet ## Run a controller from your host.
docker-build: ## Build docker image with the manager.
$(CONTAINER_TOOL) build -t ${IMG} .

# Build the image from Dockerfile-local, which only packages a prebuilt
# `active-monitor-controller` binary that must already exist in the build
# context. The rule name must match the .PHONY declaration; naming it
# `docker-build` would override the recipe of the regular `docker-build` target.
.PHONY: docker-build-local
docker-build-local: ## Build docker image from a locally built manager binary (Dockerfile-local).
	$(CONTAINER_TOOL) build -t ${IMG} -f Dockerfile-local .

.PHONY: docker-push
docker-push: ## Push docker image with the manager.
$(CONTAINER_TOOL) push ${IMG}
Expand Down
24 changes: 17 additions & 7 deletions internal/controllers/healthcheck_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func (r *HealthCheckReconciler) processHealthCheck(ctx context.Context, log logr
var finishedAtTime int64
if healthCheck.Status.FinishedAt != nil {
finishedAtTime = healthCheck.Status.FinishedAt.Time.Unix()
log.Info("FinishedAtTime", "finishedAtTime", finishedAtTime)
}

// workflows can be paused by setting repeatAfterSec to <= 0 and not specifying the schedule for cron.
Expand Down Expand Up @@ -217,8 +218,8 @@ func (r *HealthCheckReconciler) processHealthCheck(ctx context.Context, log logr
// we need to update the spec itself, so we have to use healthCheck.Spec.RepeatAfterSec instead of the local variable hcSpec
healthCheck.Spec.RepeatAfterSec = int(schedule.Next(time.Now()).Sub(time.Now())/time.Second) + 1
log.Info("spec.RepeatAfterSec value is set", "RepeatAfterSec", healthCheck.Spec.RepeatAfterSec)
} else if int(time.Now().Unix()-finishedAtTime) < hcSpec.RepeatAfterSec {
log.Info("Workflow already executed", "finishedAtTime", finishedAtTime)
} else if int(time.Now().Unix()-finishedAtTime) < hcSpec.RepeatAfterSec && r.RepeatTimersByName[healthCheck.GetName()] != nil {
log.Info("Workflow already executed, and there is repeat schedule has been added to RepeatTimersByName map", "finishedAtTime", finishedAtTime)
return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -421,18 +422,25 @@ func (r *HealthCheckReconciler) deleteRBACForWorkflow(ctx context.Context, log l
// this helper exists because time.AfterFunc() only accepts a function that takes no parameters, while submitting a
// workflow needs several. the easiest way to bridge the two is to build a closure which holds access to the parameters
// we need; the helper returns that closure as a function object taking no parameters, which is what AfterFunc expects
func (r *HealthCheckReconciler) createSubmitWorkflowHelper(ctx context.Context, log logr.Logger, wfNamespace string, hc *activemonitorv1alpha1.HealthCheck) func() {
func (r *HealthCheckReconciler) createSubmitWorkflowHelper(ctx context.Context, log logr.Logger, wfNamespace string, prevHealthCheck *activemonitorv1alpha1.HealthCheck) func() {
return func() {
log.Info("Creating and Submitting Workflow...")
wfName, err := r.createSubmitWorkflow(ctx, log, hc)

healthCheckInstance := &activemonitorv1alpha1.HealthCheck{}
if err := r.Get(ctx, client.ObjectKey{Name: prevHealthCheck.Name, Namespace: prevHealthCheck.Namespace}, healthCheckInstance); err != nil {
log.Error(err, "Error getting healthcheck resource")
return
}

wfName, err := r.createSubmitWorkflow(ctx, log, healthCheckInstance)
if err != nil {
log.Error(err, "Error creating or submitting workflow")
r.Recorder.Event(hc, v1.EventTypeWarning, "Warning", "Error creating or submitting workflow")
r.Recorder.Event(healthCheckInstance, v1.EventTypeWarning, "Warning", "Error creating or submitting workflow")
}
err = r.watchWorkflowReschedule(ctx, ctrl.Request{}, log, wfNamespace, wfName, hc)
err = r.watchWorkflowReschedule(ctx, ctrl.Request{}, log, wfNamespace, wfName, healthCheckInstance)
if err != nil {
log.Error(err, "Error watching or rescheduling workflow")
r.Recorder.Event(hc, v1.EventTypeWarning, "Warning", "Error watching or rescheduling workflow")
r.Recorder.Event(healthCheckInstance, v1.EventTypeWarning, "Warning", "Error watching or rescheduling workflow")
}
}
}
Expand Down Expand Up @@ -652,6 +660,8 @@ func (r *HealthCheckReconciler) watchWorkflowReschedule(ctx context.Context, req
break
}
}

log.Info("waiting for workflow to complete", "namespace", wfNamespace, "name", wfName)
}

// since the workflow has taken an unknown duration of time to complete, it's possible that its parent
Expand Down
79 changes: 63 additions & 16 deletions internal/controllers/healthcheck_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"errors"
"fmt"
"io/ioutil"
"os"
"testing"
"time"

Expand All @@ -21,15 +21,19 @@ import (
)

var (
healthCheckNamespace = "health"
healthCheckName = "inline-monitor-remedy"
healthCheckKey = types.NamespacedName{Name: healthCheckName, Namespace: healthCheckNamespace}
healthCheckNameNs = "inline-monitor-remedy-namespace"
healthCheckKeyNs = types.NamespacedName{Name: healthCheckNameNs, Namespace: healthCheckNamespace}
healthCheckNamePause = "inline-hello-pause"
healthCheckKeyPause = types.NamespacedName{Name: healthCheckNamePause, Namespace: healthCheckNamespace}
healthCheckNameRetry = "inline-hello-custom-retry"
healthCheckKeyRetry = types.NamespacedName{Name: healthCheckNameRetry, Namespace: healthCheckNamespace}
healthCheckNamespace = "health"
healthCheckName = "inline-monitor-remedy"
healthCheckKey = types.NamespacedName{Name: healthCheckName, Namespace: healthCheckNamespace}
healthCheckNameNs = "inline-monitor-remedy-namespace"
healthCheckKeyNs = types.NamespacedName{Name: healthCheckNameNs, Namespace: healthCheckNamespace}
healthCheckNamePause = "inline-hello-pause"
healthCheckKeyPause = types.NamespacedName{Name: healthCheckNamePause, Namespace: healthCheckNamespace}
healthCheckNameRetry = "inline-hello-custom-retry"
healthCheckKeyRetry = types.NamespacedName{Name: healthCheckNameRetry, Namespace: healthCheckNamespace}
healthCheckAlreadyScheduled = "inline-monitor-remedy-already-scheduled"
healthCheckKeyAlreadyScheduled = types.NamespacedName{Name: healthCheckAlreadyScheduled, Namespace: healthCheckNamespace}

sharedCtrl *HealthCheckReconciler
)

const timeout = time.Second * 60
Expand All @@ -38,8 +42,7 @@ var _ = Describe("Active-Monitor Controller", func() {
Describe("healthCheck CR can be reconciled at cluster level", func() {
var instance *activemonitorv1alpha1.HealthCheck
It("instance should be parsable", func() {
// healthCheckYaml, err := ioutil.ReadFile("../examples/inlineHello.yaml")
healthCheckYaml, err := ioutil.ReadFile("../../examples/bdd/inlineMemoryRemedyUnitTest.yaml")
healthCheckYaml, err := os.ReadFile("../../examples/bdd/inlineMemoryRemedyUnitTest.yaml")
Expect(err).ToNot(HaveOccurred())
instance, err = parseHealthCheckYaml(healthCheckYaml)
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -77,8 +80,8 @@ var _ = Describe("Active-Monitor Controller", func() {
var instance *activemonitorv1alpha1.HealthCheck

It("instance should be parsable", func() {
// healthCheckYaml, err := ioutil.ReadFile("../examples/inlineHello.yaml")
healthCheckYaml, err := ioutil.ReadFile("../../examples/bdd/inlineMemoryRemedyUnitTest_Namespace.yaml")
// healthCheckYaml, err := os.ReadFile("../examples/inlineHello.yaml")
healthCheckYaml, err := os.ReadFile("../../examples/bdd/inlineMemoryRemedyUnitTest_Namespace.yaml")
Expect(err).ToNot(HaveOccurred())

instance, err = parseHealthCheckYaml(healthCheckYaml)
Expand Down Expand Up @@ -117,7 +120,7 @@ var _ = Describe("Active-Monitor Controller", func() {
var instance *activemonitorv1alpha1.HealthCheck

It("instance should be parsable", func() {
healthCheckYaml, err := ioutil.ReadFile("../../examples/bdd/inlineHelloTest.yaml")
healthCheckYaml, err := os.ReadFile("../../examples/bdd/inlineHelloTest.yaml")
Expect(err).ToNot(HaveOccurred())

instance, err = parseHealthCheckYaml(healthCheckYaml)
Expand Down Expand Up @@ -152,11 +155,55 @@ var _ = Describe("Active-Monitor Controller", func() {
})
})

// Scenario: a HealthCheck whose name already has an entry in the shared reconciler's
// RepeatTimersByName map is still reconciled far enough to have its status populated.
// The spec is reused from the inlineHelloTest.yaml example and renamed so it does not
// collide with the other specs that load the same file.
Describe("healthCheck CR will be reconcile if it is executed and rescheduled", func() {
// instance is shared between the two It blocks below: the first parses it, the second creates it.
var instance *activemonitorv1alpha1.HealthCheck

It("instance should be parsable", func() {
healthCheckYaml, err := os.ReadFile("../../examples/bdd/inlineHelloTest.yaml")
Expect(err).ToNot(HaveOccurred())

instance, err = parseHealthCheckYaml(healthCheckYaml)
Expect(err).ToNot(HaveOccurred())
Expect(instance).To(BeAssignableToTypeOf(&activemonitorv1alpha1.HealthCheck{}))
// The example file still carries its original name at this point; it is renamed in the next spec.
Expect(instance.GetName()).To(Equal(healthCheckNamePause))
})

It("instance should be reconciled", func() {
instance.SetNamespace(healthCheckNamespace)
instance.SetName(healthCheckAlreadyScheduled)
instance.Spec.RepeatAfterSec = 60
// Pre-register a no-op 60s timer under the new name before the object is created.
// NOTE(review): presumably this simulates a check that already has a repeat timer scheduled
// (processHealthCheck consults RepeatTimersByName) — confirm.
// NOTE(review): the map is written here without taking sharedCtrl.TimerLock, and the timer is
// never stopped — confirm this cannot race with the running reconciler.
sharedCtrl.RepeatTimersByName[instance.Name] = time.AfterFunc(time.Second*60, func() {})

err := k8sClient.Create(context.TODO(), instance)
// NOTE(review): on an "invalid object" error the spec only logs and returns, so it passes
// without running any of the assertions below — confirm this is intended.
if apierrors.IsInvalid(err) {
log.Error(err, "failed to create object, got an invalid object error")
return
}
Expect(err).NotTo(HaveOccurred())
// Best-effort cleanup; the error returned by Delete is not checked.
defer k8sClient.Delete(context.TODO(), instance)

// Poll until the object can be fetched and its status reports a start time,
// giving up after the package-level timeout.
Eventually(func() error {
if err := k8sClient.Get(context.TODO(), healthCheckKeyAlreadyScheduled, instance); err != nil {
return err
}

if instance.Status.StartedAt != nil {
return nil
}
return fmt.Errorf("HealthCheck is not valid")
// return nil
}, timeout).Should(Succeed())

By("Verify healthCheck has been reconciled by checking for status")
// instance now holds the last object fetched inside Eventually.
Expect(instance.Status.ErrorMessage).ShouldNot(BeEmpty())
})
})

Describe("healthCheck CR will properly parse backoff customizations", func() {
var instance *activemonitorv1alpha1.HealthCheck

It("instance should be parsable", func() {
healthCheckYaml, err := ioutil.ReadFile("../../examples/bdd/inlineCustomBackoffTest.yaml")
healthCheckYaml, err := os.ReadFile("../../examples/bdd/inlineCustomBackoffTest.yaml")
Expect(err).ToNot(HaveOccurred())

instance, err = parseHealthCheckYaml(healthCheckYaml)
Expand Down
5 changes: 3 additions & 2 deletions internal/controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,15 @@ var _ = BeforeSuite(func() {
})
Expect(err).ToNot(HaveOccurred())

err = (&HealthCheckReconciler{
sharedCtrl = &HealthCheckReconciler{
Client: k8sManager.GetClient(),
DynClient: dynamic.NewForConfigOrDie(k8sManager.GetConfig()),
Recorder: k8sManager.GetEventRecorderFor("HealthCheck"),
kubeclient: kubernetes.NewForConfigOrDie(k8sManager.GetConfig()),
Log: log,
TimerLock: sync.RWMutex{},
}).SetupWithManager(k8sManager)
}
err = sharedCtrl.SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())

go func() {
Expand Down

0 comments on commit ad374d2

Please sign in to comment.