From 69b08f7c2d65bd4980ad4ba415a13ee0ba72bc3d Mon Sep 17 00:00:00 2001 From: Dennis Keck <26092524+fellhorn@users.noreply.github.com> Date: Mon, 17 Jun 2024 14:27:45 +0200 Subject: [PATCH] Fix: Make 'flytectl compile' consider launchplans used within workflows (#5463) * Fix: Make 'flytectl compile' consider launchplans used within workflows Signed-off-by: Dennis Keck <26092524+fellhorn@users.noreply.github.com> * Add raw file for test Signed-off-by: Dennis Keck <26092524+fellhorn@users.noreply.github.com> * Add documentation on how to create a package Signed-off-by: Dennis Keck <26092524+fellhorn@users.noreply.github.com> --------- Signed-off-by: Dennis Keck <26092524+fellhorn@users.noreply.github.com> --- flytectl/cmd/compile/compile.go | 74 +++++++++++++++--- flytectl/cmd/compile/compile_test.go | 5 ++ .../cmd/compile/testdata/launchplan-in-wf.py | 16 ++++ .../cmd/compile/testdata/launchplan-in-wf.tgz | Bin 0 -> 875 bytes 4 files changed, 85 insertions(+), 10 deletions(-) create mode 100644 flytectl/cmd/compile/testdata/launchplan-in-wf.py create mode 100644 flytectl/cmd/compile/testdata/launchplan-in-wf.tgz diff --git a/flytectl/cmd/compile/compile.go b/flytectl/cmd/compile/compile.go index 972c553e97..60235c1b91 100644 --- a/flytectl/cmd/compile/compile.go +++ b/flytectl/cmd/compile/compile.go @@ -80,21 +80,16 @@ func compileFromPackage(packagePath string) error { return err } - // compile workflows - for wfName, workflow := range workflows { + var providers []common.InterfaceProvider + var compiledWorkflows = map[string]*core.CompiledWorkflowClosure{} - fmt.Println("\nCompiling workflow:", wfName) - plan := plans[wfName] + // compile workflows + for _, workflow := range workflows { + providers, err = handleWorkflow(workflow, compiledTasks, compiledWorkflows, providers, plans, workflows) - _, err := compiler.CompileWorkflow(workflow.Template, - workflow.SubWorkflows, - compiledTasks, - []common.InterfaceProvider{compiler.NewLaunchPlanInterfaceProvider(*plan)}) if err != nil { - fmt.Println(":( Error Compiling workflow:", wfName) return err } - } fmt.Println("All Workflows compiled successfully!") @@ -105,6 +100,65 @@ func compileFromPackage(packagePath string) error { return nil } +func handleWorkflow( + workflow *admin.WorkflowSpec, + compiledTasks []*core.CompiledTask, + compiledWorkflows map[string]*core.CompiledWorkflowClosure, + compiledLaunchPlanProviders []common.InterfaceProvider, + plans map[string]*admin.LaunchPlan, + workflows map[string]*admin.WorkflowSpec) ([]common.InterfaceProvider, error) { + reqs, _ := compiler.GetRequirements(workflow.Template, workflow.SubWorkflows) + wfName := workflow.Template.Id.Name + + // Check if all the subworkflows referenced by launchplan are compiled + for i := range reqs.GetRequiredLaunchPlanIds() { + lpID := &reqs.GetRequiredLaunchPlanIds()[i] + lpWfName := plans[lpID.Name].Spec.WorkflowId.Name + missingWorkflow := workflows[lpWfName] + if compiledWorkflows[lpWfName] == nil { + // Recursively compile the missing workflow first + err := error(nil) + compiledLaunchPlanProviders, err = handleWorkflow(missingWorkflow, compiledTasks, compiledWorkflows, compiledLaunchPlanProviders, plans, workflows) + if err != nil { + return nil, err + } + } + } + + fmt.Println("\nCompiling workflow:", wfName) + + wf, err := compiler.CompileWorkflow(workflow.Template, + workflow.SubWorkflows, + compiledTasks, + compiledLaunchPlanProviders) + + if err != nil { + fmt.Println(":( Error Compiling workflow:", wfName) + return nil, err + } + compiledWorkflows[wfName] = wf + + // Update the expected inputs and outputs for the launchplans which reference this workflow + for _, plan := range plans { + if plan.Spec.WorkflowId.Name == wfName { + plan.Closure.ExpectedOutputs = wf.Primary.Template.Interface.Outputs + newMap := make(map[string]*core.Parameter) + + for key, value := range wf.Primary.Template.Interface.Inputs.Variables { + newMap[key] = &core.Parameter{ + Var: value, + } + } + plan.Closure.ExpectedInputs = &core.ParameterMap{ + Parameters: newMap, + } + compiledLaunchPlanProviders = append(compiledLaunchPlanProviders, compiler.NewLaunchPlanInterfaceProvider(*plan)) + } + } + + return compiledLaunchPlanProviders, nil +} + const ( compileShort = `Validate flyte packages without registration needed.` compileLong = ` diff --git a/flytectl/cmd/compile/compile_test.go b/flytectl/cmd/compile/compile_test.go index 4004ca897b..2d91260aff 100644 --- a/flytectl/cmd/compile/compile_test.go +++ b/flytectl/cmd/compile/compile_test.go @@ -43,6 +43,8 @@ func TestCompileCommand(t *testing.T) { assert.NotNil(t, err, "calling compile with Empty file flag does not error") } +// New packages can be created by using the following command +// pyflyte --pkgs package -f func TestCompilePackage(t *testing.T) { // valid package contains two workflows // with three tasks @@ -69,4 +71,7 @@ func TestCompilePackage(t *testing.T) { err = compileFromPackage("testdata/invalidworkflow.tgz") assert.NotNil(t, err, "unable to handle invalid workflow") + // testing workflows with launchplans used within workflow + err = compileFromPackage("testdata/launchplan-in-wf.tgz") + assert.Nil(t, err, "unable to compile workflow with launchplans used within workflow") } diff --git a/flytectl/cmd/compile/testdata/launchplan-in-wf.py b/flytectl/cmd/compile/testdata/launchplan-in-wf.py new file mode 100644 index 0000000000..8a54c69f28 --- /dev/null +++ b/flytectl/cmd/compile/testdata/launchplan-in-wf.py @@ -0,0 +1,16 @@ +# Tests that a LaunchPlan with inputs can be used in a workflow for flytectl compile +from flytekit import LaunchPlan, task, workflow + +@task +def my_task(num: int) -> int: + return num + 1 + + +@workflow +def inner_workflow(num: int) -> int: + return my_task(num=num) + + +@workflow +def outer_workflow() -> int: + return LaunchPlan.get_or_create(inner_workflow, "name_override", default_inputs={"num": 42})() diff --git a/flytectl/cmd/compile/testdata/launchplan-in-wf.tgz b/flytectl/cmd/compile/testdata/launchplan-in-wf.tgz new file mode 100644 index 0000000000000000000000000000000000000000..b297277ce0b807eb705c203a0fcd6039e6072e4d GIT binary patch literal 875 zcmV-x1C;z9iwFq$#%5*$|7L7?bY(4YVPk7yXJsyQXLxo6{H z=8ny4ICHO|NA&39nbZHwof;3&qoBmxbyO5hyJ-*mnW8d=4*Mttx}DGo#O2ZRM70iE zq1)o`n8n^IjF~#4n#xSm!U*{P(e0ieH0n*i*P*>S^}8Vt#$C2b+g~4M{hOvfJTl8r zlE(;DH4)QIjTyS8Sxl^di&+*rRMK4kHST$C=kB=s#-AwoX9geSv!%T`Ujqq|0!Gpn z-oRO3k@7g3&q$(GdXKZ7A{Fq4>v@UzGExv($2qUtl9ut32rpN5@OC~|TKAk5xBUwq z1a6I&x1#f5l8??K+7(2WNVb@Rb(t0v{15&I|36zad*UbA)*i(8f9HRu8&m$*RRjE= zj{j%1EPc6UM=)u4u_M^WBsTmGUX3;cN!^!_iYn+wM(A9G!YTv*ga4n9|Ct5;eW`M<$t@_!xtpN{{3WfrmhYb@cd*!KzB&sGFJM49{=UTF>N(CIYFh-}2uR>+HJ zlDdzJ`An%WalPwk2|-B!{15(z{2xdOV{ZLlOU3_8wWj^ADh&LeivJHp{y(?)PZzmA zfFR=s|AYS_{|BBHihWxmX0wO94RD|TEo(aeHw?oB|EJ>r-?=&Y_*#1qp8K9Wd_Z<3 zNo4uUj502fcia$-&6xyZw^9E{ovbC)EYM$Zdhp#G>V35PW$_qvq8W*_36m9T9E09q_u_C);8$?m3G(L`+eZ1E)8Nt2v=F z3NDIZF?1S@oQQ$(oNZaIp7Uy>?Yo{hw-cU|JrR7fHK&O8LKG%+&u49sK`~^8cyg|H&fv z2X=8G-T|y9{tZyfKve($0000000000000000000000000007{z`2$XxUKs#T006ft Bt-$~Q literal 0 HcmV?d00001