Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support failure node #4308

Merged
merged 24 commits into from
Dec 9, 2023
Merged

Add support failure node #4308

merged 24 commits into from
Dec 9, 2023

Conversation

pingsutw
Copy link
Member

@pingsutw pingsutw commented Oct 26, 2023

Describe your changes

  • Compile failure node
  • Add FailureNodeLookup
  • Update Failure node binding.
  • Transition workflow status to WorkflowPhaseHandlingFailureNode if workflow fails and workflow has a failureNode.
  • Transition subworkflow status to running if subworkflow has a failureNode and subworkflow fails.

Example

Three workflows

  1. wf1: both workflow and subworkflow have a failure node. propeller will create two tasks when failing
  2. wf2: Only one workflow has a failure node. propeller will create one task when failing
  3. wf3: failure node is a workflowNode. propeller will create another workflow when failing
flytectl demo start --dev
make compile  # compile the single binary
flyte start --config flyte-single-binary-local.yaml
pip install git+https://github.com/flyteorg/flytekit.git@2582a8bc16f80ab3a7101af8360e9d7212236e43
pyflyte run --remote workflow.py wf
import typing
from click.testing import CliRunner

from flytekit import task, workflow, ImageSpec, WorkflowFailurePolicy
from flytekit.clis.sdk_in_container import pyflyte
from flytekit.types.error.error import FlyteError

new_flytekit = "git+https://github.com/flyteorg/flytekit.git@5a415107b0aff272a16eb147860a65d47a10c4d8"
image_spec = ImageSpec(packages=[new_flytekit], apt_packages=["git"], registry="pingsutw")


@task(container_image=image_spec)
def create_cluster(name: str):
    print(f"Creating cluster: {name}")


@task(container_image=image_spec)
def t1(a: int, b: str):
    print(f"{a} {b}")
    raise ValueError("Fail!")


@task(container_image=image_spec)
def delete_cluster(name: str, err: typing.Optional[FlyteError] = None):
    print(f"Deleting cluster {name}")
    print(err)


@task(container_image=image_spec)
def clean_up(name: str, err: typing.Optional[FlyteError] = None):
    print(f"Deleting cluster {name} due to {err}")
    print(err)


@workflow(on_failure=clean_up)
def subwf(name: str = "kevin"):
    c = create_cluster(name=name)
    t = t1(a=1, b="2")
    d = delete_cluster(name=name)
    c >> t >> d


