Skip to content

Commit

Permalink
Add variable_map to GetDataResponse
Browse files Browse the repository at this point in the history
Signed-off-by: mao3267 <chenvincent610@gmail.com>
  • Loading branch information
mao3267 committed Jan 4, 2025
1 parent fd9a378 commit 547c155
Show file tree
Hide file tree
Showing 24 changed files with 1,047 additions and 635 deletions.
25 changes: 20 additions & 5 deletions flyteadmin/pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,24 +538,39 @@ func (m *NodeExecutionManager) GetNodeExecutionData(

var outputs *core.LiteralMap
var outputURLBlob *admin.UrlBlob
var outputVariableMap *core.VariableMap
group.Go(func() error {
var err error
outputs, outputURLBlob, err = util.GetOutputs(groupCtx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(),
m.storageClient, nodeExecution.GetClosure())
return err
})

// TODO: Get the output variable map from the node execution model
// group.Go(func() error {
// var err error

// modelNode, err := m.db.NodeExecutionRepo().Get(groupCtx, repoInterfaces.NodeExecutionResource{
// NodeExecutionIdentifier: request.GetId(),
// })

// node, err := transformers.FromNodeExecutionModel(modelNode, transformers.DefaultExecutionTransformerOptions)

// return err
// })

err = group.Wait()
if err != nil {
return nil, err
}

response := &admin.NodeExecutionGetDataResponse{
Inputs: inputURLBlob,
Outputs: outputURLBlob,
FullInputs: inputs,
FullOutputs: outputs,
FlyteUrls: common.FlyteURLsFromNodeExecutionID(request.GetId(), nodeExecution.GetClosure() != nil && nodeExecution.GetClosure().GetDeckUri() != ""),
Inputs: inputURLBlob,
Outputs: outputURLBlob,
FullInputs: inputs,
FullOutputs: outputs,
FlyteUrls: common.FlyteURLsFromNodeExecutionID(request.GetId(), nodeExecution.GetClosure() != nil && nodeExecution.GetClosure().GetDeckUri() != ""),
OutputVariableMap: outputVariableMap,
}

if len(nodeExecutionModel.DynamicWorkflowRemoteClosureReference) > 0 {
Expand Down
27 changes: 22 additions & 5 deletions flyteadmin/pkg/manager/impl/task_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,24 +328,41 @@ func (m *TaskExecutionManager) GetTaskExecutionData(

var outputs *core.LiteralMap
var outputURLBlob *admin.UrlBlob
var outputVariableMap *core.VariableMap
group.Go(func() error {
var err error
outputs, outputURLBlob, err = util.GetOutputs(groupCtx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(),
m.storageClient, taskExecution.GetClosure())
return err
})

group.Go(func() error {
var err error
taskModel, err := m.db.TaskRepo().Get(groupCtx, repoInterfaces.Identifier{
Project: taskExecution.GetId().GetTaskId().GetProject(),
Domain: taskExecution.GetId().GetTaskId().GetDomain(),
Name: taskExecution.GetId().GetTaskId().GetName(),
Version: taskExecution.GetId().GetTaskId().GetVersion(),
})

task, err := transformers.FromTaskModel(taskModel)
outputVariableMap = task.GetClosure().GetCompiledTask().GetTemplate().GetInterface().Outputs

return err
})

err = group.Wait()
if err != nil {
return nil, err
}

response := &admin.TaskExecutionGetDataResponse{
Inputs: inputURLBlob,
Outputs: outputURLBlob,
FullInputs: inputs,
FullOutputs: outputs,
FlyteUrls: common.FlyteURLsFromTaskExecutionID(request.GetId(), false),
Inputs: inputURLBlob,
Outputs: outputURLBlob,
FullInputs: inputs,
FullOutputs: outputs,
FlyteUrls: common.FlyteURLsFromTaskExecutionID(request.GetId(), false),
OutputVariableMap: outputVariableMap,
}

m.metrics.TaskExecutionInputBytes.Observe(float64(response.GetInputs().GetBytes()))
Expand Down
6 changes: 6 additions & 0 deletions flyteidl/clients/go/assets/admin.swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions flyteidl/gen/pb-es/flyteidl/admin/node_execution_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions flyteidl/gen/pb-es/flyteidl/admin/task_execution_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions flyteidl/gen/pb-es/flyteidl/service/dataproxy_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 547c155

Please sign in to comment.