Skip to content

Commit

Permalink
init failure node
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <pingsutw@apache.org>
  • Loading branch information
pingsutw committed Oct 26, 2023
1 parent 4bad6c2 commit 5319a29
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 6 deletions.
2 changes: 1 addition & 1 deletion charts/flyte-core/values-eks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ configmap:
propeller:
resourcemanager:
type: noop
# Note: By default resource manager is disable for propeller, Please use `type: redis` to enaable
# Note: By default resource manager is disabled for propeller, Please use `type: redis` to enable
# type: redis
# resourceMaxQuota: 10000
# redis:
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"

"github.com/golang/protobuf/jsonpb"
"github.com/pkg/errors"
Expand Down Expand Up @@ -331,6 +332,7 @@ func (in *WorkflowSpec) GetOutputs() *OutputVarMap {
}

func (in *WorkflowSpec) GetNode(nodeID NodeID) (ExecutableNode, bool) {
fmt.Print("Getting node ", nodeID, " from ", in.Nodes)
n, ok := in.Nodes[nodeID]
return n, ok
}
Expand Down
4 changes: 4 additions & 0 deletions flytepropeller/pkg/compiler/transformers/k8s/node.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package k8s

import (
"context"
"github.com/flyteorg/flyte/flytestdlib/logger"
"strings"

"github.com/go-test/deep"
Expand Down Expand Up @@ -33,7 +35,9 @@ func buildNodeSpec(n *core.Node, tasks []*core.CompiledTask, errs errors.Compile
if n.GetTaskNode() != nil {
taskID := n.GetTaskNode().GetReferenceId().String()
// TODO: Use task index for quick lookup
logger.Info(context.Background(), "kevin Looking up task", "taskID", taskID)
for _, t := range tasks {
logger.Infof(context.Background(), "kevin Comparing %v with %v", t.Template.Id.String(), taskID)
if t.Template.Id.String() == taskID {
task = t.Template
break
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/pkg/controller/executors/node_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package executors

import (
"context"
"fmt"

"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
)
Expand Down Expand Up @@ -46,6 +47,7 @@ type staticNodeLookup struct {
}

func (s staticNodeLookup) GetNode(nodeID v1alpha1.NodeID) (v1alpha1.ExecutableNode, bool) {
fmt.Print("staticNodeLookup.GetNode")
n, ok := s.nodes[nodeID]
return n, ok
}
Expand Down
3 changes: 2 additions & 1 deletion flytepropeller/pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func (c *recursiveNodeExecutor) RecursiveNodeHandler(ctx context.Context, execCo
currentNodeCtx := contextutils.WithNodeID(ctx, currentNode.GetID())
nodeStatus := nl.GetNodeExecutionStatus(ctx, currentNode.GetID())
nodePhase := nodeStatus.GetPhase()
logger.Infof(currentNodeCtx, "Handling node [%v] Status [%v]", currentNode.GetID(), nodePhase.String())

if canHandleNode(nodePhase) {
// TODO Follow up Pull Request,
Expand Down Expand Up @@ -989,7 +990,7 @@ func (c *nodeExecutor) handleNotYetStartedNode(ctx context.Context, dag executor

if np != nodeStatus.GetPhase() {
// assert np == Queued!
logger.Infof(ctx, "Change in node state detected from [%s] -> [%s]", nodeStatus.GetPhase().String(), np.String())
logger.Infof(ctx, "Change in node state detected1 from [%s] -> [%s]", nodeStatus.GetPhase().String(), np.String())
p = p.WithOccuredAt(occurredAt)

nev, err := ToNodeExecutionEvent(nCtx.NodeExecutionMetadata().GetNodeExecutionID(),
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/nodes/node_exec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ func isAboveInterruptibleFailureThreshold(numFailures uint32, maxAttempts uint32

func (c *nodeExecutor) BuildNodeExecutionContext(ctx context.Context, executionContext executors.ExecutionContext,
nl executors.NodeLookup, currentNodeID v1alpha1.NodeID) (interfaces.NodeExecutionContext, error) {
fmt.Printf("-------------- BuildNodeExecutionContext for node [%v] in execution [%v]\n", currentNodeID, executionContext.GetID())
n, ok := nl.GetNode(currentNodeID)
if !ok {
return nil, fmt.Errorf("failed to find node with ID [%s] in execution [%s]", currentNodeID, executionContext.GetID())
Expand Down
24 changes: 20 additions & 4 deletions flytepropeller/pkg/controller/workflow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,22 @@ func (c *workflowExecutor) handleRunningWorkflow(ctx context.Context, w *v1alpha
func (c *workflowExecutor) handleFailureNode(ctx context.Context, w *v1alpha1.FlyteWorkflow) (Status, error) {
execErr := executionErrorOrDefault(w.GetExecutionStatus().GetExecutionError(), w.GetExecutionStatus().GetMessage())
errorNode := w.GetOnFailureNode()
logger.Infof(ctx, "Handling FailureNode [%v]", errorNode)
execcontext := executors.NewExecutionContext(w, w, w, nil, executors.InitializeControlFlow())
state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, w, w, errorNode)

// TODO: GetNodeExecutionStatus doesn't work. How do we get the error node status from CRD
status := w.GetExecutionStatus().GetNodeExecutionStatus(ctx, errorNode.GetID())
failureNodeLookup := executors.NewFailureNodeLookup(errorNode.(*v1alpha1.NodeSpec), status)

Check failure on line 178 in flytepropeller/pkg/controller/workflow/executor.go

View workflow job for this annotation

GitHub Actions / compile

undefined: executors.NewFailureNodeLookup
state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, w, failureNodeLookup, errorNode)
logger.Infof(ctx, "FailureNode [%v] finished with state [%v]", errorNode, state)
logger.Infof(ctx, "FailureNode [%v] finished with error [%v]", errorNode, err)
if err != nil {
logger.Infof(ctx, "test")
return StatusFailureNode(execErr), err
}

if state.HasFailed() {
logger.Infof(ctx, "test1 [%v]", state.Err)
return StatusFailed(state.Err), nil
}

Expand All @@ -187,6 +196,8 @@ func (c *workflowExecutor) handleFailureNode(ctx context.Context, w *v1alpha1.Fl
Message: "FailureNode Timed-out"}), nil
}

logger.Infof(ctx, "test2")

if state.PartiallyComplete() {
// Re-enqueue the workflow
c.enqueueWorkflow(w.GetK8sWorkflowID().String())
Expand Down Expand Up @@ -220,6 +231,7 @@ func (c *workflowExecutor) handleFailingWorkflow(ctx context.Context, w *v1alpha
}

errorNode := w.GetOnFailureNode()
logger.Infof(ctx, "Handling xxx FailureNode [%v]", errorNode)
if errorNode != nil {
return StatusFailureNode(execErr), nil
}
Expand Down Expand Up @@ -282,13 +294,17 @@ func (c *workflowExecutor) TransitionToPhase(ctx context.Context, execID *core.W
wfEvent.Phase = core.WorkflowExecution_RUNNING
wStatus.UpdatePhase(v1alpha1.WorkflowPhaseRunning, "Workflow Started", nil)
wfEvent.OccurredAt = utils.GetProtoTime(wStatus.GetStartedAt())
case v1alpha1.WorkflowPhaseHandlingFailureNode:
fallthrough
case v1alpha1.WorkflowPhaseFailing:
wfEvent.Phase = core.WorkflowExecution_FAILING
wfEvent.OutputResult = convertToExecutionError(toStatus.Err, previousError)
wStatus.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "", wfEvent.GetError())
wfEvent.OccurredAt = utils.GetProtoTime(nil)
case v1alpha1.WorkflowPhaseHandlingFailureNode:
// TODO: Add core.WorkflowPhaseHandlingFailureNode to proto
wfEvent.Phase = core.WorkflowExecution_FAILING
wfEvent.OutputResult = convertToExecutionError(toStatus.Err, previousError)
wStatus.UpdatePhase(v1alpha1.WorkflowPhaseHandlingFailureNode, "", wfEvent.GetError())
wfEvent.OccurredAt = utils.GetProtoTime(nil)
case v1alpha1.WorkflowPhaseFailed:
wfEvent.Phase = core.WorkflowExecution_FAILED
wfEvent.OutputResult = convertToExecutionError(toStatus.Err, previousError)
Expand Down Expand Up @@ -422,7 +438,7 @@ func (c *workflowExecutor) HandleFlyteWorkflow(ctx context.Context, w *v1alpha1.
case v1alpha1.WorkflowPhaseHandlingFailureNode:
newStatus, err := c.handleFailureNode(ctx, w)
if err != nil {
return err
return errors.Errorf("failed to handle failure node for workflow [%s], err: [%s]", w.ID, err.Error())
}
failureErr := c.TransitionToPhase(ctx, w.ExecutionID.WorkflowExecutionIdentifier, wStatus, newStatus)
// Ignore ExecutionNotFound and IncompatibleCluster errors to allow graceful failure
Expand Down

0 comments on commit 5319a29

Please sign in to comment.