Skip to content

Commit

Permalink
Refactor: Wrap pipeline Pod in a Job
Browse files Browse the repository at this point in the history
* Jobs are designed for completable tasks and expose better completion data and functionality
* There is a 1:1 mapping from Job to Pod so the change to the system is minimal
* Future enhancements likely around Pod retry logic
  • Loading branch information
winnab committed Jul 28, 2023
1 parent b92b681 commit 8511c81
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 109 deletions.
14 changes: 11 additions & 3 deletions charts/kratix/crds/platform_kratix_io_crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,17 @@ spec:
properties:
targetClusterName:
type: string
workName:
description: The unique identifier of the Work parent
type: string
workload:
description: Workload represents the manifest workload to be deployed on worker
properties:
manifests:
description: Manifests represents a list of resources to be deployed on the worker
items:
description: Manifest represents a resource to be deployed on worker
type: object
x-kubernetes-preserve-unknown-fields: true
type: array
type: object
type: object
status:
description: WorkPlacementStatus defines the observed state of WorkPlacement
Expand Down
18 changes: 9 additions & 9 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,6 @@ rules:
- delete
- list
- watch
- apiGroups:
- ""
resources:
- pods
verbs:
- create
- delete
- list
- watch
- apiGroups:
- ""
resources:
Expand Down Expand Up @@ -52,6 +43,15 @@ rules:
- patch
- update
- watch
- apiGroups:
- batch
resources:
- jobs
verbs:
- create
- delete
- list
- watch
- apiGroups:
- platform.kratix.io
resources:
Expand Down
73 changes: 32 additions & 41 deletions controllers/dynamic_resource_request_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/go-logr/logr"
"github.com/syntasso/kratix/api/v1alpha1"
"github.com/syntasso/kratix/lib/pipeline"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -68,7 +69,7 @@ type dynamicResourceRequestController struct {
crd *apiextensionsv1.CustomResourceDefinition
}

//+kubebuilder:rbac:groups="",resources=pods,verbs=create;list;watch;delete
//+kubebuilder:rbac:groups="batch",resources=jobs,verbs=create;list;watch;delete
//+kubebuilder:rbac:groups="",resources=serviceaccounts,verbs=create

