Skip to content

Commit

Permalink
Key-value execution tags (#5453)
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <pingsutw@apache.org>
  • Loading branch information
pingsutw authored Jun 8, 2024
1 parent 25c3596 commit 38883c7
Show file tree
Hide file tree
Showing 19 changed files with 379 additions and 186 deletions.
1 change: 1 addition & 0 deletions flyteadmin/pkg/common/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
Signal = "s"
AdminTag = "at"
ExecutionAdminTag = "eat"
ExecutionTag = "et"
)

// ResourceTypeToEntity maps a resource type to an entity suitable for use with Database filters
Expand Down
12 changes: 11 additions & 1 deletion flyteadmin/pkg/common/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,10 @@ func customizeField(field string, entity Entity) string {
if entity == Execution && executionIdentifierFields[field] {
return fmt.Sprintf("execution_%s", field)
}
// admin_tag table has been migrated to an execution_tag table, so we need to customize the field name.
if entity == AdminTag && field == "name" {
return "key"
}
return field
}

Expand All @@ -265,6 +269,10 @@ func customizeEntity(field string, entity Entity) Entity {
if entity == NamedEntity && entityMetadataFields[field] {
return NamedEntityMetadata
}
// admin_tag table has been migrated to an execution_tag table.
if entity == AdminTag {
return ExecutionTag
}
return entity
}

Expand All @@ -289,8 +297,10 @@ func NewRepeatedValueFilter(entity Entity, function FilterExpression, field stri
return nil, GetInvalidRepeatedValueFilterErr(function)
}
customizedField := customizeField(field, entity)
customizedEntity := customizeEntity(field, entity)

