diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 51d3105a0a..802d9ddd38 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -81,6 +81,11 @@ func (a *arrayNodeHandler) Abort(ctx context.Context, nCtx interfaces.NodeExecut eventRecorder := newArrayEventRecorder(nCtx.EventsRecorder()) messageCollector := errorcollector.NewErrorMessageCollector() + + taskPhase := idlcore.TaskExecution_ABORTED + if arrayNodeState.Phase == v1alpha1.ArrayNodePhaseFailing { + taskPhase = idlcore.TaskExecution_FAILED + } switch arrayNodeState.Phase { case v1alpha1.ArrayNodePhaseExecuting, v1alpha1.ArrayNodePhaseFailing: for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() { @@ -122,13 +127,12 @@ func (a *arrayNodeHandler) Abort(ctx context.Context, nCtx interfaces.NodeExecut } // update state for subNodes - if err := eventRecorder.finalize(ctx, nCtx, idlcore.TaskExecution_ABORTED, 0, a.eventConfig); err != nil { + if err := eventRecorder.finalize(ctx, nCtx, taskPhase, 0, a.eventConfig); err != nil { // a task event with abort phase is already emitted when handling ArrayNodePhaseFailing - if eventsErr.IsAlreadyExists(err) { - return nil + if !eventsErr.IsAlreadyExists(err) { + logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error()) + return err } - logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error()) - return err } return nil @@ -466,6 +470,14 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu return handler.UnknownTransition, err } + // ensure task_execution set to failed - this should already be sent by the abort handler + if err := eventRecorder.finalize(ctx, nCtx, idlcore.TaskExecution_FAILED, 0, a.eventConfig); err != nil { + if !eventsErr.IsAlreadyExists(err) { + logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error()) + return handler.UnknownTransition, err + } + } + // fail with reported error if one exists if arrayNodeState.Error != nil { return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailureErr(arrayNodeState.Error, nil)), nil @@ -609,6 +621,14 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu return handler.UnknownTransition, err } + // ensure task_execution set to succeeded + if err := eventRecorder.finalize(ctx, nCtx, idlcore.TaskExecution_SUCCEEDED, 0, a.eventConfig); err != nil { + if !eventsErr.IsAlreadyExists(err) { + logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error()) + return handler.UnknownTransition, err + } + } + return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess( &handler.ExecutionInfo{ OutputInfo: &handler.OutputInfo{