func (r *dynamicResourceRequestController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand Down Expand Up @@ -116,7 +117,7 @@ func (r *dynamicResourceRequestController) Reconcile(ctx context.Context, req ct

//check if the pipeline has already been created. If it has exit out. All the lines
//below this call are for the one-time creation of the pipeline.
created, err := r.configurePipelinePodHasBeenCreated(resourceRequestIdentifier, rr.GetNamespace(), logger)
created, err := r.configurePipelineHasBeenCreated(resourceRequestIdentifier, rr.GetNamespace(), logger)
if err != nil {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -162,26 +163,26 @@ func (r *dynamicResourceRequestController) deleteResources(ctx context.Context,
}

if controllerutil.ContainsFinalizer(resourceRequest, deleteWorkflowsFinalizer) {
existingDeletePipelinePod, err := r.getDeletePipelinePod(ctx, resourceRequestIdentifier, resourceRequest.GetNamespace(), logger)
existingDeletePipeline, err := r.getDeletePipeline(ctx, resourceRequestIdentifier, resourceRequest.GetNamespace(), logger)
if err != nil {
return defaultRequeue, err
}

if existingDeletePipelinePod == nil {
deletePipelinePod := pipeline.NewDeletePipeline(resourceRequest, r.deletePipelines, resourceRequestIdentifier, r.promiseIdentifier)
if existingDeletePipeline == nil {
deletePipeline := pipeline.NewDeletePipeline(resourceRequest, r.deletePipelines, resourceRequestIdentifier, r.promiseIdentifier)
logger.Info("Creating Delete Pipeline for Promise resource request: " + resourceRequestIdentifier + ". The pipeline will now execute...")
err = r.Client.Create(ctx, &deletePipelinePod)
err = r.Client.Create(ctx, &deletePipeline)
if err != nil {
logger.Error(err, "Error creating Pod")
y, _ := yaml.Marshal(&deletePipelinePod)
logger.Error(err, "Error creating delete pipeline")
y, _ := yaml.Marshal(&deletePipeline)
logger.Error(err, string(y))
return ctrl.Result{}, err
}
return defaultRequeue, nil
}

logger.Info("Checking status of Delete Pipeline for Promise resource request: " + resourceRequestIdentifier)
if pipelineIsComplete(*existingDeletePipelinePod) {
if existingDeletePipeline.Status.Succeeded > 0 {
logger.Info("Delete Pipeline Completed for Promise resource request: " + resourceRequestIdentifier)
controllerutil.RemoveFinalizer(resourceRequest, deleteWorkflowsFinalizer)
if err := r.Client.Update(ctx, resourceRequest); err != nil {
Expand All @@ -201,7 +202,7 @@ func (r *dynamicResourceRequestController) deleteResources(ctx context.Context,
}

if controllerutil.ContainsFinalizer(resourceRequest, workflowsFinalizer) {
err := r.deletePipeline(ctx, resourceRequest, resourceRequestIdentifier, workflowsFinalizer, logger)
err := r.deleteWorkflows(ctx, resourceRequest, resourceRequestIdentifier, workflowsFinalizer, logger)
if err != nil {
return defaultRequeue, err
}
Expand All @@ -211,60 +212,50 @@ func (r *dynamicResourceRequestController) deleteResources(ctx context.Context,
return fastRequeue, nil
}

func (r *dynamicResourceRequestController) getDeletePipelinePod(ctx context.Context, resourceRequestIdentifier, namespace string, logger logr.Logger) (*v1.Pod, error) {
pods, err := r.getPodsWithLabels(
func (r *dynamicResourceRequestController) getDeletePipeline(ctx context.Context, resourceRequestIdentifier, namespace string, logger logr.Logger) (*batchv1.Job, error) {
jobs, err := r.getJobsWithLabels(
pipeline.DeletePipelineLabels(resourceRequestIdentifier, r.promiseIdentifier),
namespace,
logger,
)
if err != nil || len(pods) == 0 {
if err != nil || len(jobs) == 0 {
return nil, err
}
return &pods[0], nil
return &jobs[0], nil
}

func (r *dynamicResourceRequestController) configurePipelinePodHasBeenCreated(resourceRequestIdentifier, namespace string, logger logr.Logger) (bool, error) {
pods, err := r.getPodsWithLabels(
func (r *dynamicResourceRequestController) configurePipelineHasBeenCreated(resourceRequestIdentifier, namespace string, logger logr.Logger) (bool, error) {
jobs, err := r.getJobsWithLabels(
pipeline.ConfigurePipelineLabels(resourceRequestIdentifier, r.promiseIdentifier),
namespace,
logger,
)
if err != nil {
return false, err
}
return len(pods) > 0, nil
return len(jobs) > 0, nil
}

func (r *dynamicResourceRequestController) getPodsWithLabels(podLabels map[string]string, namespace string, logger logr.Logger) ([]v1.Pod, error) {
selectorLabels := labels.FormatLabels(podLabels)
func (r *dynamicResourceRequestController) getJobsWithLabels(jobLabels map[string]string, namespace string, logger logr.Logger) ([]batchv1.Job, error) {
selectorLabels := labels.FormatLabels(jobLabels)
selector, err := labels.Parse(selectorLabels)

if err != nil {
return nil, fmt.Errorf("error parsing labels %v: %w", podLabels, err)
return nil, fmt.Errorf("error parsing labels %v: %w", jobLabels, err)
}

listOps := &client.ListOptions{
LabelSelector: selector,
Namespace: namespace,
}

pods := &v1.PodList{}
err = r.Client.List(context.Background(), pods, listOps)
jobs := &batchv1.JobList{}
err = r.Client.List(context.Background(), jobs, listOps)
if err != nil {
logger.Error(err, "error listing pods", "selectors", selector.String())
logger.Error(err, "error listing jobs", "selectors", selector.String())
return nil, err
}
return pods.Items, nil
}

func pipelineIsComplete(pod v1.Pod) bool {
for _, condition := range pod.Status.Conditions {
if condition.Type == "Initialized" {
return condition.Status == "True" && condition.Reason == "PodCompleted" &&
pod.Status.Phase == v1.PodSucceeded
}
}
return false
return jobs.Items, nil
}

func (r *dynamicResourceRequestController) deleteWork(ctx context.Context, resourceRequest *unstructured.Unstructured, workName string, finalizer string, logger logr.Logger) error {
Expand Down Expand Up @@ -306,16 +297,16 @@ func (r *dynamicResourceRequestController) deleteWork(ctx context.Context, resou
return nil
}

func (r *dynamicResourceRequestController) deletePipeline(ctx context.Context, resourceRequest *unstructured.Unstructured, resourceRequestIdentifier, finalizer string, logger logr.Logger) error {
podGVK := schema.GroupVersionKind{
Group: v1.SchemeGroupVersion.Group,
Version: v1.SchemeGroupVersion.Version,
Kind: "Pod",
func (r *dynamicResourceRequestController) deleteWorkflows(ctx context.Context, resourceRequest *unstructured.Unstructured, resourceRequestIdentifier, finalizer string, logger logr.Logger) error {
jobGVK := schema.GroupVersionKind{
Group: batchv1.SchemeGroupVersion.Group,
Version: batchv1.SchemeGroupVersion.Version,
Kind: "Job",
}

podLabels := pipeline.Labels(resourceRequestIdentifier, r.promiseIdentifier)
jobLabels := pipeline.Labels(resourceRequestIdentifier, r.promiseIdentifier)

resourcesRemaining, err := deleteAllResourcesWithKindMatchingLabel(ctx, r.Client, podGVK, podLabels, logger)
resourcesRemaining, err := deleteAllResourcesWithKindMatchingLabel(ctx, r.Client, jobGVK, jobLabels, logger)
if err != nil {
return err
}
Expand Down
38 changes: 14 additions & 24 deletions controllers/promise_and_dynamic_resource_request_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apiextensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
Expand Down Expand Up @@ -279,16 +280,16 @@ var _ = Context("Promise Reconciler", func() {

It("triggers the resource configure workflow", func() {
Eventually(func() int {
pods := &v1.PodList{}
jobs := &batchv1.JobList{}
lo := &client.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{
"kratix-promise-id": promiseCR.GetIdentifier(),
"kratix-pipeline-type": "configure",
}),
}

Expect(k8sClient.List(ctx, pods, lo)).To(Succeed())
return len(pods.Items)
Expect(k8sClient.List(ctx, jobs, lo)).To(Succeed())
return len(jobs.Items)
}, timeout, interval).Should(Equal(1), "Configure Pipeline never trigerred")
})
})
Expand All @@ -299,23 +300,23 @@ var _ = Context("Promise Reconciler", func() {
})

It("executes the deletion process", func() {
deletePipeline := v1.Pod{}
deletePipeline := batchv1.Job{}

By("triggering the delete pipeline for the promise", func() {
Eventually(func() bool {
pods := &v1.PodList{}
err := k8sClient.List(ctx, pods)
jobs := &batchv1.JobList{}
err := k8sClient.List(ctx, jobs)
if err != nil {
return false
}

if len(pods.Items) == 0 {
if len(jobs.Items) == 0 {
return false
}

for _, pod := range pods.Items {
if strings.HasPrefix(pod.Name, "delete-pipeline-redis-promise") {
deletePipeline = pod
for _, job := range jobs.Items {
if strings.HasPrefix(job.Name, "delete-pipeline-redis-promise") {
deletePipeline = job
return true
}
}
Expand All @@ -324,7 +325,9 @@ var _ = Context("Promise Reconciler", func() {
})

By("deleting the resources when the pipeline completes", func() {
completePipeline(ctx, &deletePipeline)
deletePipeline.Status.Succeeded = 1
Expect(k8sClient.Status().Update(ctx, &deletePipeline)).To(Succeed())

Eventually(func() int {
rrList := &unstructured.UnstructuredList{}
rrList.SetGroupVersionKind(requestedResource.GroupVersionKind())
Expand Down Expand Up @@ -557,16 +560,3 @@ var _ = Context("Promise Reconciler", func() {
})
})
})

func completePipeline(ctx context.Context, pipeline *v1.Pod) {
pipeline.Status.Conditions = []v1.PodCondition{
{
Status: "True",
Type: "Initialized",
Reason: "PodCompleted",
},
}
pipeline.Status.Phase = v1.PodSucceeded

Expect(k8sClient.Status().Update(ctx, pipeline)).To(Succeed())
}
3 changes: 2 additions & 1 deletion controllers/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/syntasso/kratix/lib/writers"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -40,7 +41,7 @@ func deleteAllResourcesWithKindMatchingLabel(ctx context.Context, kClient client
logger.Info("deleting resources", "kind", resourceList.GetKind(), "withLabels", resourceLabels, "resources", getResourceNames(resourceList.Items))

for _, resource := range resourceList.Items {
err = kClient.Delete(ctx, &resource)
err = kClient.Delete(ctx, &resource, client.PropagationPolicy(metav1.DeletePropagationBackground))
if err != nil && !errors.IsNotFound(err) {
logger.Error(err, "Error deleting resource, will try again in 5 seconds", "name", resource.GetName(), "kind", resource.GetKind())
return true, err
Expand Down
Loading

0 comments on commit 8511c81

Please sign in to comment.