return &inlineFilterImpl{
entity: entity,
entity: customizedEntity,
function: function,
field: customizedField,
repeatedValue: repeatedValue,
Expand Down
66 changes: 38 additions & 28 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,29 +819,29 @@ func (m *ExecutionManager) fillInTemplateArgs(ctx context.Context, query core.Ar

func (m *ExecutionManager) launchExecutionAndPrepareModel(
ctx context.Context, request admin.ExecutionCreateRequest, requestedAt time.Time) (
context.Context, *models.Execution, error) {
context.Context, *models.Execution, []*models.ExecutionTag, error) {

err := validation.ValidateExecutionRequest(ctx, request, m.db, m.config.ApplicationConfiguration())
if err != nil {
logger.Debugf(ctx, "Failed to validate ExecutionCreateRequest %+v with err %v", request, err)
return nil, nil, err
return nil, nil, nil, err
}
if request.Spec.LaunchPlan.ResourceType == core.ResourceType_TASK {
logger.Debugf(ctx, "Launching single task execution with [%+v]", request.Spec.LaunchPlan)
// When tasks can have defaults this will need to handle Artifacts as well.
ctx, model, err := m.launchSingleTaskExecution(ctx, request, requestedAt)
return ctx, model, err
return ctx, model, nil, err
}

launchPlanModel, err := util.GetLaunchPlanModel(ctx, m.db, *request.Spec.LaunchPlan)
if err != nil {
logger.Debugf(ctx, "Failed to get launch plan model for ExecutionCreateRequest %+v with err %v", request, err)
return nil, nil, err
return nil, nil, nil, err
}
launchPlan, err := transformers.FromLaunchPlanModel(launchPlanModel)
if err != nil {
logger.Debugf(ctx, "Failed to transform launch plan model %+v with err %v", launchPlanModel, err)
return nil, nil, err
return nil, nil, nil, err
}

var lpExpectedInputs *core.ParameterMap
Expand All @@ -860,23 +860,23 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
logger.Debugf(ctx, "Failed to CheckAndFetchInputsForExecution with request.Inputs: %+v"+
"fixed inputs: %+v and expected inputs: %+v with err %v",
request.Inputs, launchPlan.Spec.FixedInputs, lpExpectedInputs, err)
return nil, nil, err
return nil, nil, nil, err
}

workflowModel, err := util.GetWorkflowModel(ctx, m.db, *launchPlan.Spec.WorkflowId)
if err != nil {
logger.Debugf(ctx, "Failed to get workflow with id %+v with err %v", launchPlan.Spec.WorkflowId, err)
return nil, nil, err
return nil, nil, nil, err
}
workflow, err := transformers.FromWorkflowModel(workflowModel)
if err != nil {
logger.Debugf(ctx, "Failed to get workflow with id %+v with err %v", launchPlan.Spec.WorkflowId, err)
return nil, nil, err
return nil, nil, nil, err
}
closure, err := util.FetchAndGetWorkflowClosure(ctx, m.storageClient, workflowModel.RemoteClosureIdentifier)
if err != nil {
logger.Debugf(ctx, "Failed to get workflow with id %+v with err %v", launchPlan.Spec.WorkflowId, err)
return nil, nil, err
return nil, nil, nil, err
}
closure.CreatedAt = workflow.Closure.CreatedAt
workflow.Closure = closure
Expand All @@ -900,7 +900,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
var sourceExecutionID uint
parentNodeExecutionID, sourceExecutionID, err = m.getInheritedExecMetadata(ctx, requestSpec, &workflowExecutionID)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

// Dynamically assign task resource defaults.
Expand All @@ -914,32 +914,32 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(

inputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, executionInputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.Inputs)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
userInputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, request.Inputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.UserInputs)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

executionConfig, err := m.getExecutionConfig(ctx, &request, launchPlan)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

namespace := common.GetNamespaceName(
m.config.NamespaceMappingConfiguration().GetNamespaceTemplate(), workflowExecutionID.Project, workflowExecutionID.Domain)

labels, err := resolveStringMap(executionConfig.GetLabels(), launchPlan.Spec.Labels, "labels", m.config.RegistrationValidationConfiguration().GetMaxLabelEntries())
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
labels, err = m.addProjectLabels(ctx, request.Project, labels)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
annotations, err := resolveStringMap(executionConfig.GetAnnotations(), launchPlan.Spec.Annotations, "annotations", m.config.RegistrationValidationConfiguration().GetMaxAnnotationEntries())
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
var rawOutputDataConfig *admin.RawOutputDataConfig
if executionConfig.RawOutputDataConfig != nil {
Expand All @@ -948,7 +948,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(

clusterAssignment, err := m.getClusterAssignment(ctx, &request)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

var executionClusterLabel *admin.ExecutionClusterLabel
Expand All @@ -972,7 +972,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(

overrides, err := m.addPluginOverrides(ctx, &workflowExecutionID, launchPlan.GetSpec().WorkflowId.Name, launchPlan.Id.Name)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if overrides != nil {
executionParameters.TaskPluginOverrides = overrides
Expand Down Expand Up @@ -1041,21 +1041,28 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
if err != nil {
logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v",
workflowExecutionID, err)
return nil, nil, err
return nil, nil, nil, err
}

return ctx, executionModel, nil
executionTagModel, err := transformers.CreateExecutionTagModel(createExecModelInput)
if err != nil {
logger.Infof(ctx, "Failed to create execution tag model in transformer for id: [%+v] with err: %v",
workflowExecutionID, err)
return nil, nil, nil, err
}

return ctx, executionModel, executionTagModel, nil
}

// Inserts an execution model into the database store and emits platform metrics.
func (m *ExecutionManager) createExecutionModel(
ctx context.Context, executionModel *models.Execution) (*core.WorkflowExecutionIdentifier, error) {
ctx context.Context, executionModel *models.Execution, executionTagModel []*models.ExecutionTag) (*core.WorkflowExecutionIdentifier, error) {
workflowExecutionIdentifier := core.WorkflowExecutionIdentifier{
Project: executionModel.ExecutionKey.Project,
Domain: executionModel.ExecutionKey.Domain,
Name: executionModel.ExecutionKey.Name,
}
err := m.db.ExecutionRepo().Create(ctx, *executionModel)
err := m.db.ExecutionRepo().Create(ctx, *executionModel, executionTagModel)
if err != nil {
logger.Debugf(ctx, "failed to save newly created execution [%+v] with id %+v to db with err %v",
workflowExecutionIdentifier, workflowExecutionIdentifier, err)
Expand All @@ -1077,12 +1084,13 @@ func (m *ExecutionManager) CreateExecution(
request.Inputs = request.GetSpec().GetInputs()
}
var executionModel *models.Execution
var executionTagModel []*models.ExecutionTag
var err error
ctx, executionModel, err = m.launchExecutionAndPrepareModel(ctx, request, requestedAt)
ctx, executionModel, executionTagModel, err = m.launchExecutionAndPrepareModel(ctx, request, requestedAt)
if err != nil {
return nil, err
}
workflowExecutionIdentifier, err := m.createExecutionModel(ctx, executionModel)
workflowExecutionIdentifier, err := m.createExecutionModel(ctx, executionModel, executionTagModel)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1127,7 +1135,8 @@ func (m *ExecutionManager) RelaunchExecution(
executionSpec.Metadata.ReferenceExecution = existingExecution.Id
executionSpec.OverwriteCache = request.GetOverwriteCache()
var executionModel *models.Execution
ctx, executionModel, err = m.launchExecutionAndPrepareModel(ctx, admin.ExecutionCreateRequest{
var executionTagModel []*models.ExecutionTag
ctx, executionModel, executionTagModel, err = m.launchExecutionAndPrepareModel(ctx, admin.ExecutionCreateRequest{
Project: request.Id.Project,
Domain: request.Id.Domain,
Name: request.Name,
Expand All @@ -1138,7 +1147,7 @@ func (m *ExecutionManager) RelaunchExecution(
return nil, err
}
executionModel.SourceExecutionID = existingExecutionModel.ID
workflowExecutionIdentifier, err := m.createExecutionModel(ctx, executionModel)
workflowExecutionIdentifier, err := m.createExecutionModel(ctx, executionModel, executionTagModel)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1178,7 +1187,8 @@ func (m *ExecutionManager) RecoverExecution(
executionSpec.Metadata.Mode = admin.ExecutionMetadata_RECOVERED
executionSpec.Metadata.ReferenceExecution = existingExecution.Id
var executionModel *models.Execution
ctx, executionModel, err = m.launchExecutionAndPrepareModel(ctx, admin.ExecutionCreateRequest{
var executionTagModel []*models.ExecutionTag
ctx, executionModel, executionTagModel, err = m.launchExecutionAndPrepareModel(ctx, admin.ExecutionCreateRequest{
Project: request.Id.Project,
Domain: request.Id.Domain,
Name: request.Name,
Expand All @@ -1189,7 +1199,7 @@ func (m *ExecutionManager) RecoverExecution(
return nil, err
}
executionModel.SourceExecutionID = existingExecutionModel.ID
workflowExecutionIdentifier, err := m.createExecutionModel(ctx, executionModel)
workflowExecutionIdentifier, err := m.createExecutionModel(ctx, executionModel, executionTagModel)
if err != nil {
return nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion flyteadmin/pkg/manager/impl/util/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ var filterFieldEntityPrefix = map[string]common.Entity{
"signal": common.Signal,
"admin_tag": common.AdminTag,
"execution_admin_tag": common.ExecutionAdminTag,
"execution_tag": common.ExecutionTag,
}

func parseField(field string, primaryEntity common.Entity) (common.Entity, string) {
Expand Down Expand Up @@ -121,7 +122,7 @@ func prepareValues(field string, values []string) (interface{}, error) {
}

var allowedJoinEntities = map[common.Entity]sets.String{
common.Execution: sets.NewString(common.Execution, common.LaunchPlan, common.Workflow, common.Task, common.AdminTag),
common.Execution: sets.NewString(common.Execution, common.LaunchPlan, common.Workflow, common.Task, common.AdminTag, common.ExecutionTag),
common.LaunchPlan: sets.NewString(common.LaunchPlan, common.Workflow),
common.NodeExecution: sets.NewString(common.NodeExecution, common.Execution),
common.NodeExecutionEvent: sets.NewString(common.NodeExecutionEvent),
Expand All @@ -133,6 +134,7 @@ var allowedJoinEntities = map[common.Entity]sets.String{
common.Project: sets.NewString(common.Project),
common.Signal: sets.NewString(common.Signal),
common.AdminTag: sets.NewString(common.AdminTag),
common.ExecutionTag: sets.NewString(common.ExecutionTag),
}

var entityColumns = map[common.Entity]sets.String{
Expand All @@ -148,6 +150,7 @@ var entityColumns = map[common.Entity]sets.String{
common.Project: models.ProjectColumns,
common.Signal: models.SignalColumns,
common.AdminTag: models.AdminTagColumns,
common.ExecutionTag: models.ExecutionTagColumns,
}

func ParseFilters(filterParams string, primaryEntity common.Entity) ([]common.InlineFilter, error) {
Expand Down
55 changes: 55 additions & 0 deletions flyteadmin/pkg/repositories/config/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -1222,6 +1222,61 @@ var ContinuedMigrations = []*gormigrate.Migration{
return tx.Table("launch_plans").Migrator().DropColumn(&models.LaunchPlan{}, "launch_condition_type")
},
},

{
ID: "2024-06-06-execution-tags",
Migrate: func(tx *gorm.DB) error {
type ExecutionKey struct {
Project string `gorm:"primary_key;column:execution_project;size:255"`
Domain string `gorm:"primary_key;column:execution_domain;size:255"`
Name string `gorm:"primary_key;column:execution_name;size:255"`
}

type ExecutionTag struct {
ID uint `gorm:"index;autoIncrement;not null"`
CreatedAt time.Time `gorm:"type:time"`
UpdatedAt time.Time `gorm:"type:time"`
DeletedAt *time.Time `gorm:"index"`
ExecutionKey
Key string `gorm:"primary_key;index:tag_key;size:255"`
Value string `gorm:"primary_key;index:tag_value;size:255"`
}

return tx.Transaction(func(_ *gorm.DB) error {
// Create an execution_tags Table
if err := tx.AutoMigrate(&ExecutionTag{}); err != nil {
return err
}
// Drop execution_admin_tags and admin_tags tables, and create a new table execution_tags
// to store tags associated with executions.
sql := "INSERT INTO execution_tags (execution_project, execution_domain, execution_name, created_at, updated_at, deleted_at, key, value)" +
" SELECT execution_project, execution_domain, execution_name, created_at, updated_at, deleted_at, name as key, null as value" +
" FROM execution_admin_tags" +
" INNER JOIN admin_tags a on execution_admin_tags.admin_tag_id = a.id;"
if err := tx.Exec(sql).Error; err != nil {
return err
}
return nil
})
},
Rollback: func(tx *gorm.DB) error {
return tx.Migrator().DropTable("execution_tags")
},
},

{
ID: "2024-06-06-drop-execution_admin-tags",
Migrate: func(tx *gorm.DB) error {
return tx.Migrator().DropTable("execution_admin_tags")
},
},

{
ID: "2024-06-06-drop-admin-tags",
Migrate: func(tx *gorm.DB) error {
return tx.Migrator().DropTable("admin_tags")
},
},
}

var m = append(LegacyMigrations, NoopMigrations...)
Expand Down
4 changes: 2 additions & 2 deletions flyteadmin/pkg/repositories/gormimpl/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ const taskExecutionTableName = "task_executions"
const taskTableName = "tasks"
const workflowTableName = "workflows"
const descriptionEntityTableName = "description_entities"
const AdminTagsTableName = "admin_tags"
const executionAdminTagsTableName = "execution_admin_tags"
const executionTagsTableName = "execution_tags"

const limit = "limit"
const filters = "filters"
Expand All @@ -49,6 +48,7 @@ var entityToTableName = map[common.Entity]string{
common.Signal: "signals",
common.AdminTag: "admin_tags",
common.ExecutionAdminTag: "execution_admin_tags",
common.ExecutionTag: "execution_tags",
}

var innerJoinExecToNodeExec = fmt.Sprintf(
Expand Down
Loading

0 comments on commit 38883c7

Please sign in to comment.