Skip to content

Commit

Permalink
add comments and better solution for backward compativle
Browse files Browse the repository at this point in the history
Signed-off-by: Future-Outlier <eric901201@gmail.com>
  • Loading branch information
Future-Outlier committed Dec 17, 2024
1 parent 0bb8e91 commit 25fea29
Showing 1 changed file with 52 additions and 7 deletions.
59 changes: 52 additions & 7 deletions flytepropeller/pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"runtime/debug"
"strings"
"time"

regErrors "github.com/pkg/errors"
Expand Down Expand Up @@ -40,6 +41,7 @@ import (
)

const pluginContextKey = contextutils.Key("plugin")
const FLYTE_ENABLE_DECK = string("FLYTE_ENABLE_DECK")

type metrics struct {
pluginPanics labeled.Counter
Expand Down Expand Up @@ -83,8 +85,7 @@ func (p *pluginRequestedTransition) AddDeckURI(tCtx *taskExecutionContext) {
p.execInfo.OutputInfo.DeckURI = deckURI

Check warning on line 85 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L85

Added line #L85 was not covered by tests
}

// RemoveNonexistentDeckURI removes the deck URI from the plugin execution info if the URI does not exist in remote storage.
func (p *pluginRequestedTransition) RemoveNonexistentDeckURI(ctx context.Context, tCtx *taskExecutionContext) error {
func (p *pluginRequestedTransition) AddDeckURIIfDeckExists(ctx context.Context, tCtx *taskExecutionContext) error {
reader := tCtx.ow.GetReader()
if reader == nil && p.execInfo.OutputInfo != nil {
p.execInfo.OutputInfo.DeckURI = nil
Expand All @@ -97,8 +98,13 @@ func (p *pluginRequestedTransition) RemoveNonexistentDeckURI(ctx context.Context
return regErrors.Wrapf(err, "failed to check existence of deck file")
}

Check warning on line 99 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L97-L99

Added lines #L97 - L99 were not covered by tests

if !exists && p.execInfo.OutputInfo != nil {
p.execInfo.OutputInfo.DeckURI = nil
if p.execInfo.OutputInfo == nil {
p.execInfo.OutputInfo = &handler.OutputInfo{}
}

if exists {
deckURIValue := tCtx.ow.GetDeckPath()
p.execInfo.OutputInfo.DeckURI = &deckURIValue
}

Check warning on line 108 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L106-L108

Added lines #L106 - L108 were not covered by tests

return nil
Expand Down Expand Up @@ -417,6 +423,21 @@ func (t Handler) fetchPluginTaskMetrics(pluginID, taskType string) (*taskMetrics
return t.taskMetricsMap[metricNameKey], nil
}

func IsDeckEnabled(ctx context.Context, tCtx *taskExecutionContext) (bool, error) {
template, err := tCtx.tr.Read(ctx)
if err != nil {
return false, regErrors.Wrapf(err, "failed to read task template")
}

Check warning on line 430 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L429-L430

Added lines #L429 - L430 were not covered by tests

templateConfig := template.GetConfig()
if templateConfig == nil {
return false, nil
}

deckEnabled := strings.ToLower(templateConfig[FLYTE_ENABLE_DECK])
return deckEnabled == "1" || deckEnabled == "t" || deckEnabled == "true", nil

Check warning on line 438 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L437-L438

Added lines #L437 - L438 were not covered by tests
}

func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *taskExecutionContext, ts handler.TaskNodeState) (*pluginRequestedTransition, error) {
pluginTrns := &pluginRequestedTransition{}

Expand Down Expand Up @@ -505,12 +526,34 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta
// The deck should be accessible even if the task is still running or has failed.
// It's possible that the deck URI may not exist in remote storage yet or will never exist.
// So, it is console's responsibility to handle the case when the deck URI actually does not exist.
pluginTrns.AddDeckURI(tCtx)
deckEnabled, err := IsDeckEnabled(ctx, tCtx)
if err != nil {
return nil, err
}

Check warning on line 532 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L531-L532

Added lines #L531 - L532 were not covered by tests
if deckEnabled {
pluginTrns.AddDeckURI(tCtx)
}

Check warning on line 535 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L534-L535

Added lines #L534 - L535 were not covered by tests

// Handle backward compatibility for Flyte deck display behavior.
//
// Before (legacy behavior):
// - Deck URI was only shown if the deck file existed in the terminal state.
// - We relied on a HEAD request to check if the deck file exists, then added the URI to the event.
//
// After (new behavior):
// - If `FLYTE_ENABLE_DECK = true` is set in the task template config (requires Flytekit > 1.14.0),
// we display the deck URI from the beginning rather than waiting until the terminal state.
//
// For backward compatibility with older Flytekit versions (which don't support `FLYTE_ENABLE_DECK`),
// we still need to check deck file existence in the terminal state. This ensures that when the deck
// isn't enabled via config or doesn't exist, we only show the URI in terminal states if the deck file
// is actually present.
switch pluginTrns.pInfo.Phase() {
case pluginCore.PhaseSuccess:
// This is to prevent the console from potentially checking the deck URI that does not exist if in final phase(PhaseSuccess).
err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx)
if !deckEnabled {
err = pluginTrns.AddDeckURIIfDeckExists(ctx, tCtx)
}
if err != nil {
return pluginTrns, err
}

Check warning on line 559 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L558-L559

Added lines #L558 - L559 were not covered by tests
Expand Down Expand Up @@ -559,7 +602,9 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta
case pluginCore.PhasePermanentFailure:
// This is to prevent the console from potentially checking the deck URI that does not exist if in final
// phase(PhaseFailure).
err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx)
if !deckEnabled {
err = pluginTrns.AddDeckURIIfDeckExists(ctx, tCtx)
}
if err != nil {
return pluginTrns, err
}

Check warning on line 610 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L603-L610

Added lines #L603 - L610 were not covered by tests
Expand Down

0 comments on commit 25fea29

Please sign in to comment.