Skip to content

Commit

Permalink
Merge pull request #251 from syntasso/feat/219/reconciliation-on-an-i…
Browse files Browse the repository at this point in the history
…nterval

feat: configurable re-reconciliation of Kratix resources
  • Loading branch information
abangser authored Oct 15, 2024
2 parents 79a7422 + bca35ff commit b0e3c24
Show file tree
Hide file tree
Showing 18 changed files with 369 additions and 159 deletions.
10 changes: 10 additions & 0 deletions api/v1alpha1/promise_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ type PromiseStatus struct {
Status string `json:"status,omitempty"`
RequiredPromises []RequiredPromiseStatus `json:"requiredPromises,omitempty"`
RequiredBy []RequiredBy `json:"requiredBy,omitempty"`
LastAvailableTime *metav1.Time `json:"lastAvailableTime,omitempty"`
}

type PromiseSummary struct {
Expand Down Expand Up @@ -252,6 +253,15 @@ func (d Dependencies) Marshal() ([]byte, error) {
return io.ReadAll(buf)
}

func (p *Promise) GetCondition(conditionType string) *metav1.Condition {
for i := range p.Status.Conditions {
if p.Status.Conditions[i].Type == conditionType {
return &p.Status.Conditions[i]
}
}
return nil
}

//+kubebuilder:object:root=true

// PromiseList contains a list of Promise
Expand Down
4 changes: 4 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions config/crd/bases/platform.kratix.io_promisereleases.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ spec:
properties:
secretRef:
description: |-
SecretReference represents a Secret Reference. It has enough information to retrieve secret
in any namespace
Reference a secret with credentials to access the source.
For more details on the secret format, see the documentation:
https://docs.kratix.io/main/reference/promises/releases#promise-release
properties:
name:
description: name is unique within a namespace to reference
Expand Down
3 changes: 3 additions & 0 deletions config/crd/bases/platform.kratix.io_promises.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ spec:
type: array
kind:
type: string
lastAvailableTime:
format: date-time
type: string
observedGeneration:
format: int64
type: integer
Expand Down
2 changes: 2 additions & 0 deletions controllers/assets/promise-with-workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ apiVersion: platform.kratix.io/v1alpha1
kind: Promise
metadata:
name: promise-with-workflow
labels:
kratix.io/promise-version: v1.1.0
spec:
api:
apiVersion: apiextensions.k8s.io/v1
Expand Down
8 changes: 2 additions & 6 deletions controllers/dynamic_resource_request_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,11 @@ func (r *DynamicResourceRequestController) Reconcile(ctx context.Context, req ct

jobOpts := workflow.NewOpts(ctx, r.Client, logger, rr, pipelineResources, "resource", r.NumberOfJobsToKeep)

requeue, err := reconcileConfigure(jobOpts)
if err != nil {
abort, err := reconcileConfigure(jobOpts)
if err != nil || abort {
return ctrl.Result{}, err
}

if requeue {
return defaultRequeue, nil
}

if rr.GetGeneration() != resourceutil.GetObservedGeneration(rr) {
resourceutil.SetStatus(rr, logger, "observedGeneration", rr.GetGeneration())
return ctrl.Result{}, opts.client.Status().Update(opts.ctx, rr)
Expand Down
7 changes: 4 additions & 3 deletions controllers/dynamic_resource_request_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ var _ = Describe("DynamicResourceRequestController", func() {

When("resource is being created", func() {
It("re-reconciles until completion", func() {
_, err := t.reconcileUntilCompletion(reconciler, resReq)
result, err := t.reconcileUntilCompletion(reconciler, resReq)
Expect(fakeK8sClient.Get(ctx, resReqNameNamespace, resReq)).To(Succeed())

resourceLabels := map[string]string{
Expand Down Expand Up @@ -151,8 +151,9 @@ var _ = Describe("DynamicResourceRequestController", func() {
Expect(strings.TrimSpace(destinationSelectors)).To(Equal(`- matchlabels: environment: dev source: promise`))
})

By("requeuing forever until jobs finishes", func() {
Expect(err).To(MatchError("reconcile loop detected"))
By("not requeuing, since the controller is watching the job", func() {
Expect(err).To(BeNil())
Expect(result).To(Equal(ctrl.Result{}))
})

By("finishing the creation once the job is finished", func() {
Expand Down
92 changes: 72 additions & 20 deletions controllers/promise_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"slices"
"strings"
"sync"
"time"

"github.com/syntasso/kratix/lib/objectutil"
Expand Down Expand Up @@ -69,6 +70,10 @@ type PromiseReconciler struct {
StartedDynamicControllers map[string]*DynamicResourceRequestController
RestartManager func()
NumberOfJobsToKeep int

ScheduledReconciliation map[string]metav1.Time

mutex sync.Mutex
}

const (
Expand All @@ -77,6 +82,7 @@ const (
dynamicControllerDependantResourcesCleanupFinalizer = v1alpha1.KratixPrefix + "dynamic-controller-dependant-resources-cleanup"
crdCleanupFinalizer = v1alpha1.KratixPrefix + "api-crd-cleanup"
dependenciesCleanupFinalizer = v1alpha1.KratixPrefix + "dependencies-cleanup"
lastUpdatedAtAnnotation = v1alpha1.KratixPrefix + "last-updated-at"

requirementStateInstalled = "Requirement installed"
requirementStateNotInstalled = "Requirement not installed"
Expand All @@ -95,9 +101,9 @@ var (
// fastRequeue can be used whenever we want to quickly requeue, and we don't expect
// an error to occur. Example: we delete a resource, we then requeue
// to check it's been deleted. Here we can use a fastRequeue instead of a defaultRequeue
fastRequeue = ctrl.Result{RequeueAfter: 1 * time.Second}
defaultRequeue = ctrl.Result{RequeueAfter: 5 * time.Second}
slowRequeue = ctrl.Result{RequeueAfter: 15 * time.Second}
fastRequeue = ctrl.Result{RequeueAfter: 5 * time.Second}
defaultRequeue = ctrl.Result{RequeueAfter: 15 * time.Second}
slowRequeue = ctrl.Result{RequeueAfter: 60 * time.Second}
)

//+kubebuilder:rbac:groups=platform.kratix.io,resources=promises,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -152,9 +158,16 @@ func (r *PromiseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
//Set status to unavailable, at the end of this function we set it to
//available. If at anytime we return early, it persisted as unavailable
promise.Status.Status = v1alpha1.PromiseStatusUnavailable
updated, err := r.ensureRequiredPromiseStatusIsUpToDate(ctx, promise)
if err != nil || updated {
return ctrl.Result{}, err
requirementsChanged := r.hasPromiseRequirementsChanged(ctx, promise)

scheduledReconciliation := promise.Status.LastAvailableTime != nil && time.Since(promise.Status.LastAvailableTime.Time) > DefaultReconciliationInterval
if (requirementsChanged || scheduledReconciliation) && originalStatus == v1alpha1.PromiseStatusAvailable {
err := r.Client.Status().Update(ctx, promise)
if err != nil {
return ctrl.Result{}, err
}
logger.Info("Requeueing: requirements changed or scheduled reconciliation")
return ctrl.Result{}, nil
}

//TODO handle removing finalizer
Expand Down Expand Up @@ -204,13 +217,14 @@ func (r *PromiseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return addFinalizers(opts, promise, []string{dependenciesCleanupFinalizer})
}

requeue, err = r.reconcileDependenciesAndPromiseWorkflows(opts, promise)
ctrlResult, err := r.reconcileDependenciesAndPromiseWorkflows(opts, promise)
if err != nil {
return ctrl.Result{}, err
}

if requeue != nil {
return *requeue, nil
if ctrlResult != nil {
logger.Info("stopping reconciliation while reconciling dependencies")
return *ctrlResult, nil
}

if promise.ContainsAPI() {
Expand Down Expand Up @@ -240,10 +254,12 @@ func (r *PromiseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct

if promise.GetGeneration() != promise.Status.ObservedGeneration {
if promise.GetGeneration() != 1 {
logger.Info("reconciling all RRs")
if err := r.reconcileAllRRs(rrGVK); err != nil {
return ctrl.Result{}, err
}
}
logger.Info("updating observed generation", "from", promise.Status.ObservedGeneration, "to", promise.GetGeneration())
promise.Status.ObservedGeneration = promise.GetGeneration()
return ctrl.Result{}, r.Client.Status().Update(ctx, promise)
}
Expand All @@ -252,24 +268,38 @@ func (r *PromiseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}

if originalStatus == v1alpha1.PromiseStatusAvailable {
return ctrl.Result{}, nil
return r.nextReconciliation(promise, logger)
}

logger.Info("Promise status being set to Available")
promise.Status.Status = v1alpha1.PromiseStatusAvailable
promise.Status.LastAvailableTime = &metav1.Time{Time: time.Now()}
return ctrl.Result{}, r.Client.Status().Update(ctx, promise)
}

func (r *PromiseReconciler) ensureRequiredPromiseStatusIsUpToDate(ctx context.Context, promise *v1alpha1.Promise) (bool, error) {
func (r *PromiseReconciler) nextReconciliation(promise *v1alpha1.Promise, logger logr.Logger) (ctrl.Result, error) {
r.mutex.Lock()
defer r.mutex.Unlock()

scheduled, found := r.ScheduledReconciliation[promise.GetName()]
if !found || time.Now().After(scheduled.Time) {
next := metav1.NewTime(time.Now().Add(DefaultReconciliationInterval))
r.ScheduledReconciliation[promise.GetName()] = next

logger.Info("Scheduling next reconciliation", "scheduledReconciliationTimestamp", next.Time.String())
return ctrl.Result{RequeueAfter: DefaultReconciliationInterval}, nil
}
logger.Info("Reconciliation already scheduled", "scheduledReconciliationTimestamp", scheduled.Time.String(), "labels", promise.Labels)
return ctrl.Result{}, nil
}

func (r *PromiseReconciler) hasPromiseRequirementsChanged(ctx context.Context, promise *v1alpha1.Promise) bool {
latestCondition, latestRequirements := r.generateStatusAndMarkRequirements(ctx, promise)

requirementsFieldChanged := updateRequirementsStatusOnPromise(promise, promise.Status.RequiredPromises, latestRequirements)
conditionsFieldChanged := updateConditionOnPromise(promise, latestCondition)

if conditionsFieldChanged || requirementsFieldChanged {
return true, r.Client.Status().Update(ctx, promise)
}

return false, nil
return conditionsFieldChanged || requirementsFieldChanged
}

func updateConditionOnPromise(promise *v1alpha1.Promise, latestCondition metav1.Condition) bool {
Expand Down Expand Up @@ -377,6 +407,19 @@ func (r *PromiseReconciler) reconcileDependenciesAndPromiseWorkflows(o opts, pro
}

o.logger.Info("Promise contains workflows.promise.configure, reconciling workflows")
pipelineCompletedCondition := promise.GetCondition(string(resourceutil.PipelineCompletedCondition))
forcePipelineRun := pipelineCompletedCondition != nil && pipelineCompletedCondition.Status == "True" && time.Since(pipelineCompletedCondition.LastTransitionTime.Time) > DefaultReconciliationInterval
if forcePipelineRun {
o.logger.Info("Pipeline completed too long ago... forcing the reconciliation", "lastTransitionTime", pipelineCompletedCondition.LastTransitionTime.Time.String())
if promise.Labels == nil {
promise.Labels = make(map[string]string)
}
promise.Labels[resourceutil.ManualReconciliationLabel] = "true"
if err := r.Client.Update(o.ctx, promise); err != nil {
return &ctrl.Result{}, err
}
}

unstructuredPromise, err := promise.ToUnstructured()
if err != nil {
return nil, err
Expand All @@ -389,19 +432,20 @@ func (r *PromiseReconciler) reconcileDependenciesAndPromiseWorkflows(o opts, pro

jobOpts := workflow.NewOpts(o.ctx, o.client, o.logger, unstructuredPromise, pipelineResources, "promise", r.NumberOfJobsToKeep)

requeue, err := reconcileConfigure(jobOpts)
abort, err := reconcileConfigure(jobOpts)
if err != nil {
return nil, err
}

if requeue {
return &defaultRequeue, nil
if abort {
return &ctrl.Result{}, nil
}

return nil, nil
}

func (r *PromiseReconciler) reconcileAllRRs(rrGVK schema.GroupVersionKind) error {
//label all rr with manual reocnciliation
//label all rr with manual reconciliation
rrs := &unstructured.UnstructuredList{}
rrListGVK := rrGVK
rrListGVK.Kind = rrListGVK.Kind + "List"
Expand Down Expand Up @@ -976,6 +1020,14 @@ func (r *PromiseReconciler) applyWorkForStaticDependencies(o opts, promise *v1al
} else {
op = "updated"
existingWork.Spec = work.Spec

ann := existingWork.GetAnnotations()
if ann == nil {
ann = map[string]string{}
}
ann[lastUpdatedAtAnnotation] = time.Now().Local().String()
existingWork.SetAnnotations(ann)

err = r.Client.Update(o.ctx, existingWork)
}

Expand Down
Loading

0 comments on commit b0e3c24

Please sign in to comment.