From d150f37390ee4eec39e973661930d89fdf2bb911 Mon Sep 17 00:00:00 2001 From: Jake Klein Date: Thu, 13 Jul 2023 12:50:14 +0100 Subject: [PATCH 1/3] Delete pipelines (#18) feat: trigger delete pipeline on resource deletion Co-authored-by: Derik Evangelista Co-authored-by: Abby Bangser --- Makefile | 2 +- api/v1alpha1/promise_types.go | 2 + api/v1alpha1/zz_generated.deepcopy.go | 7 + .../kratix/crds/platform_kratix_io_crds.yaml | 5 + .../bases/platform.kratix.io_promises.yaml | 5 + config/manager/kustomization.yaml | 14 +- config/samples/redis/redis-promise.yaml | 10 + .../dynamic_resource_request_controller.go | 278 +++++++----------- ...ynamic_resource_request_controller_test.go | 110 +++++-- controllers/promise_controller.go | 101 ++++--- distribution/kratix.yaml | 5 + lib/pipeline/configure.go | 140 +++++++++ lib/pipeline/delete.go | 76 +++++ lib/pipeline/shared.go | 52 ++++ test/system/assets/bash-promise/promise.yaml | 36 +++ test/system/system_test.go | 26 +- 16 files changed, 622 insertions(+), 247 deletions(-) create mode 100644 lib/pipeline/configure.go create mode 100644 lib/pipeline/delete.go create mode 100644 lib/pipeline/shared.go diff --git a/Makefile b/Makefile index 49aaab38..52eca403 100644 --- a/Makefile +++ b/Makefile @@ -126,7 +126,7 @@ ifeq ($(shell uname -sm),Darwin arm64) endif .PHONY: test test: manifests generate fmt vet envtest ## Run tests. - KUBEBUILDER_ASSETS="$(shell $(ENVTEST) $(ARCH_FLAG) use $(ENVTEST_K8S_VERSION) -p path)" WC_IMG=${WC_IMG} TEST_PROMISE_CONTROLLER_POD_IDENTIFIER_UUID=12345 ACK_GINKGO_DEPRECATIONS=1.16.4 go run github.com/onsi/ginkgo/ginkgo -r --coverprofile cover.out --skipPackage=system + KUBEBUILDER_ASSETS="$(shell $(ENVTEST) $(ARCH_FLAG) use $(ENVTEST_K8S_VERSION) -p path)" WC_IMG=${WC_IMG} ACK_GINKGO_DEPRECATIONS=1.16.4 go run github.com/onsi/ginkgo/ginkgo -r --coverprofile cover.out --skipPackage=system ENVTEST = $(shell pwd)/bin/setup-envtest .PHONY: envtest diff --git a/api/v1alpha1/promise_types.go b/api/v1alpha1/promise_types.go index 8b8dc625..dff44f61 100644 --- a/api/v1alpha1/promise_types.go +++ b/api/v1alpha1/promise_types.go @@ -57,6 +57,8 @@ type Workflows struct { type WorkflowTriggers struct { // +kubebuilder:pruning:PreserveUnknownFields Configure []unstructured.Unstructured `json:"configure,omitempty"` + // +kubebuilder:pruning:PreserveUnknownFields + Delete []unstructured.Unstructured `json:"delete,omitempty"` } // Resources represents the manifest workload to be deployed on workers diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index b72cf1c8..21cccc1b 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -783,6 +783,13 @@ func (in *WorkflowTriggers) DeepCopyInto(out *WorkflowTriggers) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.Delete != nil { + in, out := &in.Delete, &out.Delete + *out = make([]unstructured.Unstructured, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkflowTriggers. 
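A note on the API change above: the new Delete trigger is declared, like Configure, as []unstructured.Unstructured with x-kubernetes-preserve-unknown-fields, so a Promise author can embed an arbitrary Pipeline document in the Promise spec. The promise controller later re-marshals each entry to JSON and decodes it into the typed v1alpha1.Pipeline (see generatePipeline in controllers/promise_controller.go further down), rejecting anything that is not a platform.kratix.io/v1alpha1 Pipeline. The following is a minimal, self-contained sketch of that round-trip; the Pipeline struct is a trimmed stand-in for the real api/v1alpha1 type, and the container name and image are made-up example values.

package main

import (
	"encoding/json"
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

// Pipeline is a trimmed stand-in for v1alpha1.Pipeline, just enough to show
// the round-trip; the real type lives in api/v1alpha1.
type Pipeline struct {
	Spec struct {
		Containers []struct {
			Name  string `json:"name"`
			Image string `json:"image"`
		} `json:"containers"`
	} `json:"spec"`
}

func main() {
	// The Delete trigger is stored as unstructured content (the CRD uses
	// x-kubernetes-preserve-unknown-fields), so the controller re-marshals
	// each entry to JSON and unmarshals it into the typed Pipeline.
	u := unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "platform.kratix.io/v1alpha1",
		"kind":       "Pipeline",
		"metadata":   map[string]interface{}{"name": "instance-delete"},
		"spec": map[string]interface{}{
			"containers": []interface{}{
				map[string]interface{}{"name": "example", "image": "example/image:dev"},
			},
		},
	}}

	raw, err := u.MarshalJSON()
	if err != nil {
		panic(err)
	}

	var p Pipeline
	if err := json.Unmarshal(raw, &p); err != nil {
		panic(err)
	}
	fmt.Println(p.Spec.Containers[0].Image) // example/image:dev
}

Keeping the field unstructured leaves the CRD schema permissive while validation (the "unsupported pipeline" error) happens at reconcile time.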
diff --git a/charts/kratix/crds/platform_kratix_io_crds.yaml b/charts/kratix/crds/platform_kratix_io_crds.yaml index 14ff1cc4..d5f16af9 100644 --- a/charts/kratix/crds/platform_kratix_io_crds.yaml +++ b/charts/kratix/crds/platform_kratix_io_crds.yaml @@ -244,6 +244,11 @@ spec: type: object type: array x-kubernetes-preserve-unknown-fields: true + delete: + items: + type: object + type: array + x-kubernetes-preserve-unknown-fields: true type: object type: object type: object diff --git a/config/crd/bases/platform.kratix.io_promises.yaml b/config/crd/bases/platform.kratix.io_promises.yaml index e9ea78d7..77fbe874 100644 --- a/config/crd/bases/platform.kratix.io_promises.yaml +++ b/config/crd/bases/platform.kratix.io_promises.yaml @@ -66,6 +66,11 @@ spec: type: object type: array x-kubernetes-preserve-unknown-fields: true + delete: + items: + type: object + type: array + x-kubernetes-preserve-unknown-fields: true type: object type: object type: object diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml index 4e5d729a..8e95ec38 100644 --- a/config/manager/kustomization.yaml +++ b/config/manager/kustomization.yaml @@ -2,7 +2,7 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization resources: - - manager.yaml +- manager.yaml generatorOptions: disableNameSuffixHash: true @@ -10,12 +10,12 @@ generatorOptions: # The wc-img-config config map drives the WC_IMG env var, which is used by the promise # controller as the last container in the workflow Pipeline pod spec. configMapGenerator: - - files: - - controller_manager_config.yaml - name: manager-config - - envs: - - wc-img-config.properties - name: wc-img-config +- files: + - controller_manager_config.yaml + name: manager-config +- envs: + - wc-img-config.properties + name: wc-img-config images: - name: controller diff --git a/config/samples/redis/redis-promise.yaml b/config/samples/redis/redis-promise.yaml index 1af97d8b..74065124 100644 --- a/config/samples/redis/redis-promise.yaml +++ b/config/samples/redis/redis-promise.yaml @@ -4495,3 +4495,13 @@ spec: containers: - image: syntasso/kustomize-redis name: kustomize-redis + delete: + - apiVersion: platform.kratix.io/v1alpha1 + kind: Pipeline + metadata: + name: instance-delete + namespace: default + spec: + containers: + - image: syntasso/kustomize-redis + name: kustomize-redis diff --git a/controllers/dynamic_resource_request_controller.go b/controllers/dynamic_resource_request_controller.go index 91c6b133..cb63a853 100644 --- a/controllers/dynamic_resource_request_controller.go +++ b/controllers/dynamic_resource_request_controller.go @@ -18,27 +18,23 @@ package controllers import ( "context" - "os" - "strconv" "time" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "fmt" - "strings" "github.com/go-logr/logr" "github.com/syntasso/kratix/api/v1alpha1" + "github.com/syntasso/kratix/lib/pipeline" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/selection" - "k8s.io/apimachinery/pkg/util/uuid" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" conditionsutil "sigs.k8s.io/cluster-api/util/conditions" ctrl "sigs.k8s.io/controller-runtime" @@ -47,24 +43,26 @@ import ( ) const ( - workFinalizer = finalizerPrefix + "work-cleanup" - 
workflowsFinalizer = finalizerPrefix + "workflows-cleanup" + workFinalizer = finalizerPrefix + "work-cleanup" + workflowsFinalizer = finalizerPrefix + "workflows-cleanup" + deleteWorkflowsFinalizer = finalizerPrefix + "delete-workflows" ) -var rrFinalizers = []string{workFinalizer, workflowsFinalizer} +var rrFinalizers = []string{workFinalizer, workflowsFinalizer, deleteWorkflowsFinalizer} type dynamicResourceRequestController struct { //use same naming conventions as other controllers - Client client.Client - gvk *schema.GroupVersionKind - scheme *runtime.Scheme - promiseIdentifier string - promiseScheduling []v1alpha1.SchedulingConfig - workflows []v1alpha1.Pipeline - log logr.Logger - finalizers []string - uid string - enabled *bool + Client client.Client + gvk *schema.GroupVersionKind + scheme *runtime.Scheme + promiseIdentifier string + promiseScheduling []v1alpha1.SchedulingConfig + configurePipelines []v1alpha1.Pipeline + deletePipelines []v1alpha1.Pipeline + log logr.Logger + finalizers []string + uid string + enabled *bool } //+kubebuilder:rbac:groups="",resources=pods,verbs=create;list;watch;delete @@ -97,13 +95,18 @@ func (r *dynamicResourceRequestController) Reconcile(ctx context.Context, req ct } // Reconcile necessary finalizers - if finalizersAreMissing(rr, []string{workFinalizer, workflowsFinalizer}) { - return addFinalizers(ctx, r.Client, rr, []string{workFinalizer, workflowsFinalizer}, logger) + if finalizersAreMissing(rr, []string{workFinalizer, workflowsFinalizer, deleteWorkflowsFinalizer}) { + return addFinalizers(ctx, r.Client, rr, []string{workFinalizer, workflowsFinalizer, deleteWorkflowsFinalizer}, logger) } //check if the pipeline has already been created. If it has exit out. All the lines //below this call are for the one-time creation of the pipeline. - if r.pipelinePodHasBeenCreated(resourceRequestIdentifier) { + created, err := r.configurePipelinePodHasBeenCreated(resourceRequestIdentifier, logger) + if err != nil { + return ctrl.Result{}, err + } + + if created { logger.Info("Cannot execute update on pre-existing pipeline for Promise resource request " + resourceRequestIdentifier) return ctrl.Result{}, nil } @@ -116,59 +119,7 @@ func (r *dynamicResourceRequestController) Reconcile(ctx context.Context, req ct return ctrl.Result{}, err } - volumes := []v1.Volume{ - {Name: "metadata", VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}}, - { - Name: "promise-scheduling", - VolumeSource: v1.VolumeSource{ - ConfigMap: &v1.ConfigMapVolumeSource{ - LocalObjectReference: v1.LocalObjectReference{ - Name: "scheduling-" + r.promiseIdentifier, - }, - Items: []v1.KeyToPath{{ - Key: "scheduling", - Path: "promise-scheduling", - }}, - }, - }, - }, - } - initContainers, pipelineVolumes := r.pipelineInitContainers(resourceRequestIdentifier, req) - volumes = append(volumes, pipelineVolumes...) 
- - rrKind := fmt.Sprintf("%s.%s", strings.ToLower(r.gvk.Kind), r.gvk.Group) - pod := v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "configure-pipeline-" + r.promiseIdentifier + "-" + getShortUuid(), - Namespace: "default", - Labels: map[string]string{ - "kratix-promise-id": r.promiseIdentifier, - "kratix-promise-resource-request-id": resourceRequestIdentifier, - }, - }, - Spec: v1.PodSpec{ - RestartPolicy: v1.RestartPolicyOnFailure, - ServiceAccountName: r.promiseIdentifier + "-promise-pipeline", - Containers: []v1.Container{ - { - Name: "status-writer", - Image: os.Getenv("WC_IMG"), - Command: []string{"sh", "-c", "update-status"}, - Env: []v1.EnvVar{ - {Name: "RR_KIND", Value: rrKind}, - {Name: "RR_NAME", Value: req.Name}, - {Name: "RR_NAMESPACE", Value: req.Namespace}, - }, - VolumeMounts: []v1.VolumeMount{{ - MountPath: "/work-creator-files/metadata", - Name: "metadata", - }}, - }, - }, - InitContainers: initContainers, - Volumes: volumes, - }, - } + pod := pipeline.NewConfigurePipeline(rr, r.configurePipelines, resourceRequestIdentifier, r.promiseIdentifier) logger.Info("Creating Pipeline for Promise resource request: " + resourceRequestIdentifier + ". The pipeline will now execute...") err = r.Client.Create(ctx, &pod) @@ -181,30 +132,42 @@ func (r *dynamicResourceRequestController) Reconcile(ctx context.Context, req ct return ctrl.Result{}, nil } -func (r *dynamicResourceRequestController) pipelinePodHasBeenCreated(resourceRequestIdentifier string) bool { - isPromise, _ := labels.NewRequirement("kratix-promise-resource-request-id", selection.Equals, []string{resourceRequestIdentifier}) - selector := labels.NewSelector(). - Add(*isPromise) - - listOps := &client.ListOptions{ - Namespace: "default", - LabelSelector: selector, - } - - ol := &v1.PodList{} - err := r.Client.List(context.Background(), ol, listOps) - if err != nil { - fmt.Println(err.Error()) - return false - } - return len(ol.Items) > 0 -} - func (r *dynamicResourceRequestController) deleteResources(ctx context.Context, resourceRequest *unstructured.Unstructured, resourceRequestIdentifier string, logger logr.Logger) (ctrl.Result, error) { if finalizersAreDeleted(resourceRequest, rrFinalizers) { return ctrl.Result{}, nil } + if controllerutil.ContainsFinalizer(resourceRequest, deleteWorkflowsFinalizer) { + existingDeletePipelinePod, err := r.getDeletePipelinePod(ctx, resourceRequestIdentifier, logger) + if err != nil { + return defaultRequeue, err + } + + if existingDeletePipelinePod == nil { + deletePipelinePod := pipeline.NewDeletePipeline(resourceRequest, r.deletePipelines, resourceRequestIdentifier, r.promiseIdentifier) + logger.Info("Creating Delete Pipeline for Promise resource request: " + resourceRequestIdentifier + ". 
The pipeline will now execute...") + err = r.Client.Create(ctx, &deletePipelinePod) + if err != nil { + logger.Error(err, "Error creating Pod") + y, _ := yaml.Marshal(&deletePipelinePod) + logger.Error(err, string(y)) + return ctrl.Result{}, err + } + return defaultRequeue, nil + } + + logger.Info("Checking status of Delete Pipeline for Promise resource request: " + resourceRequestIdentifier) + if pipelineIsComplete(*existingDeletePipelinePod) { + logger.Info("Delete Pipeline Completed for Promise resource request: " + resourceRequestIdentifier) + controllerutil.RemoveFinalizer(resourceRequest, deleteWorkflowsFinalizer) + if err := r.Client.Update(ctx, resourceRequest); err != nil { + return ctrl.Result{}, err + } + } + + return fastRequeue, nil + } + if controllerutil.ContainsFinalizer(resourceRequest, workFinalizer) { err := r.deleteWork(ctx, resourceRequest, resourceRequestIdentifier, workFinalizer, logger) if err != nil { @@ -224,12 +187,60 @@ func (r *dynamicResourceRequestController) deleteResources(ctx context.Context, return fastRequeue, nil } +func (r *dynamicResourceRequestController) getDeletePipelinePod(ctx context.Context, resourceRequestIdentifier string, logger logr.Logger) (*v1.Pod, error) { + pods, err := r.getPodsWithLabels(pipeline.DeletePipelineLabels(resourceRequestIdentifier, r.promiseIdentifier), logger) + if err != nil || len(pods) == 0 { + return nil, err + } + return &pods[0], nil +} + +func (r *dynamicResourceRequestController) configurePipelinePodHasBeenCreated(resourceRequestIdentifier string, logger logr.Logger) (bool, error) { + pods, err := r.getPodsWithLabels(pipeline.ConfigurePipelineLabels(resourceRequestIdentifier, r.promiseIdentifier), logger) + if err != nil { + return false, err + } + return len(pods) > 0, nil +} + +func (r *dynamicResourceRequestController) getPodsWithLabels(podLabels map[string]string, logger logr.Logger) ([]v1.Pod, error) { + selectorLabels := labels.FormatLabels(podLabels) + selector, err := labels.Parse(selectorLabels) + + if err != nil { + return nil, fmt.Errorf("error parsing labels %v: %w", podLabels, err) + } + + listOps := &client.ListOptions{ + Namespace: "default", + LabelSelector: selector, + } + + pods := &v1.PodList{} + err = r.Client.List(context.Background(), pods, listOps) + if err != nil { + logger.Error(err, "error listing pods", "selectors", selector.String()) + return nil, err + } + return pods.Items, nil +} + +func pipelineIsComplete(pod v1.Pod) bool { + for _, condition := range pod.Status.Conditions { + if condition.Type == "Initialized" { + return condition.Status == "True" && condition.Reason == "PodCompleted" + } + } + return false +} + func (r *dynamicResourceRequestController) deleteWork(ctx context.Context, resourceRequest *unstructured.Unstructured, workName string, finalizer string, logger logr.Logger) error { work := &v1alpha1.Work{} err := r.Client.Get(ctx, types.NamespacedName{ Namespace: "default", Name: workName, }, work) + if err != nil { if errors.IsNotFound(err) { // only remove finalizer at this point because deletion success is guaranteed @@ -269,10 +280,7 @@ func (r *dynamicResourceRequestController) deletePipeline(ctx context.Context, r Kind: "Pod", } - podLabels := map[string]string{ - "kratix-promise-id": r.promiseIdentifier, - "kratix-promise-resource-request-id": resourceRequestIdentifier, - } + podLabels := pipeline.SharedLabels(resourceRequestIdentifier, r.promiseIdentifier) resourcesRemaining, err := deleteAllResourcesWithKindMatchingLabel(ctx, r.Client, podGVK, podLabels, logger) if 
err != nil { @@ -289,15 +297,6 @@ func (r *dynamicResourceRequestController) deletePipeline(ctx context.Context, r return nil } -func getShortUuid() string { - envUuid, present := os.LookupEnv("TEST_PROMISE_CONTROLLER_POD_IDENTIFIER_UUID") - if present { - return envUuid - } else { - return string(uuid.NewUUID()[0:5]) - } -} - func (r *dynamicResourceRequestController) setPipelineConditionToNotCompleted(ctx context.Context, rr *unstructured.Unstructured, logger logr.Logger) error { setter := conditionsutil.UnstructuredSetter(rr) getter := conditionsutil.UnstructuredGetter(rr) @@ -321,68 +320,3 @@ func (r *dynamicResourceRequestController) setPipelineConditionToNotCompleted(ct } return nil } - -func (r *dynamicResourceRequestController) pipelineInitContainers(rrID string, req ctrl.Request) ([]v1.Container, []v1.Volume) { - resourceKindNameNamespace := fmt.Sprintf("%s.%s %s --namespace %s", strings.ToLower(r.gvk.Kind), r.gvk.Group, req.Name, req.Namespace) - metadataVolumeMount := v1.VolumeMount{MountPath: "/metadata", Name: "metadata"} - - resourceRequestCommand := fmt.Sprintf("kubectl get %s -oyaml > /output/object.yaml", resourceKindNameNamespace) - reader := v1.Container{ - Name: "reader", - Image: "bitnami/kubectl:1.20.10", - Command: []string{"sh", "-c", resourceRequestCommand}, - VolumeMounts: []v1.VolumeMount{ - { - MountPath: "/output", - Name: "vol0", - }, - }, - } - - volumes := []v1.Volume{{Name: "vol0", VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}}} - - containers := []v1.Container{reader} - if len(r.workflows) > 0 { - //TODO: We only support 1 workflow for now - for i, c := range r.workflows[0].Spec.Containers { - volumes = append(volumes, v1.Volume{ - Name: "vol" + strconv.Itoa(i+1), - VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}, - }) - containers = append(containers, v1.Container{ - Name: c.Name, - Image: c.Image, - VolumeMounts: []v1.VolumeMount{ - metadataVolumeMount, - {Name: "vol" + strconv.Itoa(i), MountPath: "/input"}, - {Name: "vol" + strconv.Itoa(i+1), MountPath: "/output"}, - }, - }) - } - } - - workCreatorCommand := fmt.Sprintf("./work-creator -identifier %s -input-directory /work-creator-files", rrID) - writer := v1.Container{ - Name: "work-writer", - Image: os.Getenv("WC_IMG"), - Command: []string{"sh", "-c", workCreatorCommand}, - VolumeMounts: []v1.VolumeMount{ - { - MountPath: "/work-creator-files/input", - Name: "vol" + strconv.Itoa(len(containers)-1), - }, - { - MountPath: "/work-creator-files/metadata", - Name: "metadata", - }, - { - MountPath: "/work-creator-files/kratix-system", - Name: "promise-scheduling", - }, - }, - } - - containers = append(containers, writer) - - return containers, volumes -} diff --git a/controllers/promise_and_dynamic_resource_request_controller_test.go b/controllers/promise_and_dynamic_resource_request_controller_test.go index a22ae703..0c7a0920 100644 --- a/controllers/promise_and_dynamic_resource_request_controller_test.go +++ b/controllers/promise_and_dynamic_resource_request_controller_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io/ioutil" + "strings" . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" @@ -177,6 +178,44 @@ var _ = Context("Promise Reconciler", func() { err = k8sClient.Delete(context.Background(), promiseCR) Expect(err).NotTo(HaveOccurred()) + //delete pipeline should be created + deletePipeline := v1.Pod{} + Eventually(func() bool { + pods := &v1.PodList{} + err := k8sClient.List(context.Background(), pods) + if err != nil { + return false + } + if len(pods.Items) != 2 { + return false + } + for _, pod := range pods.Items { + if strings.HasPrefix(pod.Name, "delete-") { + deletePipeline = pod + return true + } + } + return false + }, timeout, interval).Should(BeTrue(), "Expected the delete pipeline to be created") + + //update the pod to be marked as complete + // status: + // conditions: + // - lastProbeTime: null + // lastTransitionTime: "2023-07-11T15:20:37Z" + // reason: PodCompleted + // status: "True" + // type: Initialized + deletePipeline.Status.Conditions = []v1.PodCondition{ + { + Status: "True", + Type: "Initialized", + Reason: "PodCompleted", + }, + } + err = k8sClient.Status().Update(context.Background(), &deletePipeline) + Expect(err).NotTo(HaveOccurred()) + By("also deleting the resource requests") Eventually(func() int { rrList := &unstructured.UnstructuredList{} @@ -257,15 +296,13 @@ var _ = Context("Promise Reconciler", func() { }) Describe("Lifecycle of a Redis Custom Resource", func() { - createdPod := v1.Pod{} - - redisRequest := &unstructured.Unstructured{} + var ( + redisRequest = &unstructured.Unstructured{} - expectedName := types.NamespacedName{ - //The name of the pod is generated dynamically by the Promise Controller. For testing purposes, we set a TEST_PROMISE_CONTROLLER_POD_IDENTIFIER_UUID via an environment variable in the Makefile to make the name deterministic - Name: "configure-pipeline-redis-promise-default-12345", - Namespace: "default", - } + configurePodNamespacedName = types.NamespacedName{ + Namespace: "default", + } + ) BeforeEach(func() { yamlFile, err := ioutil.ReadFile("../config/samples/redis/redis-resource-request.yaml") @@ -285,12 +322,16 @@ var _ = Context("Promise Reconciler", func() { var timeout = "30s" var interval = "3s" Eventually(func() string { - err := k8sClient.Get(context.Background(), expectedName, &createdPod) + pods := &v1.PodList{} + err := k8sClient.List(context.Background(), pods) if err != nil { - fmt.Println(err.Error()) return "" } - return createdPod.Spec.Containers[0].Name + if len(pods.Items) != 1 { + return "" + } + configurePodNamespacedName.Name = pods.Items[0].Name + return pods.Items[0].Spec.Containers[0].Name }, timeout, interval).Should(Equal("status-writer")) By("setting the finalizer on the resource") @@ -303,7 +344,7 @@ var _ = Context("Promise Reconciler", func() { return nil } return createdRedisRequest.GetFinalizers() - }, timeout, interval).Should(ConsistOf("kratix.io/work-cleanup", "kratix.io/workflows-cleanup")) + }, timeout, interval).Should(ConsistOf("kratix.io/work-cleanup", "kratix.io/delete-workflows", "kratix.io/workflows-cleanup")) }) It("Takes no action on update", func() { @@ -356,6 +397,45 @@ var _ = Context("Promise Reconciler", func() { err = k8sClient.Delete(context.Background(), redisRequest) Expect(err).ToNot(HaveOccurred()) + //delete pipeline should be created + deletePipeline := v1.Pod{} + Eventually(func() bool { + pods := &v1.PodList{} + err := k8sClient.List(context.Background(), pods) + if err != nil { + return false + } + if len(pods.Items) != 2 { + return false + } + for _, pod := range pods.Items { + if strings.HasPrefix(pod.Name, 
"delete-") { + deletePipeline = pod + return true + } + } + return false + }, timeout, interval).Should(BeTrue(), "Expected the delete pipeline to be created") + + deletePipeline.Status.Conditions = []v1.PodCondition{ + { + Status: "True", + Type: "Initialized", + Reason: "PodCompleted", + }, + } + err = k8sClient.Status().Update(context.Background(), &deletePipeline) + Expect(err).NotTo(HaveOccurred()) + + Eventually(func() bool { + pods := &v1.PodList{} + err := k8sClient.List(context.Background(), pods) + if err != nil { + return false + } + return len(pods.Items) == 0 + }, timeout, interval).Should(BeTrue(), "Expected the delete and configure pipeline to be deleted") + Eventually(func() bool { work = &platformv1alpha1.Work{} work.Name = "redis-promise-default-default-opstree-redis" @@ -364,12 +444,6 @@ var _ = Context("Promise Reconciler", func() { return errors.IsNotFound(err) }, timeout, interval).Should(BeTrue(), "Expected the Work to be deleted") - Eventually(func() bool { - pipeline := &v1.Pod{} - err := k8sClient.Get(context.Background(), expectedName, pipeline) - return errors.IsNotFound(err) - }, timeout, interval).Should(BeTrue(), "Expected the request pipeline to be deleted") - Eventually(func() bool { createdRedisRequest := &unstructured.Unstructured{} createdRedisRequest.SetGroupVersionKind(redisRequest.GroupVersionKind()) diff --git a/controllers/promise_controller.go b/controllers/promise_controller.go index 06122431..a1d62436 100644 --- a/controllers/promise_controller.go +++ b/controllers/promise_controller.go @@ -137,11 +137,11 @@ func (r *PromiseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct return ctrl.Result{}, nil } - workflows, err := r.generateWorkflows(promise, logger) + configurePipelines, deletePipelines, err := r.generatePipelines(promise, logger) if err != nil { return ctrl.Result{}, err } - return ctrl.Result{}, r.startDynamicController(promise, rrGVK, workflows) + return ctrl.Result{}, r.startDynamicController(promise, rrGVK, configurePipelines, deletePipelines) } func getDesiredFinalizers(promise *v1alpha1.Promise) []string { @@ -151,21 +151,22 @@ func getDesiredFinalizers(promise *v1alpha1.Promise) []string { return promiseFinalizers } -func (r *PromiseReconciler) startDynamicController(promise *v1alpha1.Promise, rrGVK schema.GroupVersionKind, workflows []v1alpha1.Pipeline) error { +func (r *PromiseReconciler) startDynamicController(promise *v1alpha1.Promise, rrGVK schema.GroupVersionKind, configurePipelines, deletePipelines []v1alpha1.Pipeline) error { //temporary fix until https://github.com/kubernetes-sigs/controller-runtime/issues/1884 is resolved //once resolved, delete dynamic controller rather than disable enabled := true r.DynamicControllers[string(promise.GetUID())] = &enabled dynamicResourceRequestController := &dynamicResourceRequestController{ - Client: r.Manager.GetClient(), - scheme: r.Manager.GetScheme(), - gvk: &rrGVK, - promiseIdentifier: promise.GetIdentifier(), - promiseScheduling: promise.Spec.Scheduling, - workflows: workflows, - log: r.Log, - uid: string(promise.GetUID())[0:5], - enabled: &enabled, + Client: r.Manager.GetClient(), + scheme: r.Manager.GetScheme(), + gvk: &rrGVK, + promiseIdentifier: promise.GetIdentifier(), + promiseScheduling: promise.Spec.Scheduling, + configurePipelines: configurePipelines, + deletePipelines: deletePipelines, + log: r.Log, + uid: string(promise.GetUID())[0:5], + enabled: &enabled, } unstructuredCRD := &unstructured.Unstructured{} @@ -633,39 +634,55 @@ func (r *PromiseReconciler) 
createWorkResourceForDependencies(ctx context.Contex return nil } -func (r *PromiseReconciler) generateWorkflows(promise *v1alpha1.Promise, logger logr.Logger) ([]v1alpha1.Pipeline, error) { - var pipelines []v1alpha1.Pipeline +func (r *PromiseReconciler) generatePipelines(promise *v1alpha1.Promise, logger logr.Logger) ([]v1alpha1.Pipeline, []v1alpha1.Pipeline, error) { + var configurePipelines []v1alpha1.Pipeline for _, pipeline := range promise.Spec.Workflows.Resource.Configure { - pipelineLogger := logger.WithValues( - "pipelineKind", pipeline.GetKind(), - "pipelineVersion", pipeline.GetAPIVersion(), - "pipelineName", pipeline.GetName()) - if pipeline.GetKind() == "Pipeline" && pipeline.GetAPIVersion() == "platform.kratix.io/v1alpha1" { - p := v1alpha1.Pipeline{} - jsonPipeline, err := pipeline.MarshalJSON() - logger.Info("json", "json", string(jsonPipeline)) - if err != nil { - // TODO test - pipelineLogger.Error(err, "Failed marshalling pipeline to json") - return nil, err - } - - err = json.Unmarshal(jsonPipeline, &p) - if err != nil { - // TODO test - pipelineLogger.Error(err, "Failed unmarshalling pipeline") - return nil, err - } - - pipelines = append(pipelines, p) - } else { - err := fmt.Errorf("unsupported pipeline %q (%s.%s)", - pipeline.GetName(), pipeline.GetKind(), pipeline.GetAPIVersion(), - ) - return nil, err + p, err := generatePipeline(pipeline, logger) + if err != nil { + return nil, nil, err } + configurePipelines = append(configurePipelines, p) + } + + var deletePipelines []v1alpha1.Pipeline + for _, pipeline := range promise.Spec.Workflows.Resource.Delete { + p, err := generatePipeline(pipeline, logger) + if err != nil { + return nil, nil, err + } + deletePipelines = append(deletePipelines, p) + } + + return configurePipelines, deletePipelines, nil +} + +func generatePipeline(pipeline unstructured.Unstructured, logger logr.Logger) (v1alpha1.Pipeline, error) { + pipelineLogger := logger.WithValues( + "pipelineKind", pipeline.GetKind(), + "pipelineVersion", pipeline.GetAPIVersion(), + "pipelineName", pipeline.GetName()) + + if pipeline.GetKind() == "Pipeline" && pipeline.GetAPIVersion() == "platform.kratix.io/v1alpha1" { + jsonPipeline, err := pipeline.MarshalJSON() + pipelineLogger.Info("json", "json", string(jsonPipeline)) + if err != nil { + // TODO test + pipelineLogger.Error(err, "Failed marshalling pipeline to json") + return v1alpha1.Pipeline{}, err + } + + p := v1alpha1.Pipeline{} + err = json.Unmarshal(jsonPipeline, &p) + if err != nil { + // TODO test + pipelineLogger.Error(err, "Failed unmarshalling pipeline") + return v1alpha1.Pipeline{}, err + } + + return p, nil } - return pipelines, nil + return v1alpha1.Pipeline{}, fmt.Errorf("unsupported pipeline %q (%s.%s)", + pipeline.GetName(), pipeline.GetKind(), pipeline.GetAPIVersion()) } diff --git a/distribution/kratix.yaml b/distribution/kratix.yaml index 541bb823..47f8fb74 100644 --- a/distribution/kratix.yaml +++ b/distribution/kratix.yaml @@ -283,6 +283,11 @@ spec: type: object type: array x-kubernetes-preserve-unknown-fields: true + delete: + items: + type: object + type: array + x-kubernetes-preserve-unknown-fields: true type: object type: object type: object diff --git a/lib/pipeline/configure.go b/lib/pipeline/configure.go new file mode 100644 index 00000000..80622450 --- /dev/null +++ b/lib/pipeline/configure.go @@ -0,0 +1,140 @@ +package pipeline + +import ( + "fmt" + "os" + "strconv" + "strings" + + platformv1alpha1 "github.com/syntasso/kratix/api/v1alpha1" + v1 "k8s.io/api/core/v1" + metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +func NewConfigurePipeline(rr *unstructured.Unstructured, pipelines []platformv1alpha1.Pipeline, resourceRequestIdentifier, promiseIdentifier string) v1.Pod { + volumes := metadataAndSchedulingVolumes(promiseIdentifier) + + initContainers, pipelineVolumes := configurePipelineInitContainers(rr, pipelines, resourceRequestIdentifier) + volumes = append(volumes, pipelineVolumes...) + + rrKind := fmt.Sprintf("%s.%s", strings.ToLower(rr.GetKind()), rr.GroupVersionKind().Group) + pod := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: configurePipelineName(promiseIdentifier), + Namespace: "default", + Labels: ConfigurePipelineLabels(resourceRequestIdentifier, promiseIdentifier), + }, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyOnFailure, + ServiceAccountName: promiseIdentifier + "-promise-pipeline", + Containers: []v1.Container{ + { + Name: "status-writer", + Image: os.Getenv("WC_IMG"), + Command: []string{"sh", "-c", "update-status"}, + Env: []v1.EnvVar{ + {Name: "RR_KIND", Value: rrKind}, + {Name: "RR_NAME", Value: rr.GetName()}, + {Name: "RR_NAMESPACE", Value: rr.GetNamespace()}, + }, + VolumeMounts: []v1.VolumeMount{{ + MountPath: "/work-creator-files/metadata", + Name: "metadata", + }}, + }, + }, + InitContainers: initContainers, + Volumes: volumes, + }, + } + + return pod +} + +func configurePipelineInitContainers(rr *unstructured.Unstructured, pipelines []platformv1alpha1.Pipeline, rrID string) ([]v1.Container, []v1.Volume) { + metadataVolumeMount := v1.VolumeMount{MountPath: "/metadata", Name: "metadata"} + + readerContainer, readerVolume := readerContainerAndVolume(rr) + containers := []v1.Container{ + readerContainer, + } + volumes := []v1.Volume{ + readerVolume, // vol0 + } + + if len(pipelines) > 0 { + //TODO: We only support 1 workflow for now + for i, c := range pipelines[0].Spec.Containers { + volumes = append(volumes, v1.Volume{ + Name: "vol" + strconv.Itoa(i+1), + VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}, + }) + + containers = append(containers, v1.Container{ + Name: c.Name, + Image: c.Image, + VolumeMounts: []v1.VolumeMount{ + metadataVolumeMount, + {Name: "vol" + strconv.Itoa(i), MountPath: "/input"}, + {Name: "vol" + strconv.Itoa(i+1), MountPath: "/output"}, + }, + }) + } + } + + workCreatorCommand := fmt.Sprintf("./work-creator -identifier %s -input-directory /work-creator-files", rrID) + writer := v1.Container{ + Name: "work-writer", + Image: os.Getenv("WC_IMG"), + Command: []string{"sh", "-c", workCreatorCommand}, + VolumeMounts: []v1.VolumeMount{ + { + MountPath: "/work-creator-files/input", + Name: "vol" + strconv.Itoa(len(containers)-1), + }, + { + MountPath: "/work-creator-files/metadata", + Name: "metadata", + }, + { + MountPath: "/work-creator-files/kratix-system", + Name: "promise-scheduling", + }, + }, + } + + containers = append(containers, writer) + + return containers, volumes +} + +func metadataAndSchedulingVolumes(promiseIdentifier string) []v1.Volume { + return []v1.Volume{ + { + Name: "metadata", VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}, + }, + { + Name: "promise-scheduling", + VolumeSource: v1.VolumeSource{ + ConfigMap: &v1.ConfigMapVolumeSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: "scheduling-" + promiseIdentifier, + }, + Items: []v1.KeyToPath{{ + Key: "scheduling", + Path: "promise-scheduling", + }}, + }, + }, + }, + } +} + +func configurePipelineName(promiseIdentifier string) 
string { + return pipelineName("configure", promiseIdentifier) +} + +func ConfigurePipelineLabels(resourceRequestIdentifier, promiseIdentifier string) map[string]string { + return pipelineLabels("configure", resourceRequestIdentifier, promiseIdentifier) +} diff --git a/lib/pipeline/delete.go b/lib/pipeline/delete.go new file mode 100644 index 00000000..c29a434b --- /dev/null +++ b/lib/pipeline/delete.go @@ -0,0 +1,76 @@ +package pipeline + +import ( + "strconv" + + platformv1alpha1 "github.com/syntasso/kratix/api/v1alpha1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +func NewDeletePipeline(rr *unstructured.Unstructured, pipelines []platformv1alpha1.Pipeline, resourceRequestIdentifier, promiseIdentifier string) v1.Pod { + containers, pipelineVolumes := deletePipelineContainers(rr, pipelines, resourceRequestIdentifier) + + pod := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: deletePipelineName(promiseIdentifier), + Namespace: "default", + Labels: DeletePipelineLabels(resourceRequestIdentifier, promiseIdentifier), + }, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyOnFailure, + ServiceAccountName: promiseIdentifier + "-promise-pipeline", + Containers: []v1.Container{containers[len(containers)-1]}, + InitContainers: containers[0 : len(containers)-1], + Volumes: pipelineVolumes, + }, + } + + return pod +} + +func deletePipelineContainers(rr *unstructured.Unstructured, pipelines []platformv1alpha1.Pipeline, rrID string) ([]v1.Container, []v1.Volume) { + readerContainer, readerVolume := readerContainerAndVolume(rr) + containers := []v1.Container{ + readerContainer, + } + volumes := []v1.Volume{ + readerVolume, // vol0 + } + + if len(pipelines) > 0 { + //TODO: We only support 1 workflow for now + for i, c := range pipelines[0].Spec.Containers { + volumes = append(volumes, v1.Volume{ + Name: "vol" + strconv.Itoa(i+1), + VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}, + }) + + containers = append(containers, v1.Container{ + Name: c.Name, + Image: c.Image, + VolumeMounts: []v1.VolumeMount{ + {Name: "vol" + strconv.Itoa(i), MountPath: "/input"}, + {Name: "vol" + strconv.Itoa(i+1), MountPath: "/output"}, + }, + Env: []v1.EnvVar{ + { + Name: "KRATIX_OPERATION", + Value: "delete", + }, + }, + }) + } + } + + return containers, volumes +} + +func DeletePipelineLabels(resourceRequestIdentifier, promiseIdentifier string) map[string]string { + return pipelineLabels("delete", resourceRequestIdentifier, promiseIdentifier) +} + +func deletePipelineName(promiseIdentifier string) string { + return pipelineName("delete", promiseIdentifier) +} diff --git a/lib/pipeline/shared.go b/lib/pipeline/shared.go new file mode 100644 index 00000000..3c19bf6a --- /dev/null +++ b/lib/pipeline/shared.go @@ -0,0 +1,52 @@ +package pipeline + +import ( + "fmt" + "strings" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/util/uuid" +) + +func readerContainerAndVolume(rr *unstructured.Unstructured) (v1.Container, v1.Volume) { + resourceKindNameNamespace := fmt.Sprintf("%s.%s %s --namespace %s", + strings.ToLower(rr.GetKind()), rr.GroupVersionKind().Group, rr.GetName(), rr.GetNamespace()) + + resourceRequestCommand := fmt.Sprintf("kubectl get %s -oyaml > /output/object.yaml", resourceKindNameNamespace) + container := v1.Container{ + Name: "reader", + Image: "bitnami/kubectl:1.20.10", + Command: []string{"sh", "-c", resourceRequestCommand}, + VolumeMounts: 
[]v1.VolumeMount{
+			{
+				MountPath: "/output",
+				Name:      "vol0",
+			},
+		},
+	}
+
+	volume := v1.Volume{Name: "vol0", VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}}
+	return container, volume
+}
+
+func SharedLabels(resourceRequestIdentifier, promiseIdentifier string) map[string]string {
+	return map[string]string{
+		"kratix-promise-id":                  promiseIdentifier,
+		"kratix-promise-resource-request-id": resourceRequestIdentifier,
+	}
+}
+
+func pipelineLabels(pipelineType, resourceRequestIdentifier, promiseIdentifier string) map[string]string {
+	labels := SharedLabels(resourceRequestIdentifier, promiseIdentifier)
+	labels["kratix-pipeline-type"] = pipelineType
+	return labels
+}
+
+func pipelineName(pipelineType, promiseIdentifier string) string {
+	return pipelineType + "-pipeline-" + promiseIdentifier + "-" + getShortUuid()
+}
+
+func getShortUuid() string {
+	return string(uuid.NewUUID()[0:5])
+}
diff --git a/test/system/assets/bash-promise/promise.yaml b/test/system/assets/bash-promise/promise.yaml
index 9a3c381e..4d88024b 100644
--- a/test/system/assets/bash-promise/promise.yaml
+++ b/test/system/assets/bash-promise/promise.yaml
@@ -1,3 +1,29 @@
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  name: bash-default-promise-pipeline-credentials
+rules:
+- apiGroups:
+  - ""
+  resources:
+  - namespaces
+  verbs:
+  - "*"
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: bash-default-promise-pipeline-credentials
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: bash-default-promise-pipeline-credentials
+subjects:
+- kind: ServiceAccount
+  name: bash-default-promise-pipeline
+  namespace: default
+---
 apiVersion: platform.kratix.io/v1alpha1
 kind: Promise
 metadata:
@@ -52,3 +78,13 @@ spec:
           name: bash-promise-test-c0
         - image: syntassodev/bash-promise-test-c1:dev
           name: bash-promise-test-c1
+      delete:
+      - apiVersion: platform.kratix.io/v1alpha1
+        kind: Pipeline
+        metadata:
+          name: instance-delete
+          namespace: default
+        spec:
+          containers:
+          - image: syntassodev/bash-promise-test-c0:dev
+            name: bash-promise-test-c0
diff --git a/test/system/system_test.go b/test/system/system_test.go
index b7c169bd..4d82904d 100644
--- a/test/system/system_test.go
+++ b/test/system/system_test.go
@@ -91,10 +91,15 @@ var _ = Describe("Kratix", func() {
 		It("executes the pipelines and schedules the work", func() {
 			rrName := "rr-test"
-			c1Command := `kubectl create namespace rr-ns --dry-run=client -oyaml > /output/ns.yaml
-			echo "message: My awesome status message" > /metadata/status.yaml
-			echo "key: value" >> /metadata/status.yaml`
-			c2Command := `kubectl create configmap multi-container-config --namespace rr-ns --dry-run=client -oyaml > /output/configmap.yaml`
+
+			c1Command := `kop="delete"
+			if [ "${KRATIX_OPERATION}" != "delete" ]; then kop="create"
+				echo "message: My awesome status message" > /metadata/status.yaml
+				echo "key: value" >> /metadata/status.yaml
+			fi
+			kubectl ${kop} namespace imperative-$(yq '.metadata.name' /input/object.yaml)`
+
+			c2Command := `kubectl create namespace declarative-$(yq '.metadata.name' /input/object.yaml) --dry-run=client -oyaml > /output/namespace.yaml`
 
 			commands := []string{c1Command, c2Command}
 
@@ -105,8 +110,8 @@ var _ = Describe("Kratix", func() {
 			})
 
 			By("deploying the contents of /output to the worker cluster", func() {
-				worker.eventuallyKubectl("get", "namespace", "rr-ns")
-				worker.eventuallyKubectl("get", "configmap", "multi-container-config", "--namespace", "rr-ns")
+ platform.eventuallyKubectl("get", "namespace", "imperative-rr-test") + worker.eventuallyKubectl("get", "namespace", "declarative-rr-test") }) By("updating the resource status", func() { @@ -119,10 +124,17 @@ var _ = Describe("Kratix", func() { Eventually(func(g Gomega) { g.Expect(platform.kubectl("get", "bash")).NotTo(ContainSubstring(rrName)) - g.Expect(worker.kubectl("get", "namespace")).NotTo(ContainSubstring("mcns")) + g.Expect(platform.kubectl("get", "namespace")).NotTo(ContainSubstring("imperative-rr-test")) + g.Expect(worker.kubectl("get", "namespace")).NotTo(ContainSubstring("declarative-rr-test")) }, timeout, interval).Should(Succeed()) }) + By("deleting the pipeline pods", func() { + Eventually(func(g Gomega) { + g.Expect(platform.kubectl("get", "pods")).NotTo(ContainSubstring("configure")) + g.Expect(platform.kubectl("get", "pods")).NotTo(ContainSubstring("delete")) + }, timeout, interval).Should(Succeed()) + }) }) AfterEach(func() { From f0e4cf4a43a11f0621783c187618acee70a4fbff Mon Sep 17 00:00:00 2001 From: Jake Date: Thu, 13 Jul 2023 12:58:12 +0100 Subject: [PATCH 2/3] fix: refactor unit tests --- ...ynamic_resource_request_controller_test.go | 105 +++++++++--------- 1 file changed, 50 insertions(+), 55 deletions(-) diff --git a/controllers/promise_and_dynamic_resource_request_controller_test.go b/controllers/promise_and_dynamic_resource_request_controller_test.go index 0c7a0920..20fdbd6c 100644 --- a/controllers/promise_and_dynamic_resource_request_controller_test.go +++ b/controllers/promise_and_dynamic_resource_request_controller_test.go @@ -46,6 +46,7 @@ var _ = Context("Promise Reconciler", func() { var ( promiseCR *platformv1alpha1.Promise expectedCRDName = "redis.redis.redis.opstreelabs.in" + ctx = context.Background() ) Describe("Can support complete Promises", func() { @@ -59,14 +60,14 @@ var _ = Context("Promise Reconciler", func() { Expect(err).ToNot(HaveOccurred()) //Works once, then fails as the promiseCR already exists. Consider building check here. - k8sClient.Create(context.Background(), promiseCR) + k8sClient.Create(ctx, promiseCR) By("creating a CRD for redis.redis.redis") Eventually(func() string { crd, _ := apiextensionClient. ApiextensionsV1(). CustomResourceDefinitions(). 
- Get(context.Background(), expectedCRDName, metav1.GetOptions{}) + Get(ctx, expectedCRDName, metav1.GetOptions{}) // The returned CRD is missing the expected metadata, // therefore we need to reach inside the spec to get the @@ -88,26 +89,26 @@ var _ = Context("Promise Reconciler", func() { By("creating clusterRoleBindings for the controller and pipeline") Eventually(func() error { binding := &rbacv1.ClusterRoleBinding{} - err := k8sClient.Get(context.Background(), controllerResourceNamespacedName, binding) + err := k8sClient.Get(ctx, controllerResourceNamespacedName, binding) return err }, timeout, interval).Should(BeNil(), "Expected controller ClusterRoleBinding to exist") Eventually(func() error { binding := &rbacv1.ClusterRoleBinding{} - err := k8sClient.Get(context.Background(), piplineResourceNamespacedName, binding) + err := k8sClient.Get(ctx, piplineResourceNamespacedName, binding) return err }, timeout, interval).Should(BeNil(), "Expected pipeline ClusterRoleBinding to exist") By("creating clusterRoles for the controller and pipeline") Eventually(func() error { clusterRole := &rbacv1.ClusterRole{} - err := k8sClient.Get(context.Background(), controllerResourceNamespacedName, clusterRole) + err := k8sClient.Get(ctx, controllerResourceNamespacedName, clusterRole) return err }, timeout, interval).Should(BeNil(), "Expected controller ClusterRole to exist") Eventually(func() error { clusterRole := &rbacv1.ClusterRole{} - err := k8sClient.Get(context.Background(), piplineResourceNamespacedName, clusterRole) + err := k8sClient.Get(ctx, piplineResourceNamespacedName, clusterRole) return err }, timeout, interval).Should(BeNil(), "Expected pipeline ClusterRole to exist") @@ -115,7 +116,7 @@ var _ = Context("Promise Reconciler", func() { pipelineServiceAccountNamespacedName := types.NamespacedName{Name: piplineResourceNamespacedName.Name, Namespace: "default"} Eventually(func() error { serviceAccount := &v1.ServiceAccount{} - err := k8sClient.Get(context.Background(), pipelineServiceAccountNamespacedName, serviceAccount) + err := k8sClient.Get(ctx, pipelineServiceAccountNamespacedName, serviceAccount) return err }, timeout, interval).Should(BeNil(), "Expected pipeline ServiceAccount to exist") @@ -128,7 +129,7 @@ var _ = Context("Promise Reconciler", func() { Expect(err).ToNot(HaveOccurred()) redisRequest.SetNamespace("default") - err = k8sClient.Create(context.Background(), redisRequest) + err = k8sClient.Create(ctx, redisRequest) Expect(err).ToNot(HaveOccurred()) By("creating a configMap to store Promise scheduling") @@ -139,7 +140,7 @@ var _ = Context("Promise Reconciler", func() { Name: "scheduling-" + promiseIdentifier, } - err := k8sClient.Get(context.Background(), expectedCM, cm) + err := k8sClient.Get(ctx, expectedCM, cm) if err != nil { fmt.Println(err.Error()) } @@ -152,7 +153,7 @@ var _ = Context("Promise Reconciler", func() { promise := &v1alpha1.Promise{} - err = k8sClient.Get(context.Background(), expectedPromise, promise) + err = k8sClient.Get(ctx, expectedPromise, promise) Expect(err).NotTo(HaveOccurred()) Expect(promise.GetFinalizers()).Should( ConsistOf( @@ -170,27 +171,28 @@ var _ = Context("Promise Reconciler", func() { Namespace: "default", } Eventually(func() error { - err := k8sClient.Get(context.Background(), workNamespacedName, &v1alpha1.Work{}) + err := k8sClient.Get(ctx, workNamespacedName, &v1alpha1.Work{}) return err }, timeout, interval).Should(BeNil()) By("deleting the Promise") - err = k8sClient.Delete(context.Background(), promiseCR) + err = 
k8sClient.Delete(ctx, promiseCR) Expect(err).NotTo(HaveOccurred()) //delete pipeline should be created deletePipeline := v1.Pod{} Eventually(func() bool { pods := &v1.PodList{} - err := k8sClient.List(context.Background(), pods) + err := k8sClient.List(ctx, pods) if err != nil { return false } + //configure and delete pods if len(pods.Items) != 2 { return false } for _, pod := range pods.Items { - if strings.HasPrefix(pod.Name, "delete-") { + if strings.HasPrefix(pod.Name, "delete-pipeline-redis-promise-default-") { deletePipeline = pod return true } @@ -198,14 +200,6 @@ var _ = Context("Promise Reconciler", func() { return false }, timeout, interval).Should(BeTrue(), "Expected the delete pipeline to be created") - //update the pod to be marked as complete - // status: - // conditions: - // - lastProbeTime: null - // lastTransitionTime: "2023-07-11T15:20:37Z" - // reason: PodCompleted - // status: "True" - // type: Initialized deletePipeline.Status.Conditions = []v1.PodCondition{ { Status: "True", @@ -213,14 +207,14 @@ var _ = Context("Promise Reconciler", func() { Reason: "PodCompleted", }, } - err = k8sClient.Status().Update(context.Background(), &deletePipeline) + err = k8sClient.Status().Update(ctx, &deletePipeline) Expect(err).NotTo(HaveOccurred()) By("also deleting the resource requests") Eventually(func() int { rrList := &unstructured.UnstructuredList{} rrList.SetGroupVersionKind(redisRequest.GroupVersionKind()) - err := k8sClient.List(context.Background(), rrList) + err := k8sClient.List(ctx, rrList) if err != nil { return -1 } @@ -235,40 +229,40 @@ var _ = Context("Promise Reconciler", func() { Name: "scheduling-redis-promise-default", } - err := k8sClient.Get(context.Background(), expectedCM, cm) + err := k8sClient.Get(ctx, expectedCM, cm) return errors.IsNotFound(err) }, timeout, interval).Should(BeTrue(), "ConfigMap should have been deleted") By("also deleting the ClusterRoleBinding for the controller and pipeline") Eventually(func() bool { binding := &rbacv1.ClusterRoleBinding{} - err := k8sClient.Get(context.Background(), piplineResourceNamespacedName, binding) + err := k8sClient.Get(ctx, piplineResourceNamespacedName, binding) return errors.IsNotFound(err) }, timeout, interval).Should(BeTrue(), "Expected pipeline ClusterRoleBinding not to be found") Eventually(func() bool { binding := &rbacv1.ClusterRoleBinding{} - err := k8sClient.Get(context.Background(), controllerResourceNamespacedName, binding) + err := k8sClient.Get(ctx, controllerResourceNamespacedName, binding) return errors.IsNotFound(err) }, timeout, interval).Should(BeTrue(), "Expected controller ClusterRoleBinding not to be found") By("also deleting the ClusterRole for the controller and pipeline") Eventually(func() bool { binding := &rbacv1.ClusterRole{} - err := k8sClient.Get(context.Background(), piplineResourceNamespacedName, binding) + err := k8sClient.Get(ctx, piplineResourceNamespacedName, binding) return errors.IsNotFound(err) }, timeout, interval).Should(BeTrue(), "Expected pipeline ClusterRole not to be found") Eventually(func() bool { binding := &rbacv1.ClusterRole{} - err := k8sClient.Get(context.Background(), controllerResourceNamespacedName, binding) + err := k8sClient.Get(ctx, controllerResourceNamespacedName, binding) return errors.IsNotFound(err) }, timeout, interval).Should(BeTrue(), "Expected controller ClusterRole not to be found") By("also deleting the serviceAccount for the pipeline") Eventually(func() bool { serviceAccount := &v1.ServiceAccount{} - err := k8sClient.Get(context.Background(), 
pipelineServiceAccountNamespacedName, serviceAccount) + err := k8sClient.Get(ctx, pipelineServiceAccountNamespacedName, serviceAccount) return errors.IsNotFound(err) }, timeout, interval).Should(BeTrue(), "Expected pipleine ServiceAccount not to be found") @@ -277,20 +271,20 @@ var _ = Context("Promise Reconciler", func() { _, err := apiextensionClient. ApiextensionsV1(). CustomResourceDefinitions(). - Get(context.Background(), expectedCRDName, metav1.GetOptions{}) + Get(ctx, expectedCRDName, metav1.GetOptions{}) return errors.IsNotFound(err) }, timeout, interval).Should(BeTrue(), "Expected CRD to not be found") By("also deleting the Work") Eventually(func() bool { - err := k8sClient.Get(context.Background(), workNamespacedName, &v1alpha1.Work{}) + err := k8sClient.Get(ctx, workNamespacedName, &v1alpha1.Work{}) return errors.IsNotFound(err) }, timeout, interval).Should(BeTrue(), "Expected Work to not be found") By("finally deleting the Promise itself") Eventually(func() bool { - err := k8sClient.Get(context.Background(), expectedPromise, &v1alpha1.Promise{}) + err := k8sClient.Get(ctx, expectedPromise, &v1alpha1.Promise{}) return errors.IsNotFound(err) }, timeout, interval).Should(BeTrue(), "Expected Promise not to be found") }) @@ -315,7 +309,7 @@ var _ = Context("Promise Reconciler", func() { }) It("Creates", func() { - err := k8sClient.Create(context.Background(), redisRequest) + err := k8sClient.Create(ctx, redisRequest) Expect(err).ToNot(HaveOccurred()) By("defining a valid pod spec for the transformation pipeline") @@ -323,13 +317,14 @@ var _ = Context("Promise Reconciler", func() { var interval = "3s" Eventually(func() string { pods := &v1.PodList{} - err := k8sClient.List(context.Background(), pods) + err := k8sClient.List(ctx, pods) if err != nil { return "" } if len(pods.Items) != 1 { return "" } + Expect(pods.Items[0].Name).To(HavePrefix("configure-pipeline-redis-promise-default-")) configurePodNamespacedName.Name = pods.Items[0].Name return pods.Items[0].Spec.Containers[0].Name }, timeout, interval).Should(Equal("status-writer")) @@ -338,7 +333,7 @@ var _ = Context("Promise Reconciler", func() { Eventually(func() []string { createdRedisRequest := &unstructured.Unstructured{} createdRedisRequest.SetGroupVersionKind(redisRequest.GroupVersionKind()) - err := k8sClient.Get(context.Background(), client.ObjectKeyFromObject(redisRequest), createdRedisRequest) + err := k8sClient.Get(ctx, client.ObjectKeyFromObject(redisRequest), createdRedisRequest) if err != nil { fmt.Println(err.Error()) return nil @@ -356,13 +351,13 @@ var _ = Context("Promise Reconciler", func() { } existingResourceRequest.SetGroupVersionKind(gvk) ns := client.ObjectKeyFromObject(redisRequest) - err := k8sClient.Get(context.Background(), ns, existingResourceRequest) + err := k8sClient.Get(ctx, ns, existingResourceRequest) Expect(err).ToNot(HaveOccurred()) existingResourceRequest.SetAnnotations(map[string]string{ "new-annotation": "auto-added", }) - err = k8sClient.Update(context.Background(), existingResourceRequest) + err = k8sClient.Update(ctx, existingResourceRequest) Expect(err).ToNot(HaveOccurred()) Consistently(func() int { @@ -376,7 +371,7 @@ var _ = Context("Promise Reconciler", func() { } ol := &v1.PodList{} - err := k8sClient.List(context.Background(), ol, listOps) + err := k8sClient.List(ctx, ol, listOps) if err != nil { fmt.Println(err.Error()) return -1 @@ -390,18 +385,18 @@ var _ = Context("Promise Reconciler", func() { work = &platformv1alpha1.Work{} work.Name = 
"redis-promise-default-default-opstree-redis" work.Namespace = "default" - err := k8sClient.Create(context.Background(), work) + err := k8sClient.Create(ctx, work) Expect(err).ToNot(HaveOccurred()) //test delete - err = k8sClient.Delete(context.Background(), redisRequest) + err = k8sClient.Delete(ctx, redisRequest) Expect(err).ToNot(HaveOccurred()) //delete pipeline should be created deletePipeline := v1.Pod{} Eventually(func() bool { pods := &v1.PodList{} - err := k8sClient.List(context.Background(), pods) + err := k8sClient.List(ctx, pods) if err != nil { return false } @@ -409,7 +404,7 @@ var _ = Context("Promise Reconciler", func() { return false } for _, pod := range pods.Items { - if strings.HasPrefix(pod.Name, "delete-") { + if strings.HasPrefix(pod.Name, "delete-pipeline-redis-promise-default-") { deletePipeline = pod return true } @@ -424,12 +419,12 @@ var _ = Context("Promise Reconciler", func() { Reason: "PodCompleted", }, } - err = k8sClient.Status().Update(context.Background(), &deletePipeline) + err = k8sClient.Status().Update(ctx, &deletePipeline) Expect(err).NotTo(HaveOccurred()) Eventually(func() bool { pods := &v1.PodList{} - err := k8sClient.List(context.Background(), pods) + err := k8sClient.List(ctx, pods) if err != nil { return false } @@ -440,14 +435,14 @@ var _ = Context("Promise Reconciler", func() { work = &platformv1alpha1.Work{} work.Name = "redis-promise-default-default-opstree-redis" work.Namespace = "default" - err := k8sClient.Get(context.Background(), client.ObjectKeyFromObject(work), work) + err := k8sClient.Get(ctx, client.ObjectKeyFromObject(work), work) return errors.IsNotFound(err) }, timeout, interval).Should(BeTrue(), "Expected the Work to be deleted") Eventually(func() bool { createdRedisRequest := &unstructured.Unstructured{} createdRedisRequest.SetGroupVersionKind(redisRequest.GroupVersionKind()) - err := k8sClient.Get(context.Background(), client.ObjectKeyFromObject(redisRequest), createdRedisRequest) + err := k8sClient.Get(ctx, client.ObjectKeyFromObject(redisRequest), createdRedisRequest) return errors.IsNotFound(err) }, timeout, interval).Should(BeTrue(), "Expected the Redis resource to be deleted") }) @@ -463,7 +458,7 @@ var _ = Context("Promise Reconciler", func() { promiseCR.Namespace = "default" Expect(err).ToNot(HaveOccurred()) - k8sClient.Create(context.Background(), promiseCR) + k8sClient.Create(ctx, promiseCR) }) promiseIdentifier := "nil-api-promise-default" @@ -479,13 +474,13 @@ var _ = Context("Promise Reconciler", func() { Namespace: "default", } Eventually(func() error { - err := k8sClient.Get(context.Background(), workNamespacedName, &v1alpha1.Work{}) + err := k8sClient.Get(ctx, workNamespacedName, &v1alpha1.Work{}) return err }, timeout, interval).Should(BeNil()) By("setting the correct finalizers") promise := &v1alpha1.Promise{} - err := k8sClient.Get(context.Background(), expectedPromise, promise) + err := k8sClient.Get(ctx, expectedPromise, promise) Expect(err).NotTo(HaveOccurred()) Expect(promise.GetFinalizers()).Should( @@ -494,18 +489,18 @@ var _ = Context("Promise Reconciler", func() { ), "Promise should have finalizers set") By("deleting the Promise") - err = k8sClient.Delete(context.Background(), promiseCR) + err = k8sClient.Delete(ctx, promiseCR) Expect(err).NotTo(HaveOccurred()) By("also deleting the Work") Eventually(func() bool { - err := k8sClient.Get(context.Background(), workNamespacedName, &v1alpha1.Work{}) + err := k8sClient.Get(ctx, workNamespacedName, &v1alpha1.Work{}) return errors.IsNotFound(err) }, 
timeout, interval).Should(BeTrue(), "Expected Work to not be found") By("finally deleting the Promise itself") Eventually(func() bool { - err := k8sClient.Get(context.Background(), expectedPromise, &v1alpha1.Promise{}) + err := k8sClient.Get(ctx, expectedPromise, &v1alpha1.Promise{}) return errors.IsNotFound(err) }, timeout, interval).Should(BeTrue(), "Expected Promise not to be found") }) @@ -521,7 +516,7 @@ var _ = Context("Promise Reconciler", func() { Expect(err).ToNot(HaveOccurred()) //Works once, then fails as the promiseCR already exists. Consider building check here. - err = k8sClient.Create(context.Background(), promiseCR) + err = k8sClient.Create(ctx, promiseCR) Expect(err).ToNot(HaveOccurred()) }) @@ -533,7 +528,7 @@ var _ = Context("Promise Reconciler", func() { crd, _ = apiextensionClient. ApiextensionsV1(). CustomResourceDefinitions(). - Get(context.Background(), expectedCRDName, metav1.GetOptions{}) + Get(ctx, expectedCRDName, metav1.GetOptions{}) // The returned CRD is missing the expected metadata, // therefore we need to reach inside the spec to get the From 9929c0652d4e6778f3c4220f51ca02fa0df3c9b8 Mon Sep 17 00:00:00 2001 From: Jake Date: Thu, 13 Jul 2023 14:19:46 +0100 Subject: [PATCH 3/3] fix: race condition in unit tests --- ...promise_and_dynamic_resource_request_controller_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/controllers/promise_and_dynamic_resource_request_controller_test.go b/controllers/promise_and_dynamic_resource_request_controller_test.go index 20fdbd6c..4f5d39eb 100644 --- a/controllers/promise_and_dynamic_resource_request_controller_test.go +++ b/controllers/promise_and_dynamic_resource_request_controller_test.go @@ -187,10 +187,11 @@ var _ = Context("Promise Reconciler", func() { if err != nil { return false } - //configure and delete pods - if len(pods.Items) != 2 { + + if len(pods.Items) == 0 { return false } + for _, pod := range pods.Items { if strings.HasPrefix(pod.Name, "delete-pipeline-redis-promise-default-") { deletePipeline = pod @@ -400,7 +401,7 @@ var _ = Context("Promise Reconciler", func() { if err != nil { return false } - if len(pods.Items) != 2 { + if len(pods.Items) == 0 { return false } for _, pod := range pods.Items {