Skip to content
This repository has been archived by the owner on May 31, 2024. It is now read-only.

Commit

Permalink
Support envs when creating execution (#408)
Browse files Browse the repository at this point in the history
* Support envs when creating execution

Signed-off-by: Hongxin Liang <honnix@users.noreply.github.com>

* Update doc

Signed-off-by: Hongxin Liang <honnix@users.noreply.github.com>

---------

Signed-off-by: Hongxin Liang <honnix@users.noreply.github.com>
  • Loading branch information
honnix authored Jun 5, 2023
1 parent e5e63ba commit 0cb2bfa
Show file tree
Hide file tree
Showing 7 changed files with 466 additions and 14 deletions.
34 changes: 27 additions & 7 deletions cmd/create/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,36 +60,55 @@ The generated spec file can be modified to change the input values, as shown bel
task: core.control_flow.merge_sort.merge
version: "v2"
3. Run the execution by passing the generated YAML file.
3. [Optional] Update the envs for the execution, if needed.
The generated spec file can be modified to change the envs values, as shown below:
.. code-block:: yaml
iamRoleARN: ""
inputs:
sorted_list1:
- 0
sorted_list2:
- 0
envs:
foo: bar
kubeServiceAcct: ""
targetDomain: ""
targetProject: ""
task: core.control_flow.merge_sort.merge
version: "v2"
4. Run the execution by passing the generated YAML file.
The file can then be passed through the command line.
It is worth noting that the source's and target's project and domain can be different.
::
flytectl create execution --execFile execution_spec.yaml -p flytesnacks -d staging --targetProject flytesnacks
4. To relaunch an execution, pass the current execution ID as follows:
5. To relaunch an execution, pass the current execution ID as follows:
::
flytectl create execution --relaunch ffb31066a0f8b4d52b77 -p flytesnacks -d development
5. To recover an execution, i.e., recreate it from the last known failure point for previously-run workflow execution, run:
6. To recover an execution, i.e., recreate it from the last known failure point for previously-run workflow execution, run:
::
flytectl create execution --recover ffb31066a0f8b4d52b77 -p flytesnacks -d development
See :ref:` + "`ref_flyteidl.admin.ExecutionRecoverRequest`" + ` for more details.
6. You can create executions idempotently by naming them. This is also a way to *name* an execution for discovery. Note,
7. You can create executions idempotently by naming them. This is also a way to *name* an execution for discovery. Note,
an execution id has to be unique within a project domain. So if the *name* matches an existing execution an already exists exceptioj
will be raised.
::
flytectl create execution --recover ffb31066a0f8b4d52b77 -p flytesnacks -d development custom_name
7. Generic/Struct/Dataclass/JSON types are supported for execution in a similar manner.
8. Generic/Struct/Dataclass/JSON types are supported for execution in a similar manner.
The following is an example of how generic data can be specified while creating the execution.
::
Expand All @@ -109,7 +128,7 @@ The generated file would look similar to this. Here, empty values have been dump
task: core.type_system.custom_objects.add
version: v3
8. Modified file with struct data populated for 'x' and 'y' parameters for the task "core.type_system.custom_objects.add":
9. Modified file with struct data populated for 'x' and 'y' parameters for the task "core.type_system.custom_objects.add":
::
Expand All @@ -133,7 +152,7 @@ The generated file would look similar to this. Here, empty values have been dump
task: core.type_system.custom_objects.add
version: v3
9. If you have configured a plugin that implements github.com/flyteorg/flyteadmin/pkg/workflowengine/interfaces/WorkflowExecutor
10. If you have configured a plugin that implements github.com/flyteorg/flyteadmin/pkg/workflowengine/interfaces/WorkflowExecutor
that supports cluster pools, then when creating a new execution, you can assign it to a specific cluster pool:
::
Expand Down Expand Up @@ -162,6 +181,7 @@ type ExecutionConfig struct {
Workflow string `json:"workflow,omitempty"`
Task string `json:"task,omitempty"`
Inputs map[string]interface{} `json:"inputs" pflag:"-"`
Envs map[string]string `json:"envs" pflag:"-"`
}

type ExecutionType int
Expand Down
1 change: 1 addition & 0 deletions cmd/create/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func (s *createSuite) Test_CreateTaskExecution() {
},
},
ClusterAssignment: &admin.ClusterAssignment{ClusterPoolName: "gpu"},
Envs: &admin.Envs{},
},
}
s.MockAdminClient.
Expand Down
20 changes: 16 additions & 4 deletions cmd/create/execution_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ func createExecutionRequestForWorkflow(ctx context.Context, workflowName, projec
Literals: paramLiterals,
}

envs := makeEnvs(executionConfig)

// Set both deprecated field and new field for security identity passing
var securityContext *core.SecurityContext
var authRole *admin.AuthRole
Expand All @@ -52,7 +54,7 @@ func createExecutionRequestForWorkflow(ctx context.Context, workflowName, projec
}
}

return createExecutionRequest(lp.Id, inputs, securityContext, authRole, targetExecName), nil
return createExecutionRequest(lp.Id, inputs, envs, securityContext, authRole, targetExecName), nil
}

func createExecutionRequestForTask(ctx context.Context, taskName string, project string, domain string,
Expand All @@ -73,6 +75,8 @@ func createExecutionRequestForTask(ctx context.Context, taskName string, project
Literals: variableLiterals,
}

envs := makeEnvs(executionConfig)

// Set both deprecated field and new field for security identity passing
var securityContext *core.SecurityContext
var authRole *admin.AuthRole
Expand All @@ -98,7 +102,7 @@ func createExecutionRequestForTask(ctx context.Context, taskName string, project
Version: task.Id.Version,
}

return createExecutionRequest(id, inputs, securityContext, authRole, targetExecName), nil
return createExecutionRequest(id, inputs, envs, securityContext, authRole, targetExecName), nil
}

func relaunchExecution(ctx context.Context, executionName string, project string, domain string,
Expand Down Expand Up @@ -144,8 +148,7 @@ func recoverExecution(ctx context.Context, executionName string, project string,
return nil
}

func createExecutionRequest(ID *core.Identifier, inputs *core.LiteralMap, securityContext *core.SecurityContext,
authRole *admin.AuthRole, targetExecName string) *admin.ExecutionCreateRequest {
func createExecutionRequest(ID *core.Identifier, inputs *core.LiteralMap, envs *admin.Envs, securityContext *core.SecurityContext, authRole *admin.AuthRole, targetExecName string) *admin.ExecutionCreateRequest {

if len(targetExecName) == 0 {
targetExecName = "f" + strings.ReplaceAll(uuid.New().String(), "-", "")[:19]
Expand All @@ -169,6 +172,7 @@ func createExecutionRequest(ID *core.Identifier, inputs *core.LiteralMap, securi
SecurityContext: securityContext,
ClusterAssignment: clusterAssignment,
OverwriteCache: executionConfig.OverwriteCache,
Envs: envs,
},
Inputs: inputs,
}
Expand Down Expand Up @@ -251,3 +255,11 @@ func readConfigAndValidate(project string, domain string) (ExecutionParams, erro
}
return ExecutionParams{name: name, execType: execType}, nil
}

func makeEnvs(executionConfig *ExecutionConfig) *admin.Envs {
var values []*core.KeyValuePair
for key, value := range executionConfig.Envs {
values = append(values, &core.KeyValuePair{Key: key, Value: value})
}
return &admin.Envs{Values: values}
}
56 changes: 56 additions & 0 deletions cmd/create/execution_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,30 @@ func TestCreateExecutionRequestForWorkflow(t *testing.T) {
assert.Nil(t, err)
assert.NotNil(t, execCreateRequest)
})
t.Run("successful with envs", func(t *testing.T) {
s := setup()
createExecutionUtilSetup()
launchPlan := &admin.LaunchPlan{}
s.FetcherExt.OnFetchLPVersionMatch(s.Ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(launchPlan, nil)
var executionConfigWithEnvs = &ExecutionConfig{
Envs: map[string]string{"foo": "bar"},
}
execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfigWithEnvs, "")
assert.Nil(t, err)
assert.NotNil(t, execCreateRequest)
})
t.Run("successful with empty envs", func(t *testing.T) {
s := setup()
createExecutionUtilSetup()
launchPlan := &admin.LaunchPlan{}
s.FetcherExt.OnFetchLPVersionMatch(s.Ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(launchPlan, nil)
var executionConfigWithEnvs = &ExecutionConfig{
Envs: map[string]string{},
}
execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfigWithEnvs, "")
assert.Nil(t, err)
assert.NotNil(t, execCreateRequest)
})
t.Run("failed literal conversion", func(t *testing.T) {
s := setup()
createExecutionUtilSetup()
Expand Down Expand Up @@ -144,6 +168,38 @@ func TestCreateExecutionRequestForTask(t *testing.T) {
assert.Nil(t, err)
assert.NotNil(t, execCreateRequest)
})
t.Run("successful with envs", func(t *testing.T) {
s := setup()
createExecutionUtilSetup()
task := &admin.Task{
Id: &core.Identifier{
Name: "taskName",
},
}
s.FetcherExt.OnFetchTaskVersionMatch(s.Ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(task, nil)
var executionConfigWithEnvs = &ExecutionConfig{
Envs: map[string]string{"foo": "bar"},
}
execCreateRequest, err := createExecutionRequestForTask(s.Ctx, "taskName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfigWithEnvs, "")
assert.Nil(t, err)
assert.NotNil(t, execCreateRequest)
})
t.Run("successful with empty envs", func(t *testing.T) {
s := setup()
createExecutionUtilSetup()
task := &admin.Task{
Id: &core.Identifier{
Name: "taskName",
},
}
s.FetcherExt.OnFetchTaskVersionMatch(s.Ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(task, nil)
var executionConfigWithEnvs = &ExecutionConfig{
Envs: map[string]string{},
}
execCreateRequest, err := createExecutionRequestForTask(s.Ctx, "taskName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfigWithEnvs, "")
assert.Nil(t, err)
assert.NotNil(t, execCreateRequest)
})
t.Run("failed literal conversion", func(t *testing.T) {
s := setup()
createExecutionUtilSetup()
Expand Down
1 change: 1 addition & 0 deletions cmd/get/execution_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
type ExecutionConfig struct {
IamRoleARN string `yaml:"iamRoleARN"`
Inputs map[string]yaml.Node `yaml:"inputs"`
Envs map[string]string `yaml:"envs"`
KubeServiceAcct string `yaml:"kubeServiceAcct"`
TargetDomain string `yaml:"targetDomain"`
TargetProject string `yaml:"targetProject"`
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/docker/docker v20.10.7+incompatible
github.com/docker/go-connections v0.4.0
github.com/enescakir/emoji v1.0.0
github.com/flyteorg/flyteidl v1.3.8
github.com/flyteorg/flyteidl v1.5.10
github.com/flyteorg/flytepropeller v1.1.1
github.com/flyteorg/flytestdlib v1.0.13
github.com/go-ozzo/ozzo-validation/v4 v4.3.0
Expand Down
Loading

0 comments on commit 0cb2bfa

Please sign in to comment.