Skip to content
This repository has been archived by the owner on May 31, 2024. It is now read-only.

Commit

Permalink
Hydrate failure node
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <pingsutw@gmail.com>
  • Loading branch information
pingsutw committed Jan 18, 2024
1 parent 54ddddb commit faa901c
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 2 deletions.
25 changes: 25 additions & 0 deletions cmd/register/files_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,31 @@ func TestRegisterFromFiles(t *testing.T) {
err = registerFromFilesFunc(s.Ctx, args, s.CmdCtx)
assert.Nil(t, err)
})
t.Run("Register a workflow with a failure node", func(t *testing.T) {
s := setup()
testScope := promutils.NewTestScope()
labeled.SetMetricKeys(contextutils.AppNameKey, contextutils.ProjectKey, contextutils.DomainKey)
registerFilesSetup()
rconfig.DefaultFilesConfig.Archive = true
rconfig.DefaultFilesConfig.OutputLocationPrefix = s3Output
rconfig.DefaultFilesConfig.DeprecatedSourceUploadPath = s3Output
mockStorage, err := storage.NewDataStore(&storage.Config{
Type: storage.TypeMemory,
}, testScope.NewSubScope("flytectl"))
assert.Nil(t, err)
Client = mockStorage

args := []string{"testdata/failure-node.tgz"}
s.MockAdminClient.OnCreateTaskMatch(mock.Anything, mock.Anything).Return(nil, nil)
s.MockAdminClient.OnCreateWorkflowMatch(mock.Anything, mock.Anything).Return(nil, nil)
s.MockAdminClient.OnCreateLaunchPlanMatch(mock.Anything, mock.Anything).Return(nil, nil)
s.MockAdminClient.OnUpdateLaunchPlanMatch(mock.Anything, mock.Anything).Return(nil, nil)
mockDataProxy := s.MockClient.DataProxyClient().(*mocks.DataProxyServiceClient)
mockDataProxy.OnCreateUploadLocationMatch(s.Ctx, mock.Anything).Return(&service.CreateUploadLocationResponse{}, nil)

err = registerFromFilesFunc(s.Ctx, args, s.CmdCtx)
assert.Nil(t, err)
})
t.Run("Failed fast registration while uploading the codebase", func(t *testing.T) {
s := setup()
registerFilesSetup()
Expand Down
4 changes: 2 additions & 2 deletions cmd/register/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
// Long descriptions are whitespace sensitive when generating docs using sphinx.
const (
registerCmdShort = "Registers tasks, workflows, and launch plans from a list of generated serialized files."
registercmdLong = `
registerCmdLong = `
Take input files as serialized versions of the tasks/workflows/launchplans and register them with FlyteAdmin.
Currently, these input files are protobuf files generated as output from Flytekit serialize.
Project and Domain are mandatory fields to be passed for registration and an optional version which defaults to v1.
Expand All @@ -23,7 +23,7 @@ func RemoteRegisterCommand() *cobra.Command {
registerCmd := &cobra.Command{
Use: "register",
Short: registerCmdShort,
Long: registercmdLong,
Long: registerCmdLong,
}
registerResourcesFuncs := map[string]cmdcore.CommandEntry{
"files": {CmdFunc: registerFromFilesFunc, Aliases: []string{"file"}, PFlagProvider: rconfig.DefaultFilesConfig,
Expand Down
10 changes: 10 additions & 0 deletions cmd/register/register_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,13 +491,23 @@ func hydrateSpec(message proto.Message, uploadLocation storage.DataReference, co
return err
}
}
if workflowSpec.Template.GetFailureNode() != nil {
if err := hydrateNode(workflowSpec.Template.GetFailureNode(), config.Version, config.Force); err != nil {
return err
}

Check warning on line 497 in cmd/register/register_util.go

View check run for this annotation

Codecov / codecov/patch

cmd/register/register_util.go#L496-L497

Added lines #L496 - L497 were not covered by tests
}
hydrateIdentifier(workflowSpec.Template.Id, config.Version, config.Force)
for _, subWorkflow := range workflowSpec.SubWorkflows {
for _, Noderef := range subWorkflow.Nodes {
if err := hydrateNode(Noderef, config.Version, config.Force); err != nil {
return err
}
}
if subWorkflow.GetFailureNode() != nil {
if err := hydrateNode(subWorkflow.GetFailureNode(), config.Version, config.Force); err != nil {
return err
}

Check warning on line 509 in cmd/register/register_util.go

View check run for this annotation

Codecov / codecov/patch

cmd/register/register_util.go#L508-L509

Added lines #L508 - L509 were not covered by tests
}
hydrateIdentifier(subWorkflow.Id, config.Version, config.Force)
}
case *admin.TaskSpec:
Expand Down
Binary file added cmd/register/testdata/failure-node.tgz
Binary file not shown.

0 comments on commit faa901c

Please sign in to comment.