@workflow(on_failure=clean_up, failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def wf1(name: str = "kevin"):
    c = create_cluster(name=name)
    subwf(name="pingsutw")
    t = t1(a=1, b="2")
    d = delete_cluster(name=name)
    c >> t >> d


@workflow(on_failure=clean_up)
def wf2(name: str = "kevin"):
    c = create_cluster(name=name)
    t = t1(a=1, b="2")
    d = delete_cluster(name=name)
    c >> t >> d


@workflow
def clean_up_wf(name: str = "kevin"):
    return create_cluster(name=name)


@workflow(on_failure=clean_up_wf)
def wf3(name: str = "Kevin"):
    c = create_cluster(name=name)
    t = t1(a=1, b="2")
    d = delete_cluster(name=name)
    c >> t >> d


if __name__ == '__main__':
    runner = CliRunner()
    result = runner.invoke(pyflyte.main, ["run", "--remote", "failure_node.py", "wf2"])
    print(result.output)

Tracking issue

#1506

Screenshots

image

Follow up

  1. We should save error message as input.pb for failure node. For now, err is always None in flytekit.
@task(container_image=image_spec)
def clean_up(name: str, err: typing.Optional[FlyteError] = None):
    print(f"Deleting cluster {name} due to {err}")
    print(err)
  1. Support try and catch
@workflow
def wf3(name: str = "Kevin"):
    id = create_cluster(name=name)
    try:
        t = t1(a=1, b="2")
    except:
        d = delete_cluster(id=id)
    c >> t >> d

or

@workflow
def wf3(name: str = "Kevin"):
    id = create_cluster(name=name)
   with on_failure(delete_cluster(id=id)):
        t = t1(a=1, b="2")

Signed-off-by: Kevin Su <pingsutw@apache.org>
@pingsutw pingsutw marked this pull request as draft October 26, 2023 19:51
@pingsutw pingsutw changed the title Add support failure node [WIP] Add support failure node Oct 26, 2023
@codecov
Copy link

codecov bot commented Oct 26, 2023

Codecov Report

Attention: 20 lines in your changes are missing coverage. Please review.

Comparison is base (b50ba87) 58.92% compared to head (340f535) 58.98%.

Files Patch % Lines
...er/pkg/controller/nodes/subworkflow/subworkflow.go 0.00% 10 Missing ⚠️
flytepropeller/pkg/controller/workflow/executor.go 68.18% 7 Missing ⚠️
...peller/pkg/controller/nodes/subworkflow/handler.go 40.00% 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #4308      +/-   ##
==========================================
+ Coverage   58.92%   58.98%   +0.05%     
==========================================
  Files         620      621       +1     
  Lines       52441    52483      +42     
==========================================
+ Hits        30903    30957      +54     
+ Misses      19073    19059      -14     
- Partials     2465     2467       +2     
Flag Coverage Δ
unittests 58.98% <70.14%> (+0.05%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

pingsutw and others added 11 commits November 14, 2023 12:08
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Ketan Umare <ketan.umare@gmail.com>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
@pingsutw pingsutw changed the title [WIP] Add support failure node Add support failure node Nov 17, 2023
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
@pingsutw pingsutw marked this pull request as ready for review November 30, 2023 20:00
@dosubot dosubot bot added the size:L This PR changes 100-499 lines, ignoring generated files. label Nov 30, 2023
@pingsutw pingsutw requested a review from EngHabu December 1, 2023 00:27
Copy link
Contributor

@hamersaw hamersaw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC we add the FailureNode as part of the DAG so that we can use workflow inputs. A few questions / thoughts:
(1) Is there anything that stops us from using other task outputs? If not the FailureNode will only execute if those tasks successfully completed right?
(2) This requires failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE to be set to work IIUC. I am not sure it's reasonable to require this. IMO this is implementation dependent, but doesn't make any sense for an end user.

flytepropeller/pkg/compiler/workflow_compiler.go Outdated Show resolved Hide resolved
flytepropeller/pkg/compiler/workflow_compiler_test.go Outdated Show resolved Hide resolved
flytepropeller/pkg/compiler/workflow_compiler_test.go Outdated Show resolved Hide resolved
flytepropeller/pkg/compiler/workflow_compiler_test.go Outdated Show resolved Hide resolved
case v1alpha1.WorkflowPhaseFailing:
wfEvent.Phase = core.WorkflowExecution_FAILING
wfEvent.OutputResult = convertToExecutionError(toStatus.Err, previousError)
wStatus.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "", wfEvent.GetError())
wfEvent.OccurredAt = utils.GetProtoTime(nil)
case v1alpha1.WorkflowPhaseHandlingFailureNode:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why we need another phase instead of just applying this during the WorkflowPhaseFailing handle?

Copy link
Member Author

@pingsutw pingsutw Dec 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we handle failure node in WorkflowPhaseHandlingFailureNode phase here

case v1alpha1.WorkflowPhaseHandlingFailureNode:
newStatus, err := c.handleFailureNode(ctx, w)
if err != nil {
return err
}
failureErr := c.TransitionToPhase(ctx, w.ExecutionID.WorkflowExecutionIdentifier, wStatus, newStatus)
// Ignore ExecutionNotFound and IncompatibleCluster errors to allow graceful failure
if failureErr != nil && !(eventsErr.IsNotFound(failureErr) || eventsErr.IsEventIncompatibleClusterError(failureErr)) {
return failureErr
}
.
we are able to move it to WorkflowPhaseFailing as well

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah OK, so you didn't add the WorkflowPhaseHandlingFailureNode just implemented it correctly?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yah, right

Signed-off-by: Kevin Su <pingsutw@apache.org>
@pingsutw
Copy link
Member Author

pingsutw commented Dec 4, 2023

(1) Is there anything that stops us from using other task outputs? If not the FailureNode will only execute if those tasks successfully completed right?

it's a bit hard to implement in flytekit. To achieve that, we need to support try and catch in flytekit. like

@workflow
def wf3(name: str = "Kevin"):
    try:
        id = create_cluster(name=name)
        t = t1(a=1, b="2")
    except:
        d = delete_cluster(id=id)
    c >> t >> d

Failure node is used in the @workflow, so there is no way to pass the input (promise) to failure node. Therefore, we uses workflow inputs by default.

case v1alpha1.WorkflowPhaseFailing:
wfEvent.Phase = core.WorkflowExecution_FAILING
wfEvent.OutputResult = convertToExecutionError(toStatus.Err, previousError)
wStatus.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "", wfEvent.GetError())
wfEvent.OccurredAt = utils.GetProtoTime(nil)
case v1alpha1.WorkflowPhaseHandlingFailureNode:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah OK, so you didn't add the WorkflowPhaseHandlingFailureNode just implemented it correctly?

@hamersaw
Copy link
Contributor

hamersaw commented Dec 5, 2023

(1) Is there anything that stops us from using other task outputs? If not the FailureNode will only execute if those tasks successfully completed right?

it's a bit hard to implement in flytekit. To achieve that, we need to support try and catch in flytekit. like

@workflow
def wf3(name: str = "Kevin"):
    try:
        id = create_cluster(name=name)
        t = t1(a=1, b="2")
    except:
        d = delete_cluster(id=id)
    c >> t >> d

Failure node is used in the @workflow, so there is no way to pass the input (promise) to failure node. Therefore, we uses workflow inputs by default.

IMO the current behavior is better. I think you had it right.

@kumare3
Copy link
Contributor

kumare3 commented Dec 5, 2023

do you mean

@workflow
def wf3(name: str = "Kevin"):
    id = create_cluster(name=name)
    try:
        t = t1(a=1, b="2")
    except:
        d = delete_cluster(id=id)
    c >> t >> d

This is not easy to implement without AST parsing

@kumare3
Copy link
Contributor

kumare3 commented Dec 5, 2023

@workflow
def wf3(name: str = "Kevin"):
    id = create_cluster(name=name)
   with on_failure(delete_cluster(id=id)):
        t = t1(a=1, b="2")

@pingsutw pingsutw merged commit 91d24a9 into master Dec 9, 2023
43 checks passed
@pingsutw pingsutw deleted the failure_node branch December 9, 2023 00:37
pvditt pushed a commit that referenced this pull request Dec 13, 2023
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Ketan Umare <ketan.umare@gmail.com>
Co-authored-by: Ketan Umare <ketan.umare@gmail.com>
Co-authored-by: Ketan Umare <16888709+kumare3@users.noreply.github.com>
Signed-off-by: Paul Dittamo <pvdittamo@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
size:L This PR changes 100-499 lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants