Skip to content

Commit

Permalink
Merge pull request #96 from syntasso/95-delete-pipeline-params
Browse files Browse the repository at this point in the history
feat: ensure all delete pipeline params are passed to pods
  • Loading branch information
SaphMB authored Mar 21, 2024
2 parents c964c11 + 3a24866 commit f680db0
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 84 deletions.
73 changes: 20 additions & 53 deletions lib/pipeline/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewConfigureResource(
}

func NewConfigurePromise(
unstructedPromise *unstructured.Unstructured,
unstructuredPromise *unstructured.Unstructured,
pipelines []v1alpha1.Pipeline,
promiseIdentifier string,
promiseDestinationSelectors []v1alpha1.PromiseScheduling,
Expand All @@ -64,7 +64,7 @@ func NewConfigurePromise(
return nil, err
}

pipeline, err := ConfigurePipeline(unstructedPromise, pipelines, pipelineResources, promiseIdentifier, true, logger)
pipeline, err := ConfigurePipeline(unstructuredPromise, pipelines, pipelineResources, promiseIdentifier, true, logger)
if err != nil {
return nil, err
}
Expand All @@ -83,7 +83,7 @@ func NewConfigurePromise(
func ConfigurePipeline(obj *unstructured.Unstructured, pipelines []v1alpha1.Pipeline, pipelineArgs PipelineArgs, promiseName string, promiseWorkflow bool, logger logr.Logger) (*batchv1.Job, error) {
volumes := metadataAndSchedulingVolumes(pipelineArgs.ConfigMapName())

initContainers, pipelineVolumes := configurePipelineInitContainers(obj, pipelines, promiseName, promiseWorkflow, logger)
initContainers, pipelineVolumes := generateConfigurePipelineContainersAndVolumes(obj, pipelines, promiseName, promiseWorkflow)
volumes = append(volumes, pipelineVolumes...)

objHash, err := hash.ComputeHashForResource(obj)
Expand Down Expand Up @@ -142,54 +142,28 @@ func ConfigurePipeline(obj *unstructured.Unstructured, pipelines []v1alpha1.Pipe
return job, nil
}

func configurePipelineInitContainers(obj *unstructured.Unstructured, pipelines []v1alpha1.Pipeline, promiseName string, promiseWorkflow bool, logger logr.Logger) ([]v1.Container, []v1.Volume) {
volumes, volumeMounts := pipelineVolumes()

kratixWorkflowType := v1alpha1.WorkflowTypeResource
func generateConfigurePipelineContainersAndVolumes(obj *unstructured.Unstructured, pipelines []v1alpha1.Pipeline, promiseName string, promiseWorkflow bool) ([]v1.Container, []v1.Volume) {
workflowType := v1alpha1.WorkflowTypeResource
if promiseWorkflow {
kratixWorkflowType = v1alpha1.WorkflowTypePromise
workflowType = v1alpha1.WorkflowTypePromise
}

readerContainer := readerContainer(obj, kratixWorkflowType, "shared-input")
containers := []v1.Container{
readerContainer,
kratixEnvVars := []v1.EnvVar{
{
Name: kratixActionEnvVar,
Value: string(v1alpha1.WorkflowActionConfigure),
},
{
Name: kratixTypeEnvVar,
Value: string(workflowType),
},
{
Name: kratixPromiseEnvVar,
Value: promiseName,
},
}

if len(pipelines) > 0 {
//TODO: We only support 1 workflow for now
if len(pipelines[0].Spec.Volumes) > 0 {
volumes = append(volumes, pipelines[0].Spec.Volumes...)
}
for i, c := range pipelines[0].Spec.Containers {
kratixEnvVars := []v1.EnvVar{
{
Name: kratixActionEnvVar,
Value: string(v1alpha1.WorkflowActionConfigure),
},
{
Name: kratixTypeEnvVar,
Value: string(kratixWorkflowType),
},
{
Name: kratixPromiseEnvVar,
Value: promiseName,
},
}
if len(c.VolumeMounts) > 0 {
volumeMounts = append(volumeMounts, c.VolumeMounts...)
}
containers = append(containers, v1.Container{
Name: providedOrDefaultName(c.Name, i),
Image: c.Image,
VolumeMounts: volumeMounts,
Args: c.Args,
Command: c.Command,
Env: append(kratixEnvVars, c.Env...),
EnvFrom: c.EnvFrom,
ImagePullPolicy: c.ImagePullPolicy,
})
}
}
containers, volumes := generateContainersAndVolumes(obj, workflowType, pipelines, kratixEnvVars)

workCreatorCommand := fmt.Sprintf("./work-creator -input-directory /work-creator-files -promise-name %s -namespace %q", promiseName, obj.GetNamespace())
if promiseWorkflow {
Expand Down Expand Up @@ -243,10 +217,3 @@ func metadataAndSchedulingVolumes(configMapName string) []v1.Volume {
},
}
}

func providedOrDefaultName(providedName string, index int) string {
if providedName == "" {
return fmt.Sprintf("default-container-name-%d", index)
}
return providedName
}
48 changes: 18 additions & 30 deletions lib/pipeline/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ import (
const kratixActionDelete = "delete"

func NewDeleteResource(rr *unstructured.Unstructured, pipelines []v1alpha1.Pipeline, resourceRequestIdentifier, promiseIdentifier, crdPlural string) []client.Object {
return newDelete(rr, pipelines, resourceRequestIdentifier, promiseIdentifier, crdPlural)
return NewDelete(rr, pipelines, resourceRequestIdentifier, promiseIdentifier, crdPlural)
}

func NewDeletePromise(promise *unstructured.Unstructured, pipelines []v1alpha1.Pipeline) []client.Object {
return newDelete(promise, pipelines, "", promise.GetName(), v1alpha1.PromisePlural)
return NewDelete(promise, pipelines, "", promise.GetName(), v1alpha1.PromisePlural)
}

func newDelete(obj *unstructured.Unstructured, pipelines []v1alpha1.Pipeline, resourceRequestIdentifier, promiseIdentifier, objPlural string) []client.Object {
func NewDelete(obj *unstructured.Unstructured, pipelines []v1alpha1.Pipeline, resourceRequestIdentifier, promiseIdentifier, objPlural string) []client.Object {
isPromise := resourceRequestIdentifier == ""
namespace := obj.GetNamespace()
if isPromise {
Expand All @@ -28,7 +28,12 @@ func newDelete(obj *unstructured.Unstructured, pipelines []v1alpha1.Pipeline, re

args := NewPipelineArgs(promiseIdentifier, resourceRequestIdentifier, namespace)

containers, pipelineVolumes := deletePipelineContainers(obj, isPromise, pipelines)
containers, pipelineVolumes := generateDeletePipelineContainersAndVolumes(obj, isPromise, pipelines)

var imagePullSecrets []v1.LocalObjectReference
if len(pipelines) > 0 {
imagePullSecrets = pipelines[0].Spec.ImagePullSecrets
}

resources := []client.Object{
serviceAccount(args),
Expand All @@ -51,6 +56,7 @@ func newDelete(obj *unstructured.Unstructured, pipelines []v1alpha1.Pipeline, re
Containers: []v1.Container{containers[len(containers)-1]},
InitContainers: containers[0 : len(containers)-1],
Volumes: pipelineVolumes,
ImagePullSecrets: imagePullSecrets,
},
},
},
Expand All @@ -60,36 +66,18 @@ func newDelete(obj *unstructured.Unstructured, pipelines []v1alpha1.Pipeline, re
return resources
}

func deletePipelineContainers(obj *unstructured.Unstructured, isPromise bool, pipelines []v1alpha1.Pipeline) ([]v1.Container, []v1.Volume) {
volumes, volumeMounts := pipelineVolumes()
func generateDeletePipelineContainersAndVolumes(obj *unstructured.Unstructured, isPromise bool, pipelines []v1alpha1.Pipeline) ([]v1.Container, []v1.Volume) {
kratixEnvVars := []v1.EnvVar{
{
Name: kratixActionEnvVar,
Value: kratixActionDelete,
},
}

//TODO: Does this get called for promises too? If so, change the parameter name and dynamically set input below
workflowType := v1alpha1.WorkflowTypeResource
if isPromise {
workflowType = v1alpha1.WorkflowTypePromise
}

readerContainer := readerContainer(obj, workflowType, "shared-input")
containers := []v1.Container{
readerContainer,
}

if len(pipelines) > 0 {
//TODO: We only support 1 workflow for now
for _, c := range pipelines[0].Spec.Containers {
containers = append(containers, v1.Container{
Name: c.Name,
Image: c.Image,
VolumeMounts: volumeMounts,
Env: []v1.EnvVar{
{
Name: kratixActionEnvVar,
Value: kratixActionDelete,
},
},
})
}
}

return containers, volumes
return generateContainersAndVolumes(obj, workflowType, pipelines, kratixEnvVars)
}
129 changes: 129 additions & 0 deletions lib/pipeline/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -410,6 +411,134 @@ var _ = Describe("Delete Pipeline", func() {
})
})
})

Describe("optional workflow configs", func() {
var (
rr *unstructured.Unstructured
pipelines []v1alpha1.Pipeline
)

BeforeEach(func() {
rr = &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "v1",
"kind": "Pod",
"metadata": map[string]interface{}{
"name": "test-pod",
"namespace": "test-namespace",
},
"spec": map[string]interface{}{
"foo": "bar",
},
},
}

pipelines = []v1alpha1.Pipeline{
{
Spec: v1alpha1.PipelineSpec{
Containers: []v1alpha1.Container{
{Name: "test-container", Image: "test-image"},
},
},
},
}
})

It("can include args and commands", func() {
pipelines[0].Spec.Containers = append(pipelines[0].Spec.Containers, v1alpha1.Container{
Name: "another-container",
Image: "another-image",
Args: []string{"arg1", "arg2"},
Command: []string{"command1", "command2"},
})
resources := pipeline.NewDelete(rr, pipelines, "", "test-promise", "promises")
job := resources[3].(*batchv1.Job)

Expect(job.Spec.Template.Spec.InitContainers[1].Args).To(BeEmpty())
Expect(job.Spec.Template.Spec.InitContainers[1].Command).To(BeEmpty())
Expect(job.Spec.Template.Spec.Containers[0].Args).To(Equal([]string{"arg1", "arg2"}))
Expect(job.Spec.Template.Spec.Containers[0].Command).To(Equal([]string{"command1", "command2"}))
})

It("can include env and envFrom", func() {
pipelines[0].Spec.Containers = append(pipelines[0].Spec.Containers, v1alpha1.Container{
Name: "another-container",
Image: "another-image",
Env: []corev1.EnvVar{
{Name: "env1", Value: "value1"},
},
EnvFrom: []corev1.EnvFromSource{
{
ConfigMapRef: &corev1.ConfigMapEnvSource{
LocalObjectReference: corev1.LocalObjectReference{Name: "test-configmap"},
},
},
},
})
resources := pipeline.NewDelete(rr, pipelines, "", "test-promise", "promises")
job := resources[3].(*batchv1.Job)

Expect(job.Spec.Template.Spec.InitContainers[1].Env).To(ContainElements(
corev1.EnvVar{Name: "KRATIX_WORKFLOW_ACTION", Value: "delete"},
))
Expect(job.Spec.Template.Spec.Containers[0].Env).To(ContainElements(
corev1.EnvVar{Name: "KRATIX_WORKFLOW_ACTION", Value: "delete"},
corev1.EnvVar{Name: "env1", Value: "value1"},
))

Expect(job.Spec.Template.Spec.InitContainers[1].EnvFrom).To(BeNil())
Expect(job.Spec.Template.Spec.Containers[0].EnvFrom).To(ContainElements(
corev1.EnvFromSource{
ConfigMapRef: &corev1.ConfigMapEnvSource{
LocalObjectReference: corev1.LocalObjectReference{Name: "test-configmap"},
},
},
))
})

It("can include volume and volume mounts", func() {
pipelines[0].Spec.Volumes = []corev1.Volume{
{Name: "test-volume", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}},
}
pipelines[0].Spec.Containers = append(pipelines[0].Spec.Containers, v1alpha1.Container{
Name: "another-container",
Image: "another-image",
VolumeMounts: []corev1.VolumeMount{
{Name: "test-volume-mount", MountPath: "/test-mount-path"},
},
})
resources := pipeline.NewDelete(rr, pipelines, "", "test-promise", "promises")
job := resources[3].(*batchv1.Job)

Expect(job.Spec.Template.Spec.InitContainers[1].VolumeMounts).To(HaveLen(3), "default volume mounts should've been included")
Expect(job.Spec.Template.Spec.InitContainers[1].Command).To(BeEmpty())
Expect(job.Spec.Template.Spec.Containers[0].VolumeMounts).To(ContainElement(
corev1.VolumeMount{Name: "test-volume-mount", MountPath: "/test-mount-path"},
))
Expect(job.Spec.Template.Spec.Volumes).To(ContainElement(
corev1.Volume{Name: "test-volume", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}},
))
})

It("can include imagePullPolicy and imagePullSecrets", func() {
pipelines[0].Spec.ImagePullSecrets = []corev1.LocalObjectReference{{Name: "test-secret"}, {Name: "another-secret"}}
pipelines[0].Spec.Containers = append(pipelines[0].Spec.Containers, v1alpha1.Container{
Name: "another-container",
Image: "another-image",
ImagePullPolicy: corev1.PullAlways,
})
resources := pipeline.NewDelete(rr, pipelines, "", "test-promise", "promises")
job := resources[3].(*batchv1.Job)

Expect(job.Spec.Template.Spec.ImagePullSecrets).To(HaveLen(2), "imagePullSecrets should've been included")
Expect(job.Spec.Template.Spec.ImagePullSecrets).To(ContainElements(
corev1.LocalObjectReference{Name: "test-secret"},
corev1.LocalObjectReference{Name: "another-secret"},
), "imagePullSecrets should've been included")
Expect(job.Spec.Template.Spec.InitContainers[1].ImagePullPolicy).To(BeEmpty())
Expect(job.Spec.Template.Spec.Containers[0].ImagePullPolicy).To(Equal(corev1.PullAlways))
})
})
})

func promiseFromFile(path string) *v1alpha1.Promise {
Expand Down
35 changes: 34 additions & 1 deletion lib/pipeline/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
kratixPromiseEnvVar = "KRATIX_PROMISE_NAME"
)

func pipelineVolumes() ([]v1.Volume, []v1.VolumeMount) {
func defaultPipelineVolumes() ([]v1.Volume, []v1.VolumeMount) {
volumes := []v1.Volume{
{Name: "shared-input", VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}},
{Name: "shared-output", VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}},
Expand Down Expand Up @@ -182,6 +182,39 @@ func readerContainer(obj *unstructured.Unstructured, kratixWorkflowType v1alpha1
return readerContainer
}

func generateContainersAndVolumes(obj *unstructured.Unstructured, workflowType v1alpha1.Type, pipelines []v1alpha1.Pipeline, kratixEnvVars []v1.EnvVar) ([]v1.Container, []v1.Volume) {
volumes, volumeMounts := defaultPipelineVolumes()

readerContainer := readerContainer(obj, workflowType, "shared-input")
containers := []v1.Container{
readerContainer,
}

if len(pipelines) > 0 {
if len(pipelines[0].Spec.Volumes) > 0 {
volumes = append(volumes, pipelines[0].Spec.Volumes...)
}
for _, c := range pipelines[0].Spec.Containers {
if len(c.VolumeMounts) > 0 {
volumeMounts = append(volumeMounts, c.VolumeMounts...)
}

containers = append(containers, v1.Container{
Name: c.Name,
Image: c.Image,
VolumeMounts: volumeMounts,
Args: c.Args,
Command: c.Command,
Env: append(kratixEnvVars, c.Env...),
EnvFrom: c.EnvFrom,
ImagePullPolicy: c.ImagePullPolicy,
})
}
}

return containers, volumes
}

// TODO(breaking) change this to {promiseIdentifier}-{pipelineType}-pipeline-{short-uuid}
// for consistency with other resource names (e.g. service account)
func pipelineName(pipelineType, promiseIdentifier string) string {
Expand Down

0 comments on commit f680db0

Please sign in to comment.