From 8511c81d6a08cc302578eb1004ca2a38c85c6d2f Mon Sep 17 00:00:00 2001 From: Winna Bridgewater Date: Fri, 28 Jul 2023 18:05:11 +0100 Subject: [PATCH] Refactor: Wrap pipeline Pod in a Job * Jobs are designed for completable tasks and expose better completion data and functionality * There is a 1:1 mapping from Job to Pod so the change to the system is minimal * Future enhancements likely around Pod retry logic --- .../kratix/crds/platform_kratix_io_crds.yaml | 14 +++- config/rbac/role.yaml | 18 ++--- .../dynamic_resource_request_controller.go | 73 ++++++++----------- ...ynamic_resource_request_controller_test.go | 38 ++++------ controllers/util.go | 3 +- lib/pipeline/configure.go | 50 +++++++------ lib/pipeline/delete.go | 26 ++++--- 7 files changed, 113 insertions(+), 109 deletions(-) diff --git a/charts/kratix/crds/platform_kratix_io_crds.yaml b/charts/kratix/crds/platform_kratix_io_crds.yaml index 73e09ab8..1d970ad4 100644 --- a/charts/kratix/crds/platform_kratix_io_crds.yaml +++ b/charts/kratix/crds/platform_kratix_io_crds.yaml @@ -303,9 +303,17 @@ spec: properties: targetClusterName: type: string - workName: - description: The unique identifier of the Work parent - type: string + workload: + description: Workload represents the manifest workload to be deployed on worker + properties: + manifests: + description: Manifests represents a list of resources to be deployed on the worker + items: + description: Manifest represents a resource to be deployed on worker + type: object + x-kubernetes-preserve-unknown-fields: true + type: array + type: object type: object status: description: WorkPlacementStatus defines the observed state of WorkPlacement diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 023a85d1..a5ded8fb 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -13,15 +13,6 @@ rules: - delete - list - watch -- apiGroups: - - "" - resources: - - pods - verbs: - - create - - delete - - list - - watch - apiGroups: - "" resources: @@ -52,6 +43,15 @@ rules: - patch - update - watch +- apiGroups: + - batch + resources: + - jobs + verbs: + - create + - delete + - list + - watch - apiGroups: - platform.kratix.io resources: diff --git a/controllers/dynamic_resource_request_controller.go b/controllers/dynamic_resource_request_controller.go index c2078f63..db0e43dc 100644 --- a/controllers/dynamic_resource_request_controller.go +++ b/controllers/dynamic_resource_request_controller.go @@ -30,6 +30,7 @@ import ( "github.com/go-logr/logr" "github.com/syntasso/kratix/api/v1alpha1" "github.com/syntasso/kratix/lib/pipeline" + batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -68,7 +69,7 @@ type dynamicResourceRequestController struct { crd *apiextensionsv1.CustomResourceDefinition } -//+kubebuilder:rbac:groups="",resources=pods,verbs=create;list;watch;delete +//+kubebuilder:rbac:groups="batch",resources=jobs,verbs=create;list;watch;delete //+kubebuilder:rbac:groups="",resources=serviceaccounts,verbs=create func (r *dynamicResourceRequestController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { @@ -116,7 +117,7 @@ func (r *dynamicResourceRequestController) Reconcile(ctx context.Context, req ct //check if the pipeline has already been created. If it has exit out. All the lines //below this call are for the one-time creation of the pipeline. - created, err := r.configurePipelinePodHasBeenCreated(resourceRequestIdentifier, rr.GetNamespace(), logger) + created, err := r.configurePipelineHasBeenCreated(resourceRequestIdentifier, rr.GetNamespace(), logger) if err != nil { return ctrl.Result{}, err } @@ -162,18 +163,18 @@ func (r *dynamicResourceRequestController) deleteResources(ctx context.Context, } if controllerutil.ContainsFinalizer(resourceRequest, deleteWorkflowsFinalizer) { - existingDeletePipelinePod, err := r.getDeletePipelinePod(ctx, resourceRequestIdentifier, resourceRequest.GetNamespace(), logger) + existingDeletePipeline, err := r.getDeletePipeline(ctx, resourceRequestIdentifier, resourceRequest.GetNamespace(), logger) if err != nil { return defaultRequeue, err } - if existingDeletePipelinePod == nil { - deletePipelinePod := pipeline.NewDeletePipeline(resourceRequest, r.deletePipelines, resourceRequestIdentifier, r.promiseIdentifier) + if existingDeletePipeline == nil { + deletePipeline := pipeline.NewDeletePipeline(resourceRequest, r.deletePipelines, resourceRequestIdentifier, r.promiseIdentifier) logger.Info("Creating Delete Pipeline for Promise resource request: " + resourceRequestIdentifier + ". The pipeline will now execute...") - err = r.Client.Create(ctx, &deletePipelinePod) + err = r.Client.Create(ctx, &deletePipeline) if err != nil { - logger.Error(err, "Error creating Pod") - y, _ := yaml.Marshal(&deletePipelinePod) + logger.Error(err, "Error creating delete pipeline") + y, _ := yaml.Marshal(&deletePipeline) logger.Error(err, string(y)) return ctrl.Result{}, err } @@ -181,7 +182,7 @@ func (r *dynamicResourceRequestController) deleteResources(ctx context.Context, } logger.Info("Checking status of Delete Pipeline for Promise resource request: " + resourceRequestIdentifier) - if pipelineIsComplete(*existingDeletePipelinePod) { + if existingDeletePipeline.Status.Succeeded > 0 { logger.Info("Delete Pipeline Completed for Promise resource request: " + resourceRequestIdentifier) controllerutil.RemoveFinalizer(resourceRequest, deleteWorkflowsFinalizer) if err := r.Client.Update(ctx, resourceRequest); err != nil { @@ -201,7 +202,7 @@ func (r *dynamicResourceRequestController) deleteResources(ctx context.Context, } if controllerutil.ContainsFinalizer(resourceRequest, workflowsFinalizer) { - err := r.deletePipeline(ctx, resourceRequest, resourceRequestIdentifier, workflowsFinalizer, logger) + err := r.deleteWorkflows(ctx, resourceRequest, resourceRequestIdentifier, workflowsFinalizer, logger) if err != nil { return defaultRequeue, err } @@ -211,20 +212,20 @@ func (r *dynamicResourceRequestController) deleteResources(ctx context.Context, return fastRequeue, nil } -func (r *dynamicResourceRequestController) getDeletePipelinePod(ctx context.Context, resourceRequestIdentifier, namespace string, logger logr.Logger) (*v1.Pod, error) { - pods, err := r.getPodsWithLabels( +func (r *dynamicResourceRequestController) getDeletePipeline(ctx context.Context, resourceRequestIdentifier, namespace string, logger logr.Logger) (*batchv1.Job, error) { + jobs, err := r.getJobsWithLabels( pipeline.DeletePipelineLabels(resourceRequestIdentifier, r.promiseIdentifier), namespace, logger, ) - if err != nil || len(pods) == 0 { + if err != nil || len(jobs) == 0 { return nil, err } - return &pods[0], nil + return &jobs[0], nil } -func (r *dynamicResourceRequestController) configurePipelinePodHasBeenCreated(resourceRequestIdentifier, namespace string, logger logr.Logger) (bool, error) { - pods, err := r.getPodsWithLabels( +func (r *dynamicResourceRequestController) configurePipelineHasBeenCreated(resourceRequestIdentifier, namespace string, logger logr.Logger) (bool, error) { + jobs, err := r.getJobsWithLabels( pipeline.ConfigurePipelineLabels(resourceRequestIdentifier, r.promiseIdentifier), namespace, logger, @@ -232,15 +233,15 @@ func (r *dynamicResourceRequestController) configurePipelinePodHasBeenCreated(re if err != nil { return false, err } - return len(pods) > 0, nil + return len(jobs) > 0, nil } -func (r *dynamicResourceRequestController) getPodsWithLabels(podLabels map[string]string, namespace string, logger logr.Logger) ([]v1.Pod, error) { - selectorLabels := labels.FormatLabels(podLabels) +func (r *dynamicResourceRequestController) getJobsWithLabels(jobLabels map[string]string, namespace string, logger logr.Logger) ([]batchv1.Job, error) { + selectorLabels := labels.FormatLabels(jobLabels) selector, err := labels.Parse(selectorLabels) if err != nil { - return nil, fmt.Errorf("error parsing labels %v: %w", podLabels, err) + return nil, fmt.Errorf("error parsing labels %v: %w", jobLabels, err) } listOps := &client.ListOptions{ @@ -248,23 +249,13 @@ func (r *dynamicResourceRequestController) getPodsWithLabels(podLabels map[strin Namespace: namespace, } - pods := &v1.PodList{} - err = r.Client.List(context.Background(), pods, listOps) + jobs := &batchv1.JobList{} + err = r.Client.List(context.Background(), jobs, listOps) if err != nil { - logger.Error(err, "error listing pods", "selectors", selector.String()) + logger.Error(err, "error listing jobs", "selectors", selector.String()) return nil, err } - return pods.Items, nil -} - -func pipelineIsComplete(pod v1.Pod) bool { - for _, condition := range pod.Status.Conditions { - if condition.Type == "Initialized" { - return condition.Status == "True" && condition.Reason == "PodCompleted" && - pod.Status.Phase == v1.PodSucceeded - } - } - return false + return jobs.Items, nil } func (r *dynamicResourceRequestController) deleteWork(ctx context.Context, resourceRequest *unstructured.Unstructured, workName string, finalizer string, logger logr.Logger) error { @@ -306,16 +297,16 @@ func (r *dynamicResourceRequestController) deleteWork(ctx context.Context, resou return nil } -func (r *dynamicResourceRequestController) deletePipeline(ctx context.Context, resourceRequest *unstructured.Unstructured, resourceRequestIdentifier, finalizer string, logger logr.Logger) error { - podGVK := schema.GroupVersionKind{ - Group: v1.SchemeGroupVersion.Group, - Version: v1.SchemeGroupVersion.Version, - Kind: "Pod", +func (r *dynamicResourceRequestController) deleteWorkflows(ctx context.Context, resourceRequest *unstructured.Unstructured, resourceRequestIdentifier, finalizer string, logger logr.Logger) error { + jobGVK := schema.GroupVersionKind{ + Group: batchv1.SchemeGroupVersion.Group, + Version: batchv1.SchemeGroupVersion.Version, + Kind: "Job", } - podLabels := pipeline.Labels(resourceRequestIdentifier, r.promiseIdentifier) + jobLabels := pipeline.Labels(resourceRequestIdentifier, r.promiseIdentifier) - resourcesRemaining, err := deleteAllResourcesWithKindMatchingLabel(ctx, r.Client, podGVK, podLabels, logger) + resourcesRemaining, err := deleteAllResourcesWithKindMatchingLabel(ctx, r.Client, jobGVK, jobLabels, logger) if err != nil { return err } diff --git a/controllers/promise_and_dynamic_resource_request_controller_test.go b/controllers/promise_and_dynamic_resource_request_controller_test.go index c0dabbb1..f4b38487 100644 --- a/controllers/promise_and_dynamic_resource_request_controller_test.go +++ b/controllers/promise_and_dynamic_resource_request_controller_test.go @@ -25,6 +25,7 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" apiextensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" @@ -279,7 +280,7 @@ var _ = Context("Promise Reconciler", func() { It("triggers the resource configure workflow", func() { Eventually(func() int { - pods := &v1.PodList{} + jobs := &batchv1.JobList{} lo := &client.ListOptions{ LabelSelector: labels.SelectorFromSet(map[string]string{ "kratix-promise-id": promiseCR.GetIdentifier(), @@ -287,8 +288,8 @@ var _ = Context("Promise Reconciler", func() { }), } - Expect(k8sClient.List(ctx, pods, lo)).To(Succeed()) - return len(pods.Items) + Expect(k8sClient.List(ctx, jobs, lo)).To(Succeed()) + return len(jobs.Items) }, timeout, interval).Should(Equal(1), "Configure Pipeline never trigerred") }) }) @@ -299,23 +300,23 @@ var _ = Context("Promise Reconciler", func() { }) It("executes the deletion process", func() { - deletePipeline := v1.Pod{} + deletePipeline := batchv1.Job{} By("triggering the delete pipeline for the promise", func() { Eventually(func() bool { - pods := &v1.PodList{} - err := k8sClient.List(ctx, pods) + jobs := &batchv1.JobList{} + err := k8sClient.List(ctx, jobs) if err != nil { return false } - if len(pods.Items) == 0 { + if len(jobs.Items) == 0 { return false } - for _, pod := range pods.Items { - if strings.HasPrefix(pod.Name, "delete-pipeline-redis-promise") { - deletePipeline = pod + for _, job := range jobs.Items { + if strings.HasPrefix(job.Name, "delete-pipeline-redis-promise") { + deletePipeline = job return true } } @@ -324,7 +325,9 @@ var _ = Context("Promise Reconciler", func() { }) By("deleting the resources when the pipeline completes", func() { - completePipeline(ctx, &deletePipeline) + deletePipeline.Status.Succeeded = 1 + Expect(k8sClient.Status().Update(ctx, &deletePipeline)).To(Succeed()) + Eventually(func() int { rrList := &unstructured.UnstructuredList{} rrList.SetGroupVersionKind(requestedResource.GroupVersionKind()) @@ -557,16 +560,3 @@ var _ = Context("Promise Reconciler", func() { }) }) }) - -func completePipeline(ctx context.Context, pipeline *v1.Pod) { - pipeline.Status.Conditions = []v1.PodCondition{ - { - Status: "True", - Type: "Initialized", - Reason: "PodCompleted", - }, - } - pipeline.Status.Phase = v1.PodSucceeded - - Expect(k8sClient.Status().Update(ctx, pipeline)).To(Succeed()) -} diff --git a/controllers/util.go b/controllers/util.go index 0a9b3f02..01d2e8c9 100644 --- a/controllers/util.go +++ b/controllers/util.go @@ -9,6 +9,7 @@ import ( "github.com/syntasso/kratix/lib/writers" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" @@ -40,7 +41,7 @@ func deleteAllResourcesWithKindMatchingLabel(ctx context.Context, kClient client logger.Info("deleting resources", "kind", resourceList.GetKind(), "withLabels", resourceLabels, "resources", getResourceNames(resourceList.Items)) for _, resource := range resourceList.Items { - err = kClient.Delete(ctx, &resource) + err = kClient.Delete(ctx, &resource, client.PropagationPolicy(metav1.DeletePropagationBackground)) if err != nil && !errors.IsNotFound(err) { logger.Error(err, "Error deleting resource, will try again in 5 seconds", "name", resource.GetName(), "kind", resource.GetKind()) return true, err diff --git a/lib/pipeline/configure.go b/lib/pipeline/configure.go index f99e0a6c..9abc7347 100644 --- a/lib/pipeline/configure.go +++ b/lib/pipeline/configure.go @@ -6,6 +6,7 @@ import ( "strings" platformv1alpha1 "github.com/syntasso/kratix/api/v1alpha1" + batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -37,46 +38,53 @@ func NewConfigurePipeline( role(rr, crdNames, pipelineResources), roleBinding((pipelineResources)), configMap, - configurePipelinePod(rr, pipelines, pipelineResources), + configurePipeline(rr, pipelines, pipelineResources), } return resources, nil } -func configurePipelinePod(rr *unstructured.Unstructured, pipelines []platformv1alpha1.Pipeline, pipelineResources pipelineArgs) *v1.Pod { +func configurePipeline(rr *unstructured.Unstructured, pipelines []platformv1alpha1.Pipeline, pipelineResources pipelineArgs) *batchv1.Job { volumes := metadataAndSchedulingVolumes(pipelineResources.ConfigMapName()) initContainers, pipelineVolumes := configurePipelineInitContainers(rr, pipelines, pipelineResources.ResourceRequestID()) volumes = append(volumes, pipelineVolumes...) rrKind := fmt.Sprintf("%s.%s", strings.ToLower(rr.GetKind()), rr.GroupVersionKind().Group) - return &v1.Pod{ + return &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ Name: pipelineResources.ConfigurePipelineName(), Namespace: rr.GetNamespace(), Labels: pipelineResources.ConfigurePipelinePodLabels(), }, - Spec: v1.PodSpec{ - RestartPolicy: v1.RestartPolicyOnFailure, - ServiceAccountName: pipelineResources.ServiceAccountName(), - Containers: []v1.Container{ - { - Name: "status-writer", - Image: os.Getenv("WC_IMG"), - Command: []string{"sh", "-c", "update-status"}, - Env: []v1.EnvVar{ - {Name: "RR_KIND", Value: rrKind}, - {Name: "RR_NAME", Value: rr.GetName()}, - {Name: "RR_NAMESPACE", Value: rr.GetNamespace()}, + Spec: batchv1.JobSpec{ + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: pipelineResources.ConfigurePipelinePodLabels(), + }, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyOnFailure, + ServiceAccountName: pipelineResources.ServiceAccountName(), + Containers: []v1.Container{ + { + Name: "status-writer", + Image: os.Getenv("WC_IMG"), + Command: []string{"sh", "-c", "update-status"}, + Env: []v1.EnvVar{ + {Name: "RR_KIND", Value: rrKind}, + {Name: "RR_NAME", Value: rr.GetName()}, + {Name: "RR_NAMESPACE", Value: rr.GetNamespace()}, + }, + VolumeMounts: []v1.VolumeMount{{ + MountPath: "/work-creator-files/metadata", + Name: "shared-metadata", + }}, + }, }, - VolumeMounts: []v1.VolumeMount{{ - MountPath: "/work-creator-files/metadata", - Name: "shared-metadata", - }}, + InitContainers: initContainers, + Volumes: volumes, }, }, - InitContainers: initContainers, - Volumes: volumes, }, } } diff --git a/lib/pipeline/delete.go b/lib/pipeline/delete.go index e10b09d5..0b507929 100644 --- a/lib/pipeline/delete.go +++ b/lib/pipeline/delete.go @@ -2,6 +2,7 @@ package pipeline import ( platformv1alpha1 "github.com/syntasso/kratix/api/v1alpha1" + batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -9,28 +10,33 @@ import ( const kratixDeleteOperation = "delete" -func NewDeletePipeline(rr *unstructured.Unstructured, pipelines []platformv1alpha1.Pipeline, resourceRequestIdentifier, promiseIdentifier string) v1.Pod { +func NewDeletePipeline(rr *unstructured.Unstructured, pipelines []platformv1alpha1.Pipeline, resourceRequestIdentifier, promiseIdentifier string) batchv1.Job { args := newPipelineArgs(promiseIdentifier, resourceRequestIdentifier, rr.GetNamespace()) containers, pipelineVolumes := deletePipelineContainers(rr, pipelines) - pod := v1.Pod{ + return batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ Name: args.DeletePipelineName(), Namespace: args.Namespace(), Labels: args.DeletePipelinePodLabels(), }, - Spec: v1.PodSpec{ - RestartPolicy: v1.RestartPolicyOnFailure, - ServiceAccountName: args.ServiceAccountName(), - Containers: []v1.Container{containers[len(containers)-1]}, - InitContainers: containers[0 : len(containers)-1], - Volumes: pipelineVolumes, + Spec: batchv1.JobSpec{ + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: args.DeletePipelinePodLabels(), + }, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyOnFailure, + ServiceAccountName: args.ServiceAccountName(), + Containers: []v1.Container{containers[len(containers)-1]}, + InitContainers: containers[0 : len(containers)-1], + Volumes: pipelineVolumes, + }, + }, }, } - - return pod } func deletePipelineContainers(rr *unstructured.Unstructured, pipelines []platformv1alpha1.Pipeline) ([]v1.Container, []v1.Volume) {