From 7c2a00d6adf638494808a6399783af49a0452003 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Fri, 8 Nov 2024 17:52:56 +0300 Subject: [PATCH 01/20] feat: run test workflows by selector Signed-off-by: Vladislav Sukhin --- .../commands/testworkflows/run.go | 85 ++++++++++++------ internal/app/api/v1/server.go | 1 + internal/app/api/v1/testworkflows.go | 90 +++++++++++++------ pkg/api/v1/client/interface.go | 1 + pkg/api/v1/client/testworkflow.go | 19 ++++ 5 files changed, 145 insertions(+), 51 deletions(-) diff --git a/cmd/kubectl-testkube/commands/testworkflows/run.go b/cmd/kubectl-testkube/commands/testworkflows/run.go index d0f10c51ba..12cc026901 100644 --- a/cmd/kubectl-testkube/commands/testworkflows/run.go +++ b/cmd/kubectl-testkube/commands/testworkflows/run.go @@ -3,6 +3,7 @@ package testworkflows import ( "bufio" "bytes" + "context" "errors" "fmt" "os" @@ -45,12 +46,13 @@ func NewRunTestWorkflowCmd() *cobra.Command { format string masks []string tags map[string]string + selectors []string + parallelism int ) cmd := &cobra.Command{ Use: "testworkflow [name]", Aliases: []string{"testworkflows", "tw"}, - Args: cobra.ExactArgs(1), Short: "Starts test workflow execution", Run: func(cmd *cobra.Command, args []string) { @@ -65,7 +67,6 @@ func NewRunTestWorkflowCmd() *cobra.Command { client, _, err := common.GetClient(cmd) ui.ExitOnError("getting client", err) - name := args[0] runContext := telemetry.GetCliRunContext() interfaceType := testkube.CICD_TestWorkflowRunningContextInterfaceType if runContext == "others|local" { @@ -83,13 +84,29 @@ func NewRunTestWorkflowCmd() *cobra.Command { runningContext = tclcmd.GetRunningContext(runContext, cfg.CloudContext.ApiKey, interfaceType) } - execution, err := client.ExecuteTestWorkflow(name, testkube.TestWorkflowExecutionRequest{ + request := testkube.TestWorkflowExecutionRequest{ Name: executionName, Config: config, DisableWebhooks: disableWebhooks, Tags: tags, RunningContext: runningContext, - }) + } + + var executions []testkube.TestWorkflowExecution + switch { + case len(args) > 0: + name := args[0] + + var execution testkube.TestWorkflowExecution + execution, err = client.ExecuteTestWorkflow(name, request) + executions = append(executions, execution) + case len(selectors) != 0: + selector := strings.Join(selectors, ",") + executions, err = client.ExecuteTestWorkflows(selector, parallelism, request) + default: + ui.Failf("Pass Test workflow name or labels to run by labels ") + } + if err != nil { // User friendly Open Source operation error errMessage := err.Error() @@ -108,33 +125,47 @@ func NewRunTestWorkflowCmd() *cobra.Command { } } - ui.ExitOnError("execute test workflow "+name+" from namespace "+namespace, err) - err = renderer.PrintTestWorkflowExecution(cmd, os.Stdout, execution) - ui.ExitOnError("render test workflow execution", err) - - var exitCode = 0 - if outputPretty { - ui.NL() - if !execution.FailedToInitialize() { - if watchEnabled { - exitCode = uiWatch(execution, client) - ui.NL() - if downloadArtifactsEnabled { - tests.DownloadTestWorkflowArtifacts(execution.Id, downloadDir, format, masks, client, outputPretty) + if len(args) > 0 { + ui.ExitOnError("execute test workflow "+args[0]+" from namespace "+namespace, err) + } else { + ui.ExitOnError("execute test workflows "+strings.Join(selectors, ",")+" from namespace "+namespace, err) + } + + go func() { + <-cmd.Context().Done() + if errors.Is(cmd.Context().Err(), context.Canceled) { + os.Exit(0) + } + }() + + for _, execution := range executions { + err = renderer.PrintTestWorkflowExecution(cmd, os.Stdout, execution) + ui.ExitOnError("render test workflow execution", err) + + var exitCode = 0 + if outputPretty { + ui.NL() + if !execution.FailedToInitialize() { + if watchEnabled && len(args) > 0 { + exitCode = uiWatch(execution, client) + ui.NL() + if downloadArtifactsEnabled { + tests.DownloadTestWorkflowArtifacts(execution.Id, downloadDir, format, masks, client, outputPretty) + } + } else { + uiShellWatchExecution(execution.Id) } - } else { - uiShellWatchExecution(execution.Id) } - } - execution, err = client.GetTestWorkflowExecution(execution.Id) - ui.ExitOnError("get execution failed", err) + execution, err = client.GetTestWorkflowExecution(execution.Id) + ui.ExitOnError("get execution failed", err) - render.PrintTestWorkflowExecutionURIs(&execution) - uiShellGetExecution(execution.Id) - } + render.PrintTestWorkflowExecutionURIs(&execution) + uiShellGetExecution(execution.Id) + } - os.Exit(exitCode) + os.Exit(exitCode) + } }, } @@ -148,6 +179,8 @@ func NewRunTestWorkflowCmd() *cobra.Command { cmd.Flags().StringVar(&format, "format", "folder", "data format for storing files, one of folder|archive") cmd.Flags().StringArrayVarP(&masks, "mask", "", []string{}, "regexp to filter downloaded files, single or comma separated, like report/.* or .*\\.json,.*\\.js$") cmd.Flags().StringToStringVarP(&tags, "tag", "", map[string]string{}, "execution tags in a form of name1=val1 passed to executor") + cmd.Flags().StringSliceVarP(&selectors, "label", "l", nil, "label key value pair: --label key1=value1") + cmd.Flags().IntVar(¶llelism, "parallelism", 10, "parallelism for multiple test workflow execution") return cmd } diff --git a/internal/app/api/v1/server.go b/internal/app/api/v1/server.go index e555f095ec..615f1e52c7 100644 --- a/internal/app/api/v1/server.go +++ b/internal/app/api/v1/server.go @@ -147,6 +147,7 @@ func (s *TestkubeAPI) Init(server server.HTTPServer) { testWorkflowExecutions := root.Group("/test-workflow-executions") testWorkflowExecutions.Get("/", s.ListTestWorkflowExecutionsHandler()) + testWorkflowExecutions.Post("/", s.ExecuteTestWorkflowHandler()) testWorkflowExecutions.Get("/:executionID", s.GetTestWorkflowExecutionHandler()) testWorkflowExecutions.Get("/:executionID/notifications", s.StreamTestWorkflowExecutionNotificationsHandler()) testWorkflowExecutions.Get("/:executionID/notifications/stream", s.StreamTestWorkflowExecutionNotificationsWebSocketHandler()) diff --git a/internal/app/api/v1/testworkflows.go b/internal/app/api/v1/testworkflows.go index 4602eb555a..cd5c73a378 100644 --- a/internal/app/api/v1/testworkflows.go +++ b/internal/app/api/v1/testworkflows.go @@ -9,6 +9,7 @@ import ( "github.com/gofiber/fiber/v2" "github.com/pkg/errors" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" testworkflowsv1 "github.com/kubeshop/testkube-operator/api/testworkflows/v1" @@ -346,12 +347,31 @@ func (s *TestkubeAPI) PreviewTestWorkflowHandler() fiber.Handler { // TODO: Add metrics func (s *TestkubeAPI) ExecuteTestWorkflowHandler() fiber.Handler { return func(c *fiber.Ctx) (err error) { - ctx := c.Context() name := c.Params("id") - errPrefix := fmt.Sprintf("failed to execute test workflow '%s'", name) - workflow, err := s.TestWorkflowsClient.Get(name) - if err != nil { - return s.ClientError(c, errPrefix, err) + selector := c.Query("selector") + s.Log.Debugw("getting test suite", "name", name, "selector", selector) + + errPrefix := "failed to execute test workflow" + + var testWorkflows []testworkflowsv1.TestWorkflow + if name != "" { + errPrefix = errPrefix + " " + name + testWorkflow, err := s.TestWorkflowsClient.Get(name) + if err != nil { + if k8serrors.IsNotFound(err) { + return s.Warn(c, http.StatusNotFound, fmt.Errorf("%s: test workflow not found: %w", errPrefix, err)) + } + + return s.Error(c, http.StatusBadGateway, fmt.Errorf("%s: client could get test workflow: %w", errPrefix, err)) + } + testWorkflows = append(testWorkflows, *testWorkflow) + } else { + testWorkflowList, err := s.TestWorkflowsClient.List(selector) + if err != nil { + return s.Error(c, http.StatusBadGateway, fmt.Errorf("%s: can't list test workflows: %w", errPrefix, err)) + } + + testWorkflows = append(testWorkflows, testWorkflowList.Items...) } // Load the execution request @@ -371,25 +391,23 @@ func (s *TestkubeAPI) ExecuteTestWorkflowHandler() fiber.Handler { var results []testkube.TestWorkflowExecution var errs []error - request.TestWorkflowExecutionName = strings.Clone(c.Query("testWorkflowExecutionName")) - concurrencyLevel := scheduler.DefaultConcurrencyLevel - workerpoolService := workerpool.New[testworkflowsv1.TestWorkflow, testkube.TestWorkflowExecutionRequest, - testkube.TestWorkflowExecution](concurrencyLevel) - requests := []workerpool.Request[testworkflowsv1.TestWorkflow, testkube.TestWorkflowExecutionRequest, testkube.TestWorkflowExecution]{ - { - Object: *workflow, - Options: request, - ExecFn: s.TestWorkflowExecutor.Execute, - }, - } - - go workerpoolService.SendRequests(requests) - go workerpoolService.Run(ctx) - - for r := range workerpoolService.GetResponses() { - results = append(results, r.Result) - if r.Err != nil { - errs = append(errs, r.Err) + if len(testWorkflows) != 0 { + request.TestWorkflowExecutionName = strings.Clone(c.Query("testWorkflowExecutionName")) + parallelism, err := strconv.Atoi(c.Query("parallelism", strconv.Itoa(scheduler.DefaultConcurrencyLevel))) + if err != nil { + return s.Error(c, http.StatusBadRequest, fmt.Errorf("%s: can't detect parallelism: %w", errPrefix, err)) + } + + workerpoolService := workerpool.New[testworkflowsv1.TestWorkflow, testkube.TestWorkflowExecutionRequest, testkube.TestWorkflowExecution](parallelism) + + go workerpoolService.SendRequests(s.prepareTestWorkflowRequests(testWorkflows, request)) + go workerpoolService.Run(c.Context()) + + for r := range workerpoolService.GetResponses() { + results = append(results, r.Result) + if r.Err != nil { + errs = append(errs, r.Err) + } } } @@ -397,8 +415,13 @@ func (s *TestkubeAPI) ExecuteTestWorkflowHandler() fiber.Handler { return s.InternalError(c, errPrefix, "execution error", errs[0]) } + s.Log.Debugw("executing test workflow", "name", name, "selector", selector) if len(results) != 0 { - return c.JSON(results[0]) + if name != "" { + return c.JSON(results[0]) + } + + return c.JSON(results) } return s.InternalError(c, errPrefix, "error", errors.New("no execution results")) @@ -423,3 +446,20 @@ func (s *TestkubeAPI) getFilteredTestWorkflowList(c *fiber.Ctx) (*testworkflowsv return crWorkflows, nil } + +func (s *TestkubeAPI) prepareTestWorkflowRequests(work []testworkflowsv1.TestWorkflow, request testkube.TestWorkflowExecutionRequest) []workerpool.Request[ + testworkflowsv1.TestWorkflow, + testkube.TestWorkflowExecutionRequest, + testkube.TestWorkflowExecution, +] { + requests := make([]workerpool.Request[testworkflowsv1.TestWorkflow, testkube.TestWorkflowExecutionRequest, testkube.TestWorkflowExecution], len(work)) + for i := range work { + requests[i] = workerpool.Request[testworkflowsv1.TestWorkflow, testkube.TestWorkflowExecutionRequest, testkube.TestWorkflowExecution]{ + Object: work[i], + Options: request, + ExecFn: s.TestWorkflowExecutor.Execute, + } + } + + return requests +} diff --git a/pkg/api/v1/client/interface.go b/pkg/api/v1/client/interface.go index 554a3faaf6..491bbc565e 100644 --- a/pkg/api/v1/client/interface.go +++ b/pkg/api/v1/client/interface.go @@ -151,6 +151,7 @@ type TestWorkflowAPI interface { UpdateTestWorkflow(workflow testkube.TestWorkflow) (testkube.TestWorkflow, error) DeleteTestWorkflow(name string) error ExecuteTestWorkflow(name string, request testkube.TestWorkflowExecutionRequest) (testkube.TestWorkflowExecution, error) + ExecuteTestWorkflows(sselector string, parallelism int, request testkube.TestWorkflowExecutionRequest) ([]testkube.TestWorkflowExecution, error) GetTestWorkflowExecutionNotifications(id string) (chan testkube.TestWorkflowExecutionNotification, error) GetTestWorkflowExecutionLogs(id string) ([]byte, error) } diff --git a/pkg/api/v1/client/testworkflow.go b/pkg/api/v1/client/testworkflow.go index 3c929be660..b2f8980656 100644 --- a/pkg/api/v1/client/testworkflow.go +++ b/pkg/api/v1/client/testworkflow.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "net/url" + "strconv" "github.com/kubeshop/testkube/pkg/api/v1/testkube" ) @@ -121,6 +122,24 @@ func (c TestWorkflowClient) ExecuteTestWorkflow(name string, request testkube.Te return c.testWorkflowExecutionTransport.Execute(http.MethodPost, uri, body, nil) } +// ExecuteTestWorkflows starts new external test workflow executions, reads data and returns IDs +// Executions are started asynchronously client can check later for results +func (c TestWorkflowClient) ExecuteTestWorkflows(selector string, parallelism int, request testkube.TestWorkflowExecutionRequest) (executions []testkube.TestWorkflowExecution, err error) { + uri := c.testWorkflowExecutionTransport.GetURI("/test-workflow-executions") + + body, err := json.Marshal(request) + if err != nil { + return executions, err + } + + params := map[string]string{ + "selector": selector, + "parallelism": strconv.Itoa(parallelism), + } + + return c.testWorkflowExecutionTransport.ExecuteMultiple(http.MethodPost, uri, body, params) +} + // GetTestWorkflowExecutionNotifications returns events stream from job pods, based on job pods logs func (c TestWorkflowClient) GetTestWorkflowExecutionNotifications(id string) (notifications chan testkube.TestWorkflowExecutionNotification, err error) { notifications = make(chan testkube.TestWorkflowExecutionNotification) From 72315f9a57134a3dafdeff44441a3d749316a40d Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Fri, 8 Nov 2024 17:57:11 +0300 Subject: [PATCH 02/20] fix: log typo Signed-off-by: Vladislav Sukhin --- internal/app/api/v1/testworkflows.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/app/api/v1/testworkflows.go b/internal/app/api/v1/testworkflows.go index cd5c73a378..682c033b72 100644 --- a/internal/app/api/v1/testworkflows.go +++ b/internal/app/api/v1/testworkflows.go @@ -349,7 +349,7 @@ func (s *TestkubeAPI) ExecuteTestWorkflowHandler() fiber.Handler { return func(c *fiber.Ctx) (err error) { name := c.Params("id") selector := c.Query("selector") - s.Log.Debugw("getting test suite", "name", name, "selector", selector) + s.Log.Debugw("getting test workflow", "name", name, "selector", selector) errPrefix := "failed to execute test workflow" From dfde32e6a0ef98f6530cfc5b3b3caec3217a9d69 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Fri, 8 Nov 2024 22:14:29 +0300 Subject: [PATCH 03/20] feat: selector for execute op Signed-off-by: Vladislav Sukhin --- api/v1/testkube.yaml | 3 +++ .../model_test_workflow_step_execute_test_workflow_ref.go | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/api/v1/testkube.yaml b/api/v1/testkube.yaml index 1a7068d0e5..f462bdaa0b 100644 --- a/api/v1/testkube.yaml +++ b/api/v1/testkube.yaml @@ -8990,6 +8990,9 @@ components: $ref: "#/components/schemas/TestWorkflowTarballRequest" config: $ref: "#/components/schemas/TestWorkflowConfigValue" + selector: + $ref: "https://raw.githubusercontent.com/garethr/kubernetes-json-schema/master/v1.7.8/_definitions.json#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.LabelSelector" + description: label selector for test workflow TestWorkflowStepExecuteTestRef: type: object diff --git a/pkg/api/v1/testkube/model_test_workflow_step_execute_test_workflow_ref.go b/pkg/api/v1/testkube/model_test_workflow_step_execute_test_workflow_ref.go index c2bcd5b1d1..e0c8f3156e 100644 --- a/pkg/api/v1/testkube/model_test_workflow_step_execute_test_workflow_ref.go +++ b/pkg/api/v1/testkube/model_test_workflow_step_execute_test_workflow_ref.go @@ -23,5 +23,6 @@ type TestWorkflowStepExecuteTestWorkflowRef struct { // matrix of parameters to spawn instances Matrix map[string]interface{} `json:"matrix,omitempty"` // parameters that should be distributed across sharded instances - Shards map[string]interface{} `json:"shards,omitempty"` + Shards map[string]interface{} `json:"shards,omitempty"` + Selector *IoK8sApimachineryPkgApisMetaV1LabelSelector `json:"selector,omitempty"` } From 7b80920aafe187edcb957d622af3695d27ec99a3 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Mon, 11 Nov 2024 15:59:20 +0300 Subject: [PATCH 04/20] fix: dep update Signed-off-by: Vladislav Sukhin --- go.mod | 4 ++-- go.sum | 4 ++-- pkg/mapper/testworkflows/kube_openapi.go | 16 ++++++++++++++++ pkg/mapper/testworkflows/openapi_kube.go | 16 ++++++++++++++++ 4 files changed, 36 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 9cee87d6da..a09c966418 100644 --- a/go.mod +++ b/go.mod @@ -40,7 +40,7 @@ require ( github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 github.com/kelseyhightower/envconfig v1.4.0 github.com/kubepug/kubepug v1.7.1 - github.com/kubeshop/testkube-operator v1.17.55-0.20241030092155-2a57f6e797e9 + github.com/kubeshop/testkube-operator v1.17.55-0.20241111112201-61ba79ec4899 github.com/minio/minio-go/v7 v7.0.47 github.com/montanaflynn/stats v0.6.6 github.com/moogar0880/problems v0.1.1 @@ -55,6 +55,7 @@ require ( github.com/prometheus/client_golang v1.18.0 github.com/pterm/pterm v0.12.79 github.com/robfig/cron v1.2.0 + github.com/savioxavier/termlink v1.4.1 github.com/segmentio/analytics-go/v3 v3.2.1 github.com/shirou/gopsutil/v3 v3.24.3 github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966 @@ -185,7 +186,6 @@ require ( github.com/rs/xid v1.4.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/santhosh-tekuri/jsonschema/v5 v5.0.0 // indirect - github.com/savioxavier/termlink v1.4.1 // indirect github.com/savsgio/gotils v0.0.0-20211223103454-d0aaa54c5899 // indirect github.com/segmentio/backo-go v1.0.0 // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect diff --git a/go.sum b/go.sum index cd0cd7ac9e..67c5a685e1 100644 --- a/go.sum +++ b/go.sum @@ -402,8 +402,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kubepug/kubepug v1.7.1 h1:LKhfSxS8Y5mXs50v+3Lpyec+cogErDLcV7CMUuiaisw= github.com/kubepug/kubepug v1.7.1/go.mod h1:lv+HxD0oTFL7ZWjj0u6HKhMbbTIId3eG7aWIW0gyF8g= -github.com/kubeshop/testkube-operator v1.17.55-0.20241030092155-2a57f6e797e9 h1:0v4W4kPfuDBJxvfkgKhDFA71AgmV0B5Jdb8dR7n4bV4= -github.com/kubeshop/testkube-operator v1.17.55-0.20241030092155-2a57f6e797e9/go.mod h1:P47tw1nKQFufdsZndyq2HG2MSa0zK/lU0XpRfZtEmIk= +github.com/kubeshop/testkube-operator v1.17.55-0.20241111112201-61ba79ec4899 h1:mbUblLseBd2XosHWmfwonxTJU8w01hsuNXyUEa2ZjWA= +github.com/kubeshop/testkube-operator v1.17.55-0.20241111112201-61ba79ec4899/go.mod h1:P47tw1nKQFufdsZndyq2HG2MSa0zK/lU0XpRfZtEmIk= github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= github.com/lithammer/fuzzysearch v1.1.8 h1:/HIuJnjHuXS8bKaiTMeeDlW2/AyIWk2brx1V8LFgLN4= diff --git a/pkg/mapper/testworkflows/kube_openapi.go b/pkg/mapper/testworkflows/kube_openapi.go index c0664f9d93..5785762402 100644 --- a/pkg/mapper/testworkflows/kube_openapi.go +++ b/pkg/mapper/testworkflows/kube_openapi.go @@ -806,6 +806,21 @@ func MapStepExecuteTestKubeToAPI(v testworkflowsv1.StepExecuteTest) testkube.Tes } } +func MapLabelSelectorRequirementToAPI(v metav1.LabelSelectorRequirement) testkube.IoK8sApimachineryPkgApisMetaV1LabelSelectorRequirement { + return testkube.IoK8sApimachineryPkgApisMetaV1LabelSelectorRequirement{ + Key: v.Key, + Operator: string(v.Operator), + Values: v.Values, + } +} + +func MapSelectorToAPI(v metav1.LabelSelector) testkube.IoK8sApimachineryPkgApisMetaV1LabelSelector { + return testkube.IoK8sApimachineryPkgApisMetaV1LabelSelector{ + MatchLabels: v.MatchLabels, + MatchExpressions: common.MapSlice(v.MatchExpressions, MapLabelSelectorRequirementToAPI), + } +} + func MapStepExecuteTestWorkflowKubeToAPI(v testworkflowsv1.StepExecuteWorkflow) testkube.TestWorkflowStepExecuteTestWorkflowRef { return testkube.TestWorkflowStepExecuteTestWorkflowRef{ Name: v.Name, @@ -817,6 +832,7 @@ func MapStepExecuteTestWorkflowKubeToAPI(v testworkflowsv1.StepExecuteWorkflow) MaxCount: MapIntOrStringToBoxedString(v.MaxCount), Matrix: MapDynamicListMapKubeToAPI(v.Matrix), Shards: MapDynamicListMapKubeToAPI(v.Shards), + Selector: common.MapPtr(v.Selector, MapSelectorToAPI), } } diff --git a/pkg/mapper/testworkflows/openapi_kube.go b/pkg/mapper/testworkflows/openapi_kube.go index 844e6da049..79b1b97d8f 100644 --- a/pkg/mapper/testworkflows/openapi_kube.go +++ b/pkg/mapper/testworkflows/openapi_kube.go @@ -846,6 +846,21 @@ func MapStepExecuteTestAPIToKube(v testkube.TestWorkflowStepExecuteTestRef) test } } +func MapLabelSelectorRequirementToCRD(v testkube.IoK8sApimachineryPkgApisMetaV1LabelSelectorRequirement) metav1.LabelSelectorRequirement { + return metav1.LabelSelectorRequirement{ + Key: v.Key, + Operator: metav1.LabelSelectorOperator(v.Operator), + Values: v.Values, + } +} + +func MapSelectorToCRD(v testkube.IoK8sApimachineryPkgApisMetaV1LabelSelector) metav1.LabelSelector { + return metav1.LabelSelector{ + MatchLabels: v.MatchLabels, + MatchExpressions: common.MapSlice(v.MatchExpressions, MapLabelSelectorRequirementToCRD), + } +} + func MapStepExecuteTestWorkflowAPIToKube(v testkube.TestWorkflowStepExecuteTestWorkflowRef) testworkflowsv1.StepExecuteWorkflow { return testworkflowsv1.StepExecuteWorkflow{ Name: v.Name, @@ -859,6 +874,7 @@ func MapStepExecuteTestWorkflowAPIToKube(v testkube.TestWorkflowStepExecuteTestW Matrix: MapDynamicListMapAPIToKube(v.Matrix), Shards: MapDynamicListMapAPIToKube(v.Shards), }, + Selector: common.MapPtr(v.Selector, MapSelectorToCRD), } } From 5b0f92d4539c6e7724819b1be7ff5f10c03255b4 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Mon, 11 Nov 2024 17:18:49 +0300 Subject: [PATCH 05/20] feat: execute method with selector Signed-off-by: Vladislav Sukhin --- .../testworkflow-toolkit/commands/execute.go | 68 +++++++++++++------ 1 file changed, 46 insertions(+), 22 deletions(-) diff --git a/cmd/tcl/testworkflow-toolkit/commands/execute.go b/cmd/tcl/testworkflow-toolkit/commands/execute.go index 2079b69e20..0a1ecfeb16 100644 --- a/cmd/tcl/testworkflow-toolkit/commands/execute.go +++ b/cmd/tcl/testworkflow-toolkit/commands/execute.go @@ -18,6 +18,7 @@ import ( "github.com/pkg/errors" "github.com/spf13/cobra" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" testworkflowsv1 "github.com/kubeshop/testkube-operator/api/testworkflows/v1" commontcl "github.com/kubeshop/testkube/cmd/tcl/testworkflow-toolkit/common" @@ -368,6 +369,8 @@ func NewExecuteCmd() *cobra.Command { operations = append(operations, fn) } } + + c := env.Testkube() for _, s := range workflows { var w testworkflowsv1.StepExecuteWorkflow err := json.Unmarshal([]byte(s), &w) @@ -375,35 +378,56 @@ func NewExecuteCmd() *cobra.Command { ui.Fail(errors.Wrap(err, "unmarshal workflow definition")) } - // Resolve the params - params, err := commontcl.GetParamsSpec(w.Matrix, w.Shards, w.Count, w.MaxCount, baseMachine) - if err != nil { - ui.Fail(errors.Wrap(err, "matrix and sharding")) - } - fmt.Printf("%s: %s\n", commontcl.ServiceLabel(w.Name), params.Humanize()) - - // Create operations for each expected execution - for i := int64(0); i < params.Count; i++ { - // Clone the spec - spec := w.DeepCopy() - - // Build files for transfer - tarballMachine, err := registerTransfer(transferSrv, spec.Tarball, baseMachine, params.MachineAt(i)) + testWorkflowNames := []string{w.Name} + if w.Selector != nil { + selector, err := metav1.LabelSelectorAsSelector(w.Selector) if err != nil { - ui.Fail(errors.Wrapf(err, "'%s' workflow", spec.Name)) + ui.Fail(errors.Wrap(err, "error creating selector from test workflow selector")) } - spec.Tarball = nil - // Prepare the operation to run - err = expressions.Finalize(&spec, baseMachine, tarballMachine, params.MachineAt(i)) + stringifiedSelector := selector.String() + testWorkflowsList, err := c.ListTestWorkflows(stringifiedSelector) if err != nil { - ui.Fail(errors.Wrapf(err, "'%s' workflow: computing execution", spec.Name)) + ui.Fail(errors.Wrap(err, "error listing test workflows using selector")) + } + + for _, item := range testWorkflowsList { + testWorkflowNames = append(testWorkflowNames, item.Name) } - fn, err := buildWorkflowExecution(*spec, async) + } + + for _, testWorkflowName := range testWorkflowNames { + // Resolve the params + params, err := commontcl.GetParamsSpec(w.Matrix, w.Shards, w.Count, w.MaxCount, baseMachine) if err != nil { - ui.Fail(err) + ui.Fail(errors.Wrap(err, "matrix and sharding")) + } + fmt.Printf("%s: %s\n", commontcl.ServiceLabel(testWorkflowName), params.Humanize()) + + // Create operations for each expected execution + for i := int64(0); i < params.Count; i++ { + // Clone the spec + spec := w.DeepCopy() + spec.Name = testWorkflowName + + // Build files for transfer + tarballMachine, err := registerTransfer(transferSrv, spec.Tarball, baseMachine, params.MachineAt(i)) + if err != nil { + ui.Fail(errors.Wrapf(err, "'%s' workflow", spec.Name)) + } + spec.Tarball = nil + + // Prepare the operation to run + err = expressions.Finalize(&spec, baseMachine, tarballMachine, params.MachineAt(i)) + if err != nil { + ui.Fail(errors.Wrapf(err, "'%s' workflow: computing execution", spec.Name)) + } + fn, err := buildWorkflowExecution(*spec, async) + if err != nil { + ui.Fail(err) + } + operations = append(operations, fn) } - operations = append(operations, fn) } } From 1414b760c576c8a990d8e38b50e0ba7ebe85b154 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Mon, 11 Nov 2024 17:21:46 +0300 Subject: [PATCH 06/20] fix: check if name is not empty Signed-off-by: Vladislav Sukhin --- cmd/tcl/testworkflow-toolkit/commands/execute.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cmd/tcl/testworkflow-toolkit/commands/execute.go b/cmd/tcl/testworkflow-toolkit/commands/execute.go index 0a1ecfeb16..1efa685125 100644 --- a/cmd/tcl/testworkflow-toolkit/commands/execute.go +++ b/cmd/tcl/testworkflow-toolkit/commands/execute.go @@ -378,7 +378,11 @@ func NewExecuteCmd() *cobra.Command { ui.Fail(errors.Wrap(err, "unmarshal workflow definition")) } - testWorkflowNames := []string{w.Name} + var testWorkflowNames []string + if w.Name != "" { + testWorkflowNames = []string{w.Name} + } + if w.Selector != nil { selector, err := metav1.LabelSelectorAsSelector(w.Selector) if err != nil { From 779100c6a8ef5148993f1ef91c523af851309fd6 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Mon, 11 Nov 2024 19:32:32 +0300 Subject: [PATCH 07/20] fix: exit on error code Signed-off-by: Vladislav Sukhin --- cmd/kubectl-testkube/commands/testworkflows/run.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/kubectl-testkube/commands/testworkflows/run.go b/cmd/kubectl-testkube/commands/testworkflows/run.go index 12cc026901..f9d58be000 100644 --- a/cmd/kubectl-testkube/commands/testworkflows/run.go +++ b/cmd/kubectl-testkube/commands/testworkflows/run.go @@ -164,7 +164,9 @@ func NewRunTestWorkflowCmd() *cobra.Command { uiShellGetExecution(execution.Id) } - os.Exit(exitCode) + if exitCode != 0 { + os.Exit(exitCode) + } } }, } From 9e029d8e16000335e67bf0bc2e49c5f020d157c2 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Tue, 12 Nov 2024 14:45:26 +0300 Subject: [PATCH 08/20] fix: update help Signed-off-by: Vladislav Sukhin --- cmd/kubectl-testkube/commands/testworkflows/run.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/kubectl-testkube/commands/testworkflows/run.go b/cmd/kubectl-testkube/commands/testworkflows/run.go index f9d58be000..e7ad7b9c0d 100644 --- a/cmd/kubectl-testkube/commands/testworkflows/run.go +++ b/cmd/kubectl-testkube/commands/testworkflows/run.go @@ -181,7 +181,7 @@ func NewRunTestWorkflowCmd() *cobra.Command { cmd.Flags().StringVar(&format, "format", "folder", "data format for storing files, one of folder|archive") cmd.Flags().StringArrayVarP(&masks, "mask", "", []string{}, "regexp to filter downloaded files, single or comma separated, like report/.* or .*\\.json,.*\\.js$") cmd.Flags().StringToStringVarP(&tags, "tag", "", map[string]string{}, "execution tags in a form of name1=val1 passed to executor") - cmd.Flags().StringSliceVarP(&selectors, "label", "l", nil, "label key value pair: --label key1=value1") + cmd.Flags().StringSliceVarP(&selectors, "label", "l", nil, "label key value pair: --label key1=value1 or label expression") cmd.Flags().IntVar(¶llelism, "parallelism", 10, "parallelism for multiple test workflow execution") return cmd From bfb82dcc9a3ea2ad9e343b3bfb852c593975ca5d Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Tue, 12 Nov 2024 16:01:34 +0300 Subject: [PATCH 09/20] fix: switch to error merhods Signed-off-by: Vladislav Sukhin --- internal/app/api/v1/testworkflows.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/internal/app/api/v1/testworkflows.go b/internal/app/api/v1/testworkflows.go index 682c033b72..6a8ca6db3d 100644 --- a/internal/app/api/v1/testworkflows.go +++ b/internal/app/api/v1/testworkflows.go @@ -9,7 +9,6 @@ import ( "github.com/gofiber/fiber/v2" "github.com/pkg/errors" - k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" testworkflowsv1 "github.com/kubeshop/testkube-operator/api/testworkflows/v1" @@ -347,6 +346,7 @@ func (s *TestkubeAPI) PreviewTestWorkflowHandler() fiber.Handler { // TODO: Add metrics func (s *TestkubeAPI) ExecuteTestWorkflowHandler() fiber.Handler { return func(c *fiber.Ctx) (err error) { + ctx := c.Context() name := c.Params("id") selector := c.Query("selector") s.Log.Debugw("getting test workflow", "name", name, "selector", selector) @@ -358,17 +358,14 @@ func (s *TestkubeAPI) ExecuteTestWorkflowHandler() fiber.Handler { errPrefix = errPrefix + " " + name testWorkflow, err := s.TestWorkflowsClient.Get(name) if err != nil { - if k8serrors.IsNotFound(err) { - return s.Warn(c, http.StatusNotFound, fmt.Errorf("%s: test workflow not found: %w", errPrefix, err)) - } - - return s.Error(c, http.StatusBadGateway, fmt.Errorf("%s: client could get test workflow: %w", errPrefix, err)) + return s.ClientError(c, errPrefix, err) } + testWorkflows = append(testWorkflows, *testWorkflow) } else { testWorkflowList, err := s.TestWorkflowsClient.List(selector) if err != nil { - return s.Error(c, http.StatusBadGateway, fmt.Errorf("%s: can't list test workflows: %w", errPrefix, err)) + return s.ClientError(c, errPrefix, err) } testWorkflows = append(testWorkflows, testWorkflowList.Items...) @@ -395,13 +392,13 @@ func (s *TestkubeAPI) ExecuteTestWorkflowHandler() fiber.Handler { request.TestWorkflowExecutionName = strings.Clone(c.Query("testWorkflowExecutionName")) parallelism, err := strconv.Atoi(c.Query("parallelism", strconv.Itoa(scheduler.DefaultConcurrencyLevel))) if err != nil { - return s.Error(c, http.StatusBadRequest, fmt.Errorf("%s: can't detect parallelism: %w", errPrefix, err)) + return s.BadRequest(c, errPrefix, "can't detect parallelism:", err) } workerpoolService := workerpool.New[testworkflowsv1.TestWorkflow, testkube.TestWorkflowExecutionRequest, testkube.TestWorkflowExecution](parallelism) go workerpoolService.SendRequests(s.prepareTestWorkflowRequests(testWorkflows, request)) - go workerpoolService.Run(c.Context()) + go workerpoolService.Run(ctx) for r := range workerpoolService.GetResponses() { results = append(results, r.Result) From f33c5c98ce81b5cb9dabab138f372d515a5a94f5 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Tue, 12 Nov 2024 16:11:07 +0300 Subject: [PATCH 10/20] fix: error text Signed-off-by: Vladislav Sukhin --- internal/app/api/v1/testworkflows.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/app/api/v1/testworkflows.go b/internal/app/api/v1/testworkflows.go index 6a8ca6db3d..971577ada1 100644 --- a/internal/app/api/v1/testworkflows.go +++ b/internal/app/api/v1/testworkflows.go @@ -392,7 +392,7 @@ func (s *TestkubeAPI) ExecuteTestWorkflowHandler() fiber.Handler { request.TestWorkflowExecutionName = strings.Clone(c.Query("testWorkflowExecutionName")) parallelism, err := strconv.Atoi(c.Query("parallelism", strconv.Itoa(scheduler.DefaultConcurrencyLevel))) if err != nil { - return s.BadRequest(c, errPrefix, "can't detect parallelism:", err) + return s.BadRequest(c, errPrefix, "can't detect parallelism", err) } workerpoolService := workerpool.New[testworkflowsv1.TestWorkflow, testkube.TestWorkflowExecutionRequest, testkube.TestWorkflowExecution](parallelism) From 2a5a728feced7d0fbc540f72824507db435527c0 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Tue, 12 Nov 2024 16:21:18 +0300 Subject: [PATCH 11/20] fix: join errors Signed-off-by: Vladislav Sukhin --- internal/app/api/v1/testworkflows.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/app/api/v1/testworkflows.go b/internal/app/api/v1/testworkflows.go index 971577ada1..a40c2f7ed7 100644 --- a/internal/app/api/v1/testworkflows.go +++ b/internal/app/api/v1/testworkflows.go @@ -2,13 +2,13 @@ package v1 import ( "context" + "errors" "fmt" "net/http" "strconv" "strings" "github.com/gofiber/fiber/v2" - "github.com/pkg/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" testworkflowsv1 "github.com/kubeshop/testkube-operator/api/testworkflows/v1" @@ -409,7 +409,7 @@ func (s *TestkubeAPI) ExecuteTestWorkflowHandler() fiber.Handler { } if len(errs) != 0 { - return s.InternalError(c, errPrefix, "execution error", errs[0]) + return s.InternalError(c, errPrefix, "execution error", errors.Join(errs...)) } s.Log.Debugw("executing test workflow", "name", name, "selector", selector) From a1f704573f5945f259ad40726ae2d4cb7cc605e6 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Tue, 12 Nov 2024 17:19:40 +0300 Subject: [PATCH 12/20] fix: use existing naming in cli Signed-off-by: Vladislav Sukhin --- cmd/kubectl-testkube/commands/testworkflows/run.go | 6 +++--- internal/app/api/v1/testworkflows.go | 6 +++--- pkg/api/v1/client/interface.go | 2 +- pkg/api/v1/client/testworkflow.go | 4 ++-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/cmd/kubectl-testkube/commands/testworkflows/run.go b/cmd/kubectl-testkube/commands/testworkflows/run.go index e7ad7b9c0d..e9e5421360 100644 --- a/cmd/kubectl-testkube/commands/testworkflows/run.go +++ b/cmd/kubectl-testkube/commands/testworkflows/run.go @@ -47,7 +47,7 @@ func NewRunTestWorkflowCmd() *cobra.Command { masks []string tags map[string]string selectors []string - parallelism int + concurrencyLevel int ) cmd := &cobra.Command{ @@ -102,7 +102,7 @@ func NewRunTestWorkflowCmd() *cobra.Command { executions = append(executions, execution) case len(selectors) != 0: selector := strings.Join(selectors, ",") - executions, err = client.ExecuteTestWorkflows(selector, parallelism, request) + executions, err = client.ExecuteTestWorkflows(selector, concurrencyLevel, request) default: ui.Failf("Pass Test workflow name or labels to run by labels ") } @@ -182,7 +182,7 @@ func NewRunTestWorkflowCmd() *cobra.Command { cmd.Flags().StringArrayVarP(&masks, "mask", "", []string{}, "regexp to filter downloaded files, single or comma separated, like report/.* or .*\\.json,.*\\.js$") cmd.Flags().StringToStringVarP(&tags, "tag", "", map[string]string{}, "execution tags in a form of name1=val1 passed to executor") cmd.Flags().StringSliceVarP(&selectors, "label", "l", nil, "label key value pair: --label key1=value1 or label expression") - cmd.Flags().IntVar(¶llelism, "parallelism", 10, "parallelism for multiple test workflow execution") + cmd.Flags().IntVar(&concurrencyLevel, "concurrency", 10, "concurrency level for multiple test workflow execution") return cmd } diff --git a/internal/app/api/v1/testworkflows.go b/internal/app/api/v1/testworkflows.go index a40c2f7ed7..2a89b3457b 100644 --- a/internal/app/api/v1/testworkflows.go +++ b/internal/app/api/v1/testworkflows.go @@ -390,12 +390,12 @@ func (s *TestkubeAPI) ExecuteTestWorkflowHandler() fiber.Handler { if len(testWorkflows) != 0 { request.TestWorkflowExecutionName = strings.Clone(c.Query("testWorkflowExecutionName")) - parallelism, err := strconv.Atoi(c.Query("parallelism", strconv.Itoa(scheduler.DefaultConcurrencyLevel))) + concurrencyLevel, err := strconv.Atoi(c.Query("concurrency", strconv.Itoa(scheduler.DefaultConcurrencyLevel))) if err != nil { - return s.BadRequest(c, errPrefix, "can't detect parallelism", err) + return s.BadRequest(c, errPrefix, "can't detect concurrency level", err) } - workerpoolService := workerpool.New[testworkflowsv1.TestWorkflow, testkube.TestWorkflowExecutionRequest, testkube.TestWorkflowExecution](parallelism) + workerpoolService := workerpool.New[testworkflowsv1.TestWorkflow, testkube.TestWorkflowExecutionRequest, testkube.TestWorkflowExecution](concurrencyLevel) go workerpoolService.SendRequests(s.prepareTestWorkflowRequests(testWorkflows, request)) go workerpoolService.Run(ctx) diff --git a/pkg/api/v1/client/interface.go b/pkg/api/v1/client/interface.go index 491bbc565e..c4fb9abe4d 100644 --- a/pkg/api/v1/client/interface.go +++ b/pkg/api/v1/client/interface.go @@ -151,7 +151,7 @@ type TestWorkflowAPI interface { UpdateTestWorkflow(workflow testkube.TestWorkflow) (testkube.TestWorkflow, error) DeleteTestWorkflow(name string) error ExecuteTestWorkflow(name string, request testkube.TestWorkflowExecutionRequest) (testkube.TestWorkflowExecution, error) - ExecuteTestWorkflows(sselector string, parallelism int, request testkube.TestWorkflowExecutionRequest) ([]testkube.TestWorkflowExecution, error) + ExecuteTestWorkflows(sselector string, concurrencyLevel int, request testkube.TestWorkflowExecutionRequest) ([]testkube.TestWorkflowExecution, error) GetTestWorkflowExecutionNotifications(id string) (chan testkube.TestWorkflowExecutionNotification, error) GetTestWorkflowExecutionLogs(id string) ([]byte, error) } diff --git a/pkg/api/v1/client/testworkflow.go b/pkg/api/v1/client/testworkflow.go index b2f8980656..313f8de174 100644 --- a/pkg/api/v1/client/testworkflow.go +++ b/pkg/api/v1/client/testworkflow.go @@ -124,7 +124,7 @@ func (c TestWorkflowClient) ExecuteTestWorkflow(name string, request testkube.Te // ExecuteTestWorkflows starts new external test workflow executions, reads data and returns IDs // Executions are started asynchronously client can check later for results -func (c TestWorkflowClient) ExecuteTestWorkflows(selector string, parallelism int, request testkube.TestWorkflowExecutionRequest) (executions []testkube.TestWorkflowExecution, err error) { +func (c TestWorkflowClient) ExecuteTestWorkflows(selector string, concurrencyLevel int, request testkube.TestWorkflowExecutionRequest) (executions []testkube.TestWorkflowExecution, err error) { uri := c.testWorkflowExecutionTransport.GetURI("/test-workflow-executions") body, err := json.Marshal(request) @@ -134,7 +134,7 @@ func (c TestWorkflowClient) ExecuteTestWorkflows(selector string, parallelism in params := map[string]string{ "selector": selector, - "parallelism": strconv.Itoa(parallelism), + "concurrency": strconv.Itoa(concurrencyLevel), } return c.testWorkflowExecutionTransport.ExecuteMultiple(http.MethodPost, uri, body, params) From 970d017d8bda93ed7a14be63eb8bd861df7a805b Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Tue, 12 Nov 2024 17:31:37 +0300 Subject: [PATCH 13/20] fix: move common part Signed-off-by: Vladislav Sukhin --- cmd/tcl/testworkflow-toolkit/commands/execute.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/cmd/tcl/testworkflow-toolkit/commands/execute.go b/cmd/tcl/testworkflow-toolkit/commands/execute.go index 1efa685125..fa691b9a6e 100644 --- a/cmd/tcl/testworkflow-toolkit/commands/execute.go +++ b/cmd/tcl/testworkflow-toolkit/commands/execute.go @@ -400,12 +400,13 @@ func NewExecuteCmd() *cobra.Command { } } + // Resolve the params + params, err := commontcl.GetParamsSpec(w.Matrix, w.Shards, w.Count, w.MaxCount, baseMachine) + if err != nil { + ui.Fail(errors.Wrap(err, "matrix and sharding")) + } + for _, testWorkflowName := range testWorkflowNames { - // Resolve the params - params, err := commontcl.GetParamsSpec(w.Matrix, w.Shards, w.Count, w.MaxCount, baseMachine) - if err != nil { - ui.Fail(errors.Wrap(err, "matrix and sharding")) - } fmt.Printf("%s: %s\n", commontcl.ServiceLabel(testWorkflowName), params.Humanize()) // Create operations for each expected execution From 238c55bc6e7e72ec83775bbc3b9da859d1f2335e Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Wed, 13 Nov 2024 14:59:23 +0300 Subject: [PATCH 14/20] fix: help Signed-off-by: Vladislav Sukhin --- cmd/kubectl-testkube/commands/testworkflows/run.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/kubectl-testkube/commands/testworkflows/run.go b/cmd/kubectl-testkube/commands/testworkflows/run.go index e9e5421360..8961af63f6 100644 --- a/cmd/kubectl-testkube/commands/testworkflows/run.go +++ b/cmd/kubectl-testkube/commands/testworkflows/run.go @@ -182,7 +182,7 @@ func NewRunTestWorkflowCmd() *cobra.Command { cmd.Flags().StringArrayVarP(&masks, "mask", "", []string{}, "regexp to filter downloaded files, single or comma separated, like report/.* or .*\\.json,.*\\.js$") cmd.Flags().StringToStringVarP(&tags, "tag", "", map[string]string{}, "execution tags in a form of name1=val1 passed to executor") cmd.Flags().StringSliceVarP(&selectors, "label", "l", nil, "label key value pair: --label key1=value1 or label expression") - cmd.Flags().IntVar(&concurrencyLevel, "concurrency", 10, "concurrency level for multiple test workflow execution") + cmd.Flags().IntVar(&concurrencyLevel, "concurrency", 10, "concurrency level for multiple test workflow executions") return cmd } From 235680a3eca59c6a18b33a9cb930a9933c68542e Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Wed, 13 Nov 2024 20:44:21 +0300 Subject: [PATCH 15/20] feat: add execute test workflows api method spec Signed-off-by: Vladislav Sukhin --- api/v1/testkube.yaml | 51 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/api/v1/testkube.yaml b/api/v1/testkube.yaml index f462bdaa0b..58d8d7ee55 100644 --- a/api/v1/testkube.yaml +++ b/api/v1/testkube.yaml @@ -4065,6 +4065,57 @@ paths: items: $ref: "#/components/schemas/Problem" /test-workflow-executions: + post: + parameters: + - $ref: "#/components/parameters/Selector" + - $ref: "#/components/parameters/ConcurrencyLevel" + tags: + - api + - test-workflows + - pro + summary: "Execute test workflows" + description: "Execute test workflows in the kubernetes cluster" + operationId: executeTestWorkflows + requestBody: + description: test workflow execution request + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/TestWorkflowExecutionRequest" + responses: + 200: + description: successful execution + content: + application/json: + schema: + type: array + items: + $ref: "#/components/schemas/TestWorkflowExecution" + 400: + description: "problem with body parsing - probably some bad input occurs" + content: + application/problem+json: + schema: + type: array + items: + $ref: "#/components/schemas/Problem" + 402: + description: "missing Pro subscription for a commercial feature" + content: + application/problem+json: + schema: + type: array + items: + $ref: "#/components/schemas/Problem" + 502: + description: problem communicating with kubernetes cluster + content: + application/problem+json: + schema: + type: array + items: + $ref: "#/components/schemas/Problem" get: tags: - test-workflows From 67ecf66169bab84687b95571bdca9228e0400738 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Thu, 14 Nov 2024 16:24:20 +0300 Subject: [PATCH 16/20] Update pkg/api/v1/client/interface.go Co-authored-by: Dawid Rusnak --- pkg/api/v1/client/interface.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/api/v1/client/interface.go b/pkg/api/v1/client/interface.go index c4fb9abe4d..df5c671390 100644 --- a/pkg/api/v1/client/interface.go +++ b/pkg/api/v1/client/interface.go @@ -151,7 +151,7 @@ type TestWorkflowAPI interface { UpdateTestWorkflow(workflow testkube.TestWorkflow) (testkube.TestWorkflow, error) DeleteTestWorkflow(name string) error ExecuteTestWorkflow(name string, request testkube.TestWorkflowExecutionRequest) (testkube.TestWorkflowExecution, error) - ExecuteTestWorkflows(sselector string, concurrencyLevel int, request testkube.TestWorkflowExecutionRequest) ([]testkube.TestWorkflowExecution, error) + ExecuteTestWorkflows(selector string, concurrencyLevel int, request testkube.TestWorkflowExecutionRequest) ([]testkube.TestWorkflowExecution, error) GetTestWorkflowExecutionNotifications(id string) (chan testkube.TestWorkflowExecutionNotification, error) GetTestWorkflowExecutionLogs(id string) ([]byte, error) } From fb46a3a4d97f7d89320b0cee4300b518f50237b1 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Thu, 14 Nov 2024 19:33:37 +0300 Subject: [PATCH 17/20] fix: remove concurrency level Signed-off-by: Vladislav Sukhin --- cmd/kubectl-testkube/commands/testworkflows/run.go | 4 +--- internal/app/api/v1/testworkflows.go | 7 +------ pkg/api/v1/client/interface.go | 2 +- pkg/api/v1/client/testworkflow.go | 6 ++---- 4 files changed, 5 insertions(+), 14 deletions(-) diff --git a/cmd/kubectl-testkube/commands/testworkflows/run.go b/cmd/kubectl-testkube/commands/testworkflows/run.go index 8961af63f6..5c96c690d3 100644 --- a/cmd/kubectl-testkube/commands/testworkflows/run.go +++ b/cmd/kubectl-testkube/commands/testworkflows/run.go @@ -47,7 +47,6 @@ func NewRunTestWorkflowCmd() *cobra.Command { masks []string tags map[string]string selectors []string - concurrencyLevel int ) cmd := &cobra.Command{ @@ -102,7 +101,7 @@ func NewRunTestWorkflowCmd() *cobra.Command { executions = append(executions, execution) case len(selectors) != 0: selector := strings.Join(selectors, ",") - executions, err = client.ExecuteTestWorkflows(selector, concurrencyLevel, request) + executions, err = client.ExecuteTestWorkflows(selector, request) default: ui.Failf("Pass Test workflow name or labels to run by labels ") } @@ -182,7 +181,6 @@ func NewRunTestWorkflowCmd() *cobra.Command { cmd.Flags().StringArrayVarP(&masks, "mask", "", []string{}, "regexp to filter downloaded files, single or comma separated, like report/.* or .*\\.json,.*\\.js$") cmd.Flags().StringToStringVarP(&tags, "tag", "", map[string]string{}, "execution tags in a form of name1=val1 passed to executor") cmd.Flags().StringSliceVarP(&selectors, "label", "l", nil, "label key value pair: --label key1=value1 or label expression") - cmd.Flags().IntVar(&concurrencyLevel, "concurrency", 10, "concurrency level for multiple test workflow executions") return cmd } diff --git a/internal/app/api/v1/testworkflows.go b/internal/app/api/v1/testworkflows.go index 2a89b3457b..4a741e24b3 100644 --- a/internal/app/api/v1/testworkflows.go +++ b/internal/app/api/v1/testworkflows.go @@ -390,12 +390,7 @@ func (s *TestkubeAPI) ExecuteTestWorkflowHandler() fiber.Handler { if len(testWorkflows) != 0 { request.TestWorkflowExecutionName = strings.Clone(c.Query("testWorkflowExecutionName")) - concurrencyLevel, err := strconv.Atoi(c.Query("concurrency", strconv.Itoa(scheduler.DefaultConcurrencyLevel))) - if err != nil { - return s.BadRequest(c, errPrefix, "can't detect concurrency level", err) - } - - workerpoolService := workerpool.New[testworkflowsv1.TestWorkflow, testkube.TestWorkflowExecutionRequest, testkube.TestWorkflowExecution](concurrencyLevel) + workerpoolService := workerpool.New[testworkflowsv1.TestWorkflow, testkube.TestWorkflowExecutionRequest, testkube.TestWorkflowExecution](scheduler.DefaultConcurrencyLevel) go workerpoolService.SendRequests(s.prepareTestWorkflowRequests(testWorkflows, request)) go workerpoolService.Run(ctx) diff --git a/pkg/api/v1/client/interface.go b/pkg/api/v1/client/interface.go index df5c671390..82f1357377 100644 --- a/pkg/api/v1/client/interface.go +++ b/pkg/api/v1/client/interface.go @@ -151,7 +151,7 @@ type TestWorkflowAPI interface { UpdateTestWorkflow(workflow testkube.TestWorkflow) (testkube.TestWorkflow, error) DeleteTestWorkflow(name string) error ExecuteTestWorkflow(name string, request testkube.TestWorkflowExecutionRequest) (testkube.TestWorkflowExecution, error) - ExecuteTestWorkflows(selector string, concurrencyLevel int, request testkube.TestWorkflowExecutionRequest) ([]testkube.TestWorkflowExecution, error) + ExecuteTestWorkflows(selector string, request testkube.TestWorkflowExecutionRequest) ([]testkube.TestWorkflowExecution, error) GetTestWorkflowExecutionNotifications(id string) (chan testkube.TestWorkflowExecutionNotification, error) GetTestWorkflowExecutionLogs(id string) ([]byte, error) } diff --git a/pkg/api/v1/client/testworkflow.go b/pkg/api/v1/client/testworkflow.go index 313f8de174..d878f93565 100644 --- a/pkg/api/v1/client/testworkflow.go +++ b/pkg/api/v1/client/testworkflow.go @@ -5,7 +5,6 @@ import ( "fmt" "net/http" "net/url" - "strconv" "github.com/kubeshop/testkube/pkg/api/v1/testkube" ) @@ -124,7 +123,7 @@ func (c TestWorkflowClient) ExecuteTestWorkflow(name string, request testkube.Te // ExecuteTestWorkflows starts new external test workflow executions, reads data and returns IDs // Executions are started asynchronously client can check later for results -func (c TestWorkflowClient) ExecuteTestWorkflows(selector string, concurrencyLevel int, request testkube.TestWorkflowExecutionRequest) (executions []testkube.TestWorkflowExecution, err error) { +func (c TestWorkflowClient) ExecuteTestWorkflows(selector string, request testkube.TestWorkflowExecutionRequest) (executions []testkube.TestWorkflowExecution, err error) { uri := c.testWorkflowExecutionTransport.GetURI("/test-workflow-executions") body, err := json.Marshal(request) @@ -133,8 +132,7 @@ func (c TestWorkflowClient) ExecuteTestWorkflows(selector string, concurrencyLev } params := map[string]string{ - "selector": selector, - "concurrency": strconv.Itoa(concurrencyLevel), + "selector": selector, } return c.testWorkflowExecutionTransport.ExecuteMultiple(http.MethodPost, uri, body, params) From e69a2f94a8e641eba8d4a399c8a63442c08ccd7b Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Thu, 14 Nov 2024 19:59:02 +0300 Subject: [PATCH 18/20] fix: use Label Selector Signed-off-by: Vladislav Sukhin --- api/v1/testkube.yaml | 2 +- .../model_test_workflow_step_execute_test_workflow_ref.go | 4 ++-- pkg/mapper/testworkflows/kube_openapi.go | 8 ++++---- pkg/mapper/testworkflows/openapi_kube.go | 4 ++-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/api/v1/testkube.yaml b/api/v1/testkube.yaml index 58d8d7ee55..26fafe0525 100644 --- a/api/v1/testkube.yaml +++ b/api/v1/testkube.yaml @@ -9042,7 +9042,7 @@ components: config: $ref: "#/components/schemas/TestWorkflowConfigValue" selector: - $ref: "https://raw.githubusercontent.com/garethr/kubernetes-json-schema/master/v1.7.8/_definitions.json#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.LabelSelector" + $ref: "#/components/schemas/LabelSelector" description: label selector for test workflow TestWorkflowStepExecuteTestRef: diff --git a/pkg/api/v1/testkube/model_test_workflow_step_execute_test_workflow_ref.go b/pkg/api/v1/testkube/model_test_workflow_step_execute_test_workflow_ref.go index e0c8f3156e..be1e4454f7 100644 --- a/pkg/api/v1/testkube/model_test_workflow_step_execute_test_workflow_ref.go +++ b/pkg/api/v1/testkube/model_test_workflow_step_execute_test_workflow_ref.go @@ -23,6 +23,6 @@ type TestWorkflowStepExecuteTestWorkflowRef struct { // matrix of parameters to spawn instances Matrix map[string]interface{} `json:"matrix,omitempty"` // parameters that should be distributed across sharded instances - Shards map[string]interface{} `json:"shards,omitempty"` - Selector *IoK8sApimachineryPkgApisMetaV1LabelSelector `json:"selector,omitempty"` + Shards map[string]interface{} `json:"shards,omitempty"` + Selector *LabelSelector `json:"selector,omitempty"` } diff --git a/pkg/mapper/testworkflows/kube_openapi.go b/pkg/mapper/testworkflows/kube_openapi.go index 5785762402..9076451da8 100644 --- a/pkg/mapper/testworkflows/kube_openapi.go +++ b/pkg/mapper/testworkflows/kube_openapi.go @@ -806,16 +806,16 @@ func MapStepExecuteTestKubeToAPI(v testworkflowsv1.StepExecuteTest) testkube.Tes } } -func MapLabelSelectorRequirementToAPI(v metav1.LabelSelectorRequirement) testkube.IoK8sApimachineryPkgApisMetaV1LabelSelectorRequirement { - return testkube.IoK8sApimachineryPkgApisMetaV1LabelSelectorRequirement{ +func MapLabelSelectorRequirementToAPI(v metav1.LabelSelectorRequirement) testkube.LabelSelectorRequirement { + return testkube.LabelSelectorRequirement{ Key: v.Key, Operator: string(v.Operator), Values: v.Values, } } -func MapSelectorToAPI(v metav1.LabelSelector) testkube.IoK8sApimachineryPkgApisMetaV1LabelSelector { - return testkube.IoK8sApimachineryPkgApisMetaV1LabelSelector{ +func MapSelectorToAPI(v metav1.LabelSelector) testkube.LabelSelector { + return testkube.LabelSelector{ MatchLabels: v.MatchLabels, MatchExpressions: common.MapSlice(v.MatchExpressions, MapLabelSelectorRequirementToAPI), } diff --git a/pkg/mapper/testworkflows/openapi_kube.go b/pkg/mapper/testworkflows/openapi_kube.go index 79b1b97d8f..86b9bf7ede 100644 --- a/pkg/mapper/testworkflows/openapi_kube.go +++ b/pkg/mapper/testworkflows/openapi_kube.go @@ -846,7 +846,7 @@ func MapStepExecuteTestAPIToKube(v testkube.TestWorkflowStepExecuteTestRef) test } } -func MapLabelSelectorRequirementToCRD(v testkube.IoK8sApimachineryPkgApisMetaV1LabelSelectorRequirement) metav1.LabelSelectorRequirement { +func MapLabelSelectorRequirementToCRD(v testkube.LabelSelectorRequirement) metav1.LabelSelectorRequirement { return metav1.LabelSelectorRequirement{ Key: v.Key, Operator: metav1.LabelSelectorOperator(v.Operator), @@ -854,7 +854,7 @@ func MapLabelSelectorRequirementToCRD(v testkube.IoK8sApimachineryPkgApisMetaV1L } } -func MapSelectorToCRD(v testkube.IoK8sApimachineryPkgApisMetaV1LabelSelector) metav1.LabelSelector { +func MapSelectorToCRD(v testkube.LabelSelector) metav1.LabelSelector { return metav1.LabelSelector{ MatchLabels: v.MatchLabels, MatchExpressions: common.MapSlice(v.MatchExpressions, MapLabelSelectorRequirementToCRD), From e907ab821265910b203c8cbf0eece7a31300cda5 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Thu, 14 Nov 2024 20:05:15 +0300 Subject: [PATCH 19/20] fix: check for empty name and selector Signed-off-by: Vladislav Sukhin --- cmd/tcl/testworkflow-toolkit/commands/execute.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cmd/tcl/testworkflow-toolkit/commands/execute.go b/cmd/tcl/testworkflow-toolkit/commands/execute.go index fa691b9a6e..ce47f57f83 100644 --- a/cmd/tcl/testworkflow-toolkit/commands/execute.go +++ b/cmd/tcl/testworkflow-toolkit/commands/execute.go @@ -378,6 +378,10 @@ func NewExecuteCmd() *cobra.Command { ui.Fail(errors.Wrap(err, "unmarshal workflow definition")) } + if w.Name == "" && w.Selector == nil { + ui.Fail(errors.New("either workflow name or selector should be specified")) + } + var testWorkflowNames []string if w.Name != "" { testWorkflowNames = []string{w.Name} From 6d2e076ca8093a16f353be1b965b1fd7292ab7e5 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Mon, 18 Nov 2024 17:21:08 +0300 Subject: [PATCH 20/20] fix: dep update Signed-off-by: Vladislav Sukhin --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index a09c966418..2d1b1bf494 100644 --- a/go.mod +++ b/go.mod @@ -40,7 +40,7 @@ require ( github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 github.com/kelseyhightower/envconfig v1.4.0 github.com/kubepug/kubepug v1.7.1 - github.com/kubeshop/testkube-operator v1.17.55-0.20241111112201-61ba79ec4899 + github.com/kubeshop/testkube-operator v1.17.55-0.20241118133003-70462ac10f4a github.com/minio/minio-go/v7 v7.0.47 github.com/montanaflynn/stats v0.6.6 github.com/moogar0880/problems v0.1.1 diff --git a/go.sum b/go.sum index 67c5a685e1..a16cd51848 100644 --- a/go.sum +++ b/go.sum @@ -402,8 +402,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kubepug/kubepug v1.7.1 h1:LKhfSxS8Y5mXs50v+3Lpyec+cogErDLcV7CMUuiaisw= github.com/kubepug/kubepug v1.7.1/go.mod h1:lv+HxD0oTFL7ZWjj0u6HKhMbbTIId3eG7aWIW0gyF8g= -github.com/kubeshop/testkube-operator v1.17.55-0.20241111112201-61ba79ec4899 h1:mbUblLseBd2XosHWmfwonxTJU8w01hsuNXyUEa2ZjWA= -github.com/kubeshop/testkube-operator v1.17.55-0.20241111112201-61ba79ec4899/go.mod h1:P47tw1nKQFufdsZndyq2HG2MSa0zK/lU0XpRfZtEmIk= +github.com/kubeshop/testkube-operator v1.17.55-0.20241118133003-70462ac10f4a h1:xget2cwwqOL+K2Op9FPbMgfzj9lSVJAzZ9p48yxuFrE= +github.com/kubeshop/testkube-operator v1.17.55-0.20241118133003-70462ac10f4a/go.mod h1:P47tw1nKQFufdsZndyq2HG2MSa0zK/lU0XpRfZtEmIk= github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= github.com/lithammer/fuzzysearch v1.1.8 h1:/HIuJnjHuXS8bKaiTMeeDlW2/AyIWk2brx1V8LFgLN4=