From 1078e09e46c6223b80a337c2d1c90bd0fe6e29d2 Mon Sep 17 00:00:00 2001 From: Rafael Raposo <100569684+RRap0so@users.noreply.github.com> Date: Wed, 29 May 2024 01:45:36 +0200 Subject: [PATCH] Add executionClusterLabel (#5394) * wAdd executionClusterLabel Signed-off-by: Rafael Raposo * Run make -c flytectl generate Signed-off-by: Eduardo Apolinario --------- Signed-off-by: Rafael Raposo Signed-off-by: Eduardo Apolinario Co-authored-by: Eduardo Apolinario --- flytectl/cmd/create/execution.go | 57 +++++++++++++------ flytectl/cmd/create/execution_util.go | 21 ++++--- flytectl/cmd/create/execution_util_test.go | 16 ++++++ flytectl/cmd/create/executionconfig_flags.go | 3 +- .../cmd/create/executionconfig_flags_test.go | 14 +++++ flytectl/cmd/get/execution_test.go | 8 +-- flytectl/cmd/get/node_execution_test.go | 8 +-- 7 files changed, 92 insertions(+), 35 deletions(-) diff --git a/flytectl/cmd/create/execution.go b/flytectl/cmd/create/execution.go index afef1db3f3..84b6e2dad9 100644 --- a/flytectl/cmd/create/execution.go +++ b/flytectl/cmd/create/execution.go @@ -79,20 +79,40 @@ The generated spec file can be modified to change the envs values, as shown belo task: core.control_flow.merge_sort.merge version: "v2" -4. Run the execution by passing the generated YAML file. +4. [Optional] Update the TargetExecutionCluster, if needed. +The generated spec file can be modified to change the TargetExecutionCluster values, as shown below: + +.. code-block:: yaml + + iamRoleARN: "" + inputs: + sorted_list1: + - 0 + sorted_list2: + - 0 + envs: + foo: bar + kubeServiceAcct: "" + targetDomain: "" + targetProject: "" + targetExecutionCluster: "" + task: core.control_flow.merge_sort.merge + version: "v2" + +5. Run the execution by passing the generated YAML file. The file can then be passed through the command line. It is worth noting that the source's and target's project and domain can be different. :: flytectl create execution --execFile execution_spec.yaml -p flytesnacks -d staging --targetProject flytesnacks -5. To relaunch an execution, pass the current execution ID as follows: +6. To relaunch an execution, pass the current execution ID as follows: :: flytectl create execution --relaunch ffb31066a0f8b4d52b77 -p flytesnacks -d development -6. To recover an execution, i.e., recreate it from the last known failure point for previously-run workflow execution, run: +7. To recover an execution, i.e., recreate it from the last known failure point for previously-run workflow execution, run: :: @@ -100,7 +120,7 @@ It is worth noting that the source's and target's project and domain can be diff See :ref:` + "`ref_flyteidl.admin.ExecutionRecoverRequest`" + ` for more details. -7. You can create executions idempotently by naming them. This is also a way to *name* an execution for discovery. Note, +8. You can create executions idempotently by naming them. This is also a way to *name* an execution for discovery. Note, an execution id has to be unique within a project domain. So if the *name* matches an existing execution an already exists exceptioj will be raised. @@ -108,7 +128,7 @@ will be raised. flytectl create execution --recover ffb31066a0f8b4d52b77 -p flytesnacks -d development custom_name -8. Generic/Struct/Dataclass/JSON types are supported for execution in a similar manner. +9. Generic/Struct/Dataclass/JSON types are supported for execution in a similar manner. The following is an example of how generic data can be specified while creating the execution. :: @@ -128,7 +148,7 @@ The generated file would look similar to this. Here, empty values have been dump task: core.type_system.custom_objects.add version: v3 -9. Modified file with struct data populated for 'x' and 'y' parameters for the task "core.type_system.custom_objects.add": +10. Modified file with struct data populated for 'x' and 'y' parameters for the task "core.type_system.custom_objects.add": :: @@ -152,7 +172,7 @@ The generated file would look similar to this. Here, empty values have been dump task: core.type_system.custom_objects.add version: v3 -10. If you have configured a plugin that implements github.com/flyteorg/flyteadmin/pkg/workflowengine/interfaces/WorkflowExecutor +11. If you have configured a plugin that implements github.com/flyteorg/flyteadmin/pkg/workflowengine/interfaces/WorkflowExecutor that supports cluster pools, then when creating a new execution, you can assign it to a specific cluster pool: :: @@ -166,17 +186,18 @@ The generated file would look similar to this. Here, empty values have been dump // ExecutionConfig hold configuration for create execution flags and configuration of the actual task or workflow to be launched. type ExecutionConfig struct { // pflag section - ExecFile string `json:"execFile,omitempty" pflag:",file for the execution params. If not specified defaults to <_name>.execution_spec.yaml"` - TargetDomain string `json:"targetDomain" pflag:",project where execution needs to be created. If not specified configured domain would be used."` - TargetProject string `json:"targetProject" pflag:",project where execution needs to be created. If not specified configured project would be used."` - KubeServiceAcct string `json:"kubeServiceAcct" pflag:",kubernetes service account AuthRole for launching execution."` - IamRoleARN string `json:"iamRoleARN" pflag:",iam role ARN AuthRole for launching execution."` - Relaunch string `json:"relaunch" pflag:",execution id to be relaunched."` - Recover string `json:"recover" pflag:",execution id to be recreated from the last known failure point."` - DryRun bool `json:"dryRun" pflag:",execute command without making any modifications."` - Version string `json:"version" pflag:",specify version of execution workflow/task."` - ClusterPool string `json:"clusterPool" pflag:",specify which cluster pool to assign execution to."` - OverwriteCache bool `json:"overwriteCache" pflag:",skip cached results when performing execution,causing all outputs to be re-calculated and stored data to be overwritten. Does not work for recovered executions."` + ExecFile string `json:"execFile,omitempty" pflag:",file for the execution params. If not specified defaults to <_name>.execution_spec.yaml"` + TargetDomain string `json:"targetDomain" pflag:",domain where execution needs to be created. If not specified configured domain would be used."` + TargetProject string `json:"targetProject" pflag:",project where execution needs to be created. If not specified configured project would be used."` + TargetExecutionCluster string `json:"targetExecutionCluster" pflag:",cluster where execution needs to be created. If not specific the default would be used."` + KubeServiceAcct string `json:"kubeServiceAcct" pflag:",kubernetes service account AuthRole for launching execution."` + IamRoleARN string `json:"iamRoleARN" pflag:",iam role ARN AuthRole for launching execution."` + Relaunch string `json:"relaunch" pflag:",execution id to be relaunched."` + Recover string `json:"recover" pflag:",execution id to be recreated from the last known failure point."` + DryRun bool `json:"dryRun" pflag:",execute command without making any modifications."` + Version string `json:"version" pflag:",specify version of execution workflow/task."` + ClusterPool string `json:"clusterPool" pflag:",specify which cluster pool to assign execution to."` + OverwriteCache bool `json:"overwriteCache" pflag:",skip cached results when performing execution,causing all outputs to be re-calculated and stored data to be overwritten. Does not work for recovered executions."` // Non plfag section is read from the execution config generated by get task/launch plan Workflow string `json:"workflow,omitempty"` Task string `json:"task,omitempty"` diff --git a/flytectl/cmd/create/execution_util.go b/flytectl/cmd/create/execution_util.go index 4961f4d9fc..060d22dddf 100644 --- a/flytectl/cmd/create/execution_util.go +++ b/flytectl/cmd/create/execution_util.go @@ -54,7 +54,7 @@ func createExecutionRequestForWorkflow(ctx context.Context, workflowName, projec } } - return createExecutionRequest(lp.Id, inputs, envs, securityContext, authRole, targetExecName), nil + return createExecutionRequest(lp.Id, inputs, envs, securityContext, authRole, targetExecName, executionConfig.TargetExecutionCluster), nil } func createExecutionRequestForTask(ctx context.Context, taskName string, project string, domain string, @@ -102,7 +102,7 @@ func createExecutionRequestForTask(ctx context.Context, taskName string, project Version: task.Id.Version, } - return createExecutionRequest(id, inputs, envs, securityContext, authRole, targetExecName), nil + return createExecutionRequest(id, inputs, envs, securityContext, authRole, targetExecName, executionConfig.TargetExecutionCluster), nil } func relaunchExecution(ctx context.Context, executionName string, project string, domain string, @@ -148,7 +148,7 @@ func recoverExecution(ctx context.Context, executionName string, project string, return nil } -func createExecutionRequest(ID *core.Identifier, inputs *core.LiteralMap, envs *admin.Envs, securityContext *core.SecurityContext, authRole *admin.AuthRole, targetExecName string) *admin.ExecutionCreateRequest { +func createExecutionRequest(ID *core.Identifier, inputs *core.LiteralMap, envs *admin.Envs, securityContext *core.SecurityContext, authRole *admin.AuthRole, targetExecName string, targetExecutionCluster string) *admin.ExecutionCreateRequest { if len(targetExecName) == 0 { targetExecName = "f" + strings.ReplaceAll(uuid.New().String(), "-", "")[:19] @@ -157,6 +157,10 @@ func createExecutionRequest(ID *core.Identifier, inputs *core.LiteralMap, envs * if executionConfig.ClusterPool != "" { clusterAssignment = &admin.ClusterAssignment{ClusterPoolName: executionConfig.ClusterPool} } + var executionClusterLabel *admin.ExecutionClusterLabel + if targetExecutionCluster != "" { + executionClusterLabel = &admin.ExecutionClusterLabel{Value: targetExecutionCluster} + } return &admin.ExecutionCreateRequest{ Project: executionConfig.TargetProject, Domain: executionConfig.TargetDomain, @@ -168,11 +172,12 @@ func createExecutionRequest(ID *core.Identifier, inputs *core.LiteralMap, envs * Principal: "sdk", Nesting: 0, }, - AuthRole: authRole, - SecurityContext: securityContext, - ClusterAssignment: clusterAssignment, - OverwriteCache: executionConfig.OverwriteCache, - Envs: envs, + AuthRole: authRole, + SecurityContext: securityContext, + ClusterAssignment: clusterAssignment, + OverwriteCache: executionConfig.OverwriteCache, + Envs: envs, + ExecutionClusterLabel: executionClusterLabel, }, Inputs: inputs, } diff --git a/flytectl/cmd/create/execution_util_test.go b/flytectl/cmd/create/execution_util_test.go index 526d863ca2..ab6391c983 100644 --- a/flytectl/cmd/create/execution_util_test.go +++ b/flytectl/cmd/create/execution_util_test.go @@ -129,6 +129,22 @@ func TestCreateExecutionRequestForWorkflow(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, execCreateRequest) }) + t.Run("successful with execution Cluster label and envs", func(t *testing.T) { + s := setup() + defer s.TearDown() + + createExecutionUtilSetup() + launchPlan := &admin.LaunchPlan{} + s.FetcherExt.OnFetchLPVersionMatch(s.Ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(launchPlan, nil) + var executionConfigWithEnvs = &ExecutionConfig{ + Envs: map[string]string{}, + TargetExecutionCluster: "cluster", + } + execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfigWithEnvs, "") + assert.Nil(t, err) + assert.NotNil(t, execCreateRequest) + assert.Equal(t, "cluster", execCreateRequest.Spec.ExecutionClusterLabel.Value) + }) t.Run("failed literal conversion", func(t *testing.T) { s := setup() defer s.TearDown() diff --git a/flytectl/cmd/create/executionconfig_flags.go b/flytectl/cmd/create/executionconfig_flags.go index 9908df93b1..897cc6ecce 100755 --- a/flytectl/cmd/create/executionconfig_flags.go +++ b/flytectl/cmd/create/executionconfig_flags.go @@ -51,8 +51,9 @@ func (ExecutionConfig) mustMarshalJSON(v json.Marshaler) string { func (cfg ExecutionConfig) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags := pflag.NewFlagSet("ExecutionConfig", pflag.ExitOnError) cmdFlags.StringVar(&executionConfig.ExecFile, fmt.Sprintf("%v%v", prefix, "execFile"), executionConfig.ExecFile, "file for the execution params. If not specified defaults to <_name>.execution_spec.yaml") - cmdFlags.StringVar(&executionConfig.TargetDomain, fmt.Sprintf("%v%v", prefix, "targetDomain"), executionConfig.TargetDomain, "project where execution needs to be created. If not specified configured domain would be used.") + cmdFlags.StringVar(&executionConfig.TargetDomain, fmt.Sprintf("%v%v", prefix, "targetDomain"), executionConfig.TargetDomain, "domain where execution needs to be created. If not specified configured domain would be used.") cmdFlags.StringVar(&executionConfig.TargetProject, fmt.Sprintf("%v%v", prefix, "targetProject"), executionConfig.TargetProject, "project where execution needs to be created. If not specified configured project would be used.") + cmdFlags.StringVar(&executionConfig.TargetExecutionCluster, fmt.Sprintf("%v%v", prefix, "targetExecutionCluster"), executionConfig.TargetExecutionCluster, "cluster where execution needs to be created. If not specific the default would be used.") cmdFlags.StringVar(&executionConfig.KubeServiceAcct, fmt.Sprintf("%v%v", prefix, "kubeServiceAcct"), executionConfig.KubeServiceAcct, "kubernetes service account AuthRole for launching execution.") cmdFlags.StringVar(&executionConfig.IamRoleARN, fmt.Sprintf("%v%v", prefix, "iamRoleARN"), executionConfig.IamRoleARN, "iam role ARN AuthRole for launching execution.") cmdFlags.StringVar(&executionConfig.Relaunch, fmt.Sprintf("%v%v", prefix, "relaunch"), executionConfig.Relaunch, "execution id to be relaunched.") diff --git a/flytectl/cmd/create/executionconfig_flags_test.go b/flytectl/cmd/create/executionconfig_flags_test.go index 7891b4f9ba..e251b60c49 100755 --- a/flytectl/cmd/create/executionconfig_flags_test.go +++ b/flytectl/cmd/create/executionconfig_flags_test.go @@ -141,6 +141,20 @@ func TestExecutionConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_targetExecutionCluster", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("targetExecutionCluster", testValue) + if vString, err := cmdFlags.GetString("targetExecutionCluster"); err == nil { + testDecodeJson_ExecutionConfig(t, fmt.Sprintf("%v", vString), &actual.TargetExecutionCluster) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) t.Run("Test_kubeServiceAcct", func(t *testing.T) { t.Run("Override", func(t *testing.T) { diff --git a/flytectl/cmd/get/execution_test.go b/flytectl/cmd/get/execution_test.go index 4f8ad3e8a3..51a1a1f825 100644 --- a/flytectl/cmd/get/execution_test.go +++ b/flytectl/cmd/get/execution_test.go @@ -176,7 +176,7 @@ func TestGetExecutionFuncWithIOData(t *testing.T) { nodeExecList := &admin.NodeExecutionList{NodeExecutions: nodeExecutions} inputs := map[string]*core.Literal{ - "val1": &core.Literal{ + "val1": { Value: &core.Literal_Scalar{ Scalar: &core.Scalar{ Value: &core.Scalar_Primitive{ @@ -191,7 +191,7 @@ func TestGetExecutionFuncWithIOData(t *testing.T) { }, } outputs := map[string]*core.Literal{ - "o2": &core.Literal{ + "o2": { Value: &core.Literal_Scalar{ Scalar: &core.Scalar{ Value: &core.Scalar_Primitive{ @@ -288,7 +288,7 @@ func TestGetExecutionFuncWithIOData(t *testing.T) { nodeExecutions := []*admin.NodeExecution{nodeExec1} nodeExecList := &admin.NodeExecutionList{NodeExecutions: nodeExecutions} inputs := map[string]*core.Literal{ - "val1": &core.Literal{ + "val1": { Value: &core.Literal_Scalar{ Scalar: &core.Scalar{ Value: &core.Scalar_Primitive{ @@ -303,7 +303,7 @@ func TestGetExecutionFuncWithIOData(t *testing.T) { }, } outputs := map[string]*core.Literal{ - "o2": &core.Literal{ + "o2": { Value: &core.Literal_Scalar{ Scalar: &core.Scalar{ Value: &core.Scalar_Primitive{ diff --git a/flytectl/cmd/get/node_execution_test.go b/flytectl/cmd/get/node_execution_test.go index 8f53e85484..f4d6040513 100644 --- a/flytectl/cmd/get/node_execution_test.go +++ b/flytectl/cmd/get/node_execution_test.go @@ -177,7 +177,7 @@ func TestGetExecutionDetails(t *testing.T) { nodeExecList := &admin.NodeExecutionList{NodeExecutions: nodeExecutions} inputs := map[string]*core.Literal{ - "val1": &core.Literal{ + "val1": { Value: &core.Literal_Scalar{ Scalar: &core.Scalar{ Value: &core.Scalar_Primitive{ @@ -192,7 +192,7 @@ func TestGetExecutionDetails(t *testing.T) { }, } outputs := map[string]*core.Literal{ - "o2": &core.Literal{ + "o2": { Value: &core.Literal_Scalar{ Scalar: &core.Scalar{ Value: &core.Scalar_Primitive{ @@ -243,7 +243,7 @@ func TestGetExecutionDetails(t *testing.T) { nodeExecList := &admin.NodeExecutionList{NodeExecutions: nodeExecutions} inputs := map[string]*core.Literal{ - "val1": &core.Literal{ + "val1": { Value: &core.Literal_Scalar{ Scalar: &core.Scalar{ Value: &core.Scalar_Primitive{ @@ -258,7 +258,7 @@ func TestGetExecutionDetails(t *testing.T) { }, } outputs := map[string]*core.Literal{ - "o2": &core.Literal{ + "o2": { Value: &core.Literal_Scalar{ Scalar: &core.Scalar{ Value: &core.Scalar_Primitive{