Skip to content

Commit

Permalink
Merge pull request #179 from syntasso/refactor-pipelines
Browse files Browse the repository at this point in the history
refactor: move pipeline resource generation into the pipelines types
  • Loading branch information
kirederik authored Jun 25, 2024
2 parents 836afb5 + 3b771ad commit 67cda32
Show file tree
Hide file tree
Showing 25 changed files with 1,554 additions and 2,112 deletions.
550 changes: 550 additions & 0 deletions api/v1alpha1/pipeline_types.go

Large diffs are not rendered by default.

760 changes: 760 additions & 0 deletions api/v1alpha1/pipeline_types_test.go

Large diffs are not rendered by default.

184 changes: 111 additions & 73 deletions api/v1alpha1/promise_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (
"encoding/json"
"fmt"
"io"
"strconv"

"github.com/go-logr/logr"
"gopkg.in/yaml.v2"
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -151,13 +153,6 @@ type Promise struct {
Status PromiseStatus `json:"status,omitempty"`
}

type PromisePipelines struct {
DeleteResource []Pipeline
ConfigureResource []Pipeline
ConfigurePromise []Pipeline
DeletePromise []Pipeline
}

var ErrNoAPI = fmt.Errorf("promise does not contain an API")

func SquashPromiseScheduling(scheduling []PromiseScheduling) map[string]string {
Expand Down Expand Up @@ -195,12 +190,12 @@ func (p *Promise) DoesNotContainAPI() bool {
return p.Spec.API == nil || p.Spec.API.Raw == nil
}

func (p *Promise) GetAPIAsCRD() (*v1.CustomResourceDefinition, error) {
func (p *Promise) GetAPIAsCRD() (*apiextensionsv1.CustomResourceDefinition, error) {
if p.DoesNotContainAPI() {
return nil, ErrNoAPI
}

crd := v1.CustomResourceDefinition{}
crd := apiextensionsv1.CustomResourceDefinition{}
if err := json.Unmarshal(p.Spec.API.Raw, &crd); err != nil {
return nil, fmt.Errorf("api is not a valid CRD: %w", err)
}
Expand Down Expand Up @@ -244,69 +239,6 @@ func (p *Promise) ToUnstructured() (*unstructured.Unstructured, error) {
return unstructuredPromise, nil
}

func (p *Promise) GeneratePipelines(logger logr.Logger) (PromisePipelines, error) {
pipelineWorkflows := [][]unstructured.Unstructured{
p.Spec.Workflows.Resource.Configure,
p.Spec.Workflows.Resource.Delete,
p.Spec.Workflows.Promise.Configure,
p.Spec.Workflows.Promise.Delete,
}

var pipelines [][]Pipeline
for _, pipeline := range pipelineWorkflows {
p, err := generatePipeline(pipeline, logger)
if err != nil {
return PromisePipelines{}, err
}
pipelines = append(pipelines, p)
}

return PromisePipelines{
ConfigureResource: pipelines[0],
DeleteResource: pipelines[1],
ConfigurePromise: pipelines[2],
DeletePromise: pipelines[3],
}, nil
}

func generatePipeline(pipelines []unstructured.Unstructured, logger logr.Logger) ([]Pipeline, error) {
if len(pipelines) == 0 {
return nil, nil
}

//We only support 1 pipeline for now
ps := []Pipeline{}
for _, pipeline := range pipelines {
pipelineLogger := logger.WithValues(
"pipelineKind", pipeline.GetKind(),
"pipelineVersion", pipeline.GetAPIVersion(),
"pipelineName", pipeline.GetName())

if pipeline.GetKind() == "Pipeline" && pipeline.GetAPIVersion() == "platform.kratix.io/v1alpha1" {
jsonPipeline, err := pipeline.MarshalJSON()
if err != nil {
// TODO test
pipelineLogger.Error(err, "Failed marshalling pipeline to json")
return nil, err
}

p := Pipeline{}
err = json.Unmarshal(jsonPipeline, &p)
if err != nil {
// TODO test
pipelineLogger.Error(err, "Failed unmarshalling pipeline")
return nil, err
}
ps = append(ps, p)
} else {
return nil, fmt.Errorf("unsupported pipeline %q (%s.%s)",
pipeline.GetName(), pipeline.GetKind(), pipeline.GetAPIVersion())
}
}
return ps, nil

}

func (d Dependencies) Marshal() ([]byte, error) {
buf := new(bytes.Buffer)
encoder := yaml.NewEncoder(buf)
Expand All @@ -332,3 +264,109 @@ type PromiseList struct {
func init() {
SchemeBuilder.Register(&Promise{}, &PromiseList{})
}

func (p *Promise) GetWorkloadGroupScheduling() []WorkloadGroupScheduling {
workloadGroupScheduling := []WorkloadGroupScheduling{}
for _, scheduling := range p.Spec.DestinationSelectors {
workloadGroupScheduling = append(workloadGroupScheduling, WorkloadGroupScheduling{
MatchLabels: scheduling.MatchLabels,
Source: "promise",
})
}

return workloadGroupScheduling
}

func (p *Promise) generatePipelinesObjects(workflowType Type, workflowAction Action, crd *apiextensionsv1.CustomResourceDefinition, resourceRequest *unstructured.Unstructured, logger logr.Logger) ([]PipelineJobResources, error) {
promisePipelines, err := NewPipelinesMap(p, logger)
if err != nil {
return nil, err
}

var allResources []PipelineJobResources
pipelines := promisePipelines[workflowType][workflowAction]

lastIndex := len(pipelines) - 1
for i, pipe := range pipelines {
isLast := i == lastIndex
additionalJobEnv := []corev1.EnvVar{
{Name: "IS_LAST_PIPELINE", Value: strconv.FormatBool(isLast)},
}

var resources PipelineJobResources
var err error
switch workflowType {
case WorkflowTypeResource:
resources, err = pipe.ForResource(p, workflowAction, resourceRequest).Resources(additionalJobEnv)
case WorkflowTypePromise:
resources, err = pipe.ForPromise(p, workflowAction).Resources(additionalJobEnv)
}
if err != nil {
return nil, err
}

allResources = append(allResources, resources)
}
return allResources, nil
}

func (p *Promise) GeneratePromisePipelines(workflowAction Action, logger logr.Logger) ([]PipelineJobResources, error) {
return p.generatePipelinesObjects(WorkflowTypePromise, workflowAction, nil, nil, logger)
}

func (p *Promise) GenerateResourcePipelines(workflowAction Action, crd *apiextensionsv1.CustomResourceDefinition, resourceRequest *unstructured.Unstructured, logger logr.Logger) ([]PipelineJobResources, error) {
return p.generatePipelinesObjects(WorkflowTypeResource, workflowAction, crd, resourceRequest, logger)
}

func (p *Promise) HasPipeline(workflowType Type, workflowAction Action) bool {
switch workflowType {
case WorkflowTypeResource:
switch workflowAction {
case WorkflowActionConfigure:
return len(p.Spec.Workflows.Resource.Configure) > 0
case WorkflowActionDelete:
return len(p.Spec.Workflows.Resource.Delete) > 0
}
case WorkflowTypePromise:
switch workflowAction {
case WorkflowActionConfigure:
return len(p.Spec.Workflows.Promise.Configure) > 0
case WorkflowActionDelete:
return len(p.Spec.Workflows.Promise.Delete) > 0
}
}
return false
}

type pipelineMap map[Type]map[Action][]Pipeline

func NewPipelinesMap(promise *Promise, logger logr.Logger) (pipelineMap, error) {
unstructuredMap := map[Type]map[Action][]unstructured.Unstructured{
WorkflowTypeResource: {
WorkflowActionConfigure: promise.Spec.Workflows.Resource.Configure,
WorkflowActionDelete: promise.Spec.Workflows.Resource.Delete,
},
WorkflowTypePromise: {
WorkflowActionConfigure: promise.Spec.Workflows.Promise.Configure,
WorkflowActionDelete: promise.Spec.Workflows.Promise.Delete,
},
}

pipelinesMap := map[Type]map[Action][]Pipeline{}

for _, t := range []Type{WorkflowTypeResource, WorkflowTypePromise} {
if _, ok := pipelinesMap[t]; !ok {
pipelinesMap[t] = map[Action][]Pipeline{}
}
for _, a := range []Action{WorkflowActionConfigure, WorkflowActionDelete} {
pipelines, err := PipelinesFromUnstructured(unstructuredMap[t][a], logger)
if err != nil {
return nil, err
}
pipelinesMap[t][a] = pipelines
}

}

return pipelinesMap, nil
}
5 changes: 4 additions & 1 deletion api/v1alpha1/v1alpha1_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1
package v1alpha1_test

import (
"testing"
Expand All @@ -27,8 +27,11 @@ import (
// These tests use Ginkgo (BDD-style Go testing framework). Refer to
// http://onsi.github.io/ginkgo/ to learn more about Ginkgo.

var workCreatorImage = "work-creator:latest"

func TestAPIs(t *testing.T) {
RegisterFailHandler(Fail)
t.Setenv("WC_IMG", workCreatorImage)

RunSpecs(t, "v1alpha1 API Suite")
}
1 change: 1 addition & 0 deletions api/v1alpha1/work_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
ResourceNameLabel = KratixPrefix + "resource-name"
PipelineNameLabel = KratixPrefix + "pipeline-name"
WorkTypeLabel = KratixPrefix + "work-type"
WorkActionLabel = KratixPrefix + "work-action"

WorkTypePromise = "promise"
WorkTypeResource = "resource"
Expand Down
72 changes: 29 additions & 43 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 67cda32

Please sign in to comment.