Add support failure node #4308
Signed-off-by: Kevin Su <pingsutw@apache.org>
Codecov Report
@@            Coverage Diff             @@
##           master    #4308      +/-   ##
==========================================
+ Coverage   58.92%   58.98%   +0.05%
==========================================
  Files         620      621       +1
  Lines       52441    52483      +42
==========================================
+ Hits        30903    30957      +54
+ Misses      19073    19059      -14
- Partials     2465     2467       +2
Flags with carried forward coverage won't be shown.
Signed-off-by: Ketan Umare <ketan.umare@gmail.com>
IIUC, we add the FailureNode as part of the DAG so that it can use workflow inputs. A few questions / thoughts:
(1) Is there anything that stops us from using other task outputs? If not, the FailureNode will only execute if those tasks completed successfully, right?
(2) IIUC, this requires failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE to be set in order to work. I am not sure it's reasonable to require this. IMO this is implementation-dependent and doesn't make sense to an end user.
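Question (1) can be illustrated with a small plain-Python model. This is only a sketch of the concern, not propeller's scheduling logic: `Node` and `failure_node_can_run` are hypothetical names, and the assumption is that a node may start only after every upstream node whose output it consumes has succeeded.

```python
from dataclasses import dataclass, field
from typing import List, Set


@dataclass
class Node:
    """Hypothetical stand-in for a DAG node (not a Flyte type)."""
    name: str
    # Names of upstream nodes whose outputs this node consumes.
    upstream: List[str] = field(default_factory=list)


def failure_node_can_run(failure_node: Node, succeeded: Set[str]) -> bool:
    """Assumed rule: every upstream dependency must have succeeded."""
    return all(dep in succeeded for dep in failure_node.upstream)


# Failure node bound only to workflow inputs: no upstream dependencies.
cleanup_from_inputs = Node("clean_up")
# Failure node bound to the output of create_cluster.
cleanup_from_output = Node("delete_cluster", upstream=["create_cluster"])

# Scenario: create_cluster itself failed, so nothing has succeeded yet.
succeeded: Set[str] = set()
print(failure_node_can_run(cleanup_from_inputs, succeeded))
print(failure_node_can_run(cleanup_from_output, succeeded))
```

Under that assumption, a failure node that reads only workflow inputs is always runnable, while one that reads a task output is blocked whenever that task is the one that failed.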
case v1alpha1.WorkflowPhaseFailing:
    wfEvent.Phase = core.WorkflowExecution_FAILING
    wfEvent.OutputResult = convertToExecutionError(toStatus.Err, previousError)
    wStatus.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "", wfEvent.GetError())
    wfEvent.OccurredAt = utils.GetProtoTime(nil)
case v1alpha1.WorkflowPhaseHandlingFailureNode:
Can you explain why we need another phase instead of just applying this during the WorkflowPhaseFailing handling?
Because we handle the failure node in the WorkflowPhaseHandlingFailureNode phase here:
flyte/flytepropeller/pkg/controller/workflow/executor.go
Lines 422 to 431 in ad0597c
case v1alpha1.WorkflowPhaseHandlingFailureNode:
    newStatus, err := c.handleFailureNode(ctx, w)
    if err != nil {
        return err
    }
    failureErr := c.TransitionToPhase(ctx, w.ExecutionID.WorkflowExecutionIdentifier, wStatus, newStatus)
    // Ignore ExecutionNotFound and IncompatibleCluster errors to allow graceful failure
    if failureErr != nil && !(eventsErr.IsNotFound(failureErr) || eventsErr.IsEventIncompatibleClusterError(failureErr)) {
        return failureErr
    }
We could move it to WorkflowPhaseFailing as well.
Ah OK, so you didn't add the WorkflowPhaseHandlingFailureNode phase, you just implemented it correctly?
yah, right
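The exchange above treats failure-node handling as its own workflow phase rather than part of the failing phase. A minimal sketch of that idea as a state machine follows. It is written in Python for illustration only: the `Phase` enum and `next_phase` function are hypothetical, and the transition rules are an assumption, not a copy of the Go logic in flytepropeller.

```python
from enum import Enum


class Phase(Enum):
    """Simplified, hypothetical mirror of the workflow phases discussed."""
    FAILING = "Failing"
    HANDLING_FAILURE_NODE = "HandlingFailureNode"
    FAILED = "Failed"


def next_phase(phase: Phase, has_failure_node: bool, failure_node_done: bool = False) -> Phase:
    """Assumed transitions: Failing -> HandlingFailureNode -> Failed."""
    if phase is Phase.FAILING:
        # Only detour through the failure node when the workflow defines one.
        return Phase.HANDLING_FAILURE_NODE if has_failure_node else Phase.FAILED
    if phase is Phase.HANDLING_FAILURE_NODE:
        # Stay in this phase across evaluation rounds until the node finishes.
        return Phase.FAILED if failure_node_done else Phase.HANDLING_FAILURE_NODE
    return phase


print(next_phase(Phase.FAILING, has_failure_node=True))
print(next_phase(Phase.HANDLING_FAILURE_NODE, has_failure_node=True, failure_node_done=True))
```

Keeping a dedicated phase means the controller can re-enter the same handler on every round while the failure node is still running, which is harder to express if the work is folded into the failing phase.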
It's a bit hard to implement in flytekit. To achieve that, we need to support try and catch in flytekit, like:
@workflow
def wf3(name: str = "Kevin"):
    try:
        id = create_cluster(name=name)
        t = t1(a=1, b="2")
    except:
        d = delete_cluster(id=id)
    c >> t >> d
Failure node is used in the
Ah OK, so you didn't add the WorkflowPhaseHandlingFailureNode phase, you just implemented it correctly?
IMO the current behavior is better. I think you had it right.
Do you mean:
@workflow
def wf3(name: str = "Kevin"):
    id = create_cluster(name=name)
    try:
        t = t1(a=1, b="2")
    except:
        d = delete_cluster(id=id)
    c >> t >> d
This is not easy to implement without AST parsing.
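To make the AST-parsing remark concrete, here is a rough sketch of what detecting a try/except inside a workflow body could look like using Python's standard `ast` module. It is not something flytekit does: `find_try_blocks` and `_call_names` are hypothetical helpers, and the source string simply reuses the example above (without the decorator and the `>>` line).

```python
import ast
import textwrap

# Source of the workflow body from the example, parsed but never executed.
WORKFLOW_SOURCE = textwrap.dedent('''
    def wf3(name: str = "Kevin"):
        id = create_cluster(name=name)
        try:
            t = t1(a=1, b="2")
        except:
            d = delete_cluster(id=id)
''')


def _call_names(stmts):
    """Collect names of plain function calls found in the given statements."""
    names = []
    for stmt in stmts:
        for sub in ast.walk(stmt):
            if isinstance(sub, ast.Call) and isinstance(sub.func, ast.Name):
                names.append(sub.func.id)
    return names


def find_try_blocks(source):
    """Return one (calls in try body, calls in handlers) pair per try statement."""
    results = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Try):
            handler_stmts = [s for h in node.handlers for s in h.body]
            results.append((_call_names(node.body), _call_names(handler_stmts)))
    return results


print(find_try_blocks(WORKFLOW_SOURCE))
```

Finding the calls is the easy part. Turning the handler body into a failure node with correctly bound inputs would still need extra machinery, which is presumably the difficulty being pointed out.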
@workflow
def wf3(name: str = "Kevin"):
    id = create_cluster(name=name)
    with on_failure(delete_cluster(id=id)):
        t = t1(a=1, b="2")
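For intuition, the `with on_failure(...)` shape proposed above can be mimicked in plain Python with `contextlib`. This is only a runtime sketch under stated assumptions: `on_failure` here is a hypothetical helper, not a flytekit API, and the stub tasks are made up for the demo. Unlike the proposal, it takes the cleanup callable and its arguments separately, because in ordinary Python `delete_cluster(id=id)` would run immediately when the `with` line is evaluated.

```python
from contextlib import contextmanager

calls = []  # records what ran, for demonstration only


@contextmanager
def on_failure(cleanup, **kwargs):
    """Hypothetical helper: run cleanup(**kwargs) only if the body raises."""
    try:
        yield
    except Exception:
        cleanup(**kwargs)
        raise  # the failure still propagates after cleanup


def create_cluster(name):
    calls.append(f"create {name}")
    return f"{name}-cluster"


def delete_cluster(id):
    calls.append(f"delete {id}")


def t1(a, b):
    raise RuntimeError("t1 failed")  # simulate a failing task


def wf3(name="Kevin"):
    cluster_id = create_cluster(name=name)
    with on_failure(delete_cluster, id=cluster_id):
        t1(a=1, b="2")


try:
    wf3()
except RuntimeError as err:
    print("workflow failed:", err)
print(calls)
```

In a real workflow the body is compiled into a DAG rather than executed eagerly, so an actual implementation would need to register the cleanup as a node at compile time instead of catching exceptions.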
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Ketan Umare <ketan.umare@gmail.com>
Co-authored-by: Ketan Umare <ketan.umare@gmail.com>
Co-authored-by: Ketan Umare <16888709+kumare3@users.noreply.github.com>
Signed-off-by: Paul Dittamo <pvdittamo@gmail.com>
Describe your changes
WorkflowPhaseHandlingFailureNode
The failure node runs if the workflow fails and the workflow has a failureNode.
The failure node runs if a subworkflow has a failureNode and the subworkflow fails.
Example
Three workflows
flytectl demo start --dev
make compile # compile the single binary
flyte start --config flyte-single-binary-local.yaml
pip install git+https://github.com/flyteorg/flytekit.git@2582a8bc16f80ab3a7101af8360e9d7212236e43
pyflyte run --remote workflow.py wf
Tracking issue
#1506
Screenshots
Follow up
None
in flytekit.or