Skip to content

Commit

Permalink
Address comment
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <pingsutw@apache.org>
  • Loading branch information
pingsutw committed Dec 4, 2023
1 parent d6b4b1d commit b2ab9aa
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 15 deletions.
2 changes: 1 addition & 1 deletion flytepropeller/pkg/compiler/workflow_compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (w workflowBuilder) ValidateWorkflow(fg *flyteWorkflow, errs errors.Compile

if fg.Template.FailureNode != nil {
failureNode := fg.Template.FailureNode
v.ValidateNode(&wf, wf.GetOrCreateNodeBuilder(failureNode), false /* validateConditionTypes */, errs.NewScope())
v.ValidateNode(&wf, wf.GetOrCreateNodeBuilder(failureNode), false, errs.NewScope())
wf.AddEdges(wf.GetOrCreateNodeBuilder(failureNode), c.EdgeDirectionUpstream, errs.NewScope())
}

Expand Down
15 changes: 3 additions & 12 deletions flytepropeller/pkg/compiler/workflow_compiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,8 @@ func TestCompileWorkflowWithFailureNode(t *testing.T) {
// Detect what other workflows/tasks does this coreWorkflow reference
subWorkflows := make([]*core.WorkflowTemplate, 0)
reqs, err := GetRequirements(inputWorkflow, subWorkflows)
if err != nil {
fmt.Printf("failed to get requirements. Error: %v", err)
return
}

fmt.Printf("Needed Tasks: [%v], Needed Workflows [%v]\n",
strings.Join(dumpIdentifierNames(reqs.GetRequiredTaskIds()), ","),
strings.Join(dumpIdentifierNames(reqs.GetRequiredLaunchPlanIds()), ","))
assert.Nil(t, err)
assert.Equal(t, reqs.taskIds, []common.Identifier{{Name: "cleanup"}, {Name: "task_123"}})

// Replace with logic to satisfy the requirements
workflows := make([]common.InterfaceProvider, 0)
Expand Down Expand Up @@ -193,10 +187,7 @@ func TestCompileWorkflowWithFailureNode(t *testing.T) {
compiledTasks := make([]*core.CompiledTask, 0, len(tasks))
for _, task := range tasks {
compiledTask, err := CompileTask(task)
if err != nil {
fmt.Printf("failed to compile task [%v]. Error: %v", task.Id, err)
return
}
assert.Nil(t, err)

compiledTasks = append(compiledTasks, compiledTask)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,7 @@ func (s *subworkflowHandler) getExecutionContextForDownstream(nCtx interfaces.No

func (s *subworkflowHandler) HandleFailureNodeOfSubWorkflow(ctx context.Context, nCtx interfaces.NodeExecutionContext, subworkflow v1alpha1.ExecutableSubWorkflow, nl executors.NodeLookup) (handler.Transition, error) {
originalError := nCtx.NodeStateReader().GetWorkflowNodeState().Error
failureNode := subworkflow.GetOnFailureNode()
if failureNode != nil {
if failureNode := subworkflow.GetOnFailureNode(); failureNode != nil {
execContext, err := s.getExecutionContextForDownstream(nCtx)
if err != nil {
return handler.UnknownTransition, err
Expand Down

0 comments on commit b2ab9aa

Please sign in to comment.