diff --git a/content/en/docs/components/katib/user-guides/katib-config.md b/content/en/docs/components/katib/user-guides/katib-config.md index cbe9fd723f..c33996ea21 100644 --- a/content/en/docs/components/katib/user-guides/katib-config.md +++ b/content/en/docs/components/katib/user-guides/katib-config.md @@ -8,7 +8,7 @@ This guide describes [the Katib Config](https://github.com/kubeflow/katib/blob/19268062f1b187dde48114628e527a2a35b01d64/manifests/v1beta1/installs/katib-standalone/katib-config.yaml) — the main configuration file for every Katib component. We use Kubernetes [ConfigMap](https://kubernetes.io/docs/tasks/configure-pod-container/configure-pod-configmap/) to -fetch that config into [the Katib control plane components](/docs/components/katib/installation/#installing-control-plane). +fetch that config into [the Katib control plane components](/docs/components/katib/reference/architecture/#katib-control-plane-components). The ConfigMap must be deployed in the [`KATIB_CORE_NAMESPACE`](/docs/components/katib/user-guides/env-variables/#katib-controller) diff --git a/content/en/docs/components/pipelines/legacy-v1/tutorials/api-pipelines.md b/content/en/docs/components/pipelines/legacy-v1/tutorials/api-pipelines.md index 0758dc3bd9..5122e33b8e 100644 --- a/content/en/docs/components/pipelines/legacy-v1/tutorials/api-pipelines.md +++ b/content/en/docs/components/pipelines/legacy-v1/tutorials/api-pipelines.md @@ -39,7 +39,7 @@ wget -O ${PIPELINE_FILE} ${PIPELINE_URL} dsl-compile --py ${PIPELINE_FILE} --output ${PIPELINE_NAME}.tar.gz ``` -After running the commands above, you should get two files in your current directory: `sequential.py` and `sequential.tar.gz`. Run the following command to deploy the generated `.tar.gz` file as you would do using the [Kubeflow Pipelines UI](/docs/components/pipelines/user-guides/core-functions/run-a-pipeline/#1-run-from-the-kfp-dashboard), but this time using the REST API. +After running the commands above, you should get two files in your current directory: `sequential.py` and `sequential.tar.gz`. Run the following command to deploy the generated `.tar.gz` file as you would do using the [Kubeflow Pipelines UI](/docs/components/pipelines/user-guides/core-functions/run-a-pipeline/#run-pipeline---kfp-dashboard), but this time using the REST API. ``` SVC=localhost:8888 diff --git a/content/en/docs/components/pipelines/operator-guides/multi-user.md b/content/en/docs/components/pipelines/operator-guides/multi-user.md index a1774416d5..3f1d44fbcd 100644 --- a/content/en/docs/components/pipelines/operator-guides/multi-user.md +++ b/content/en/docs/components/pipelines/operator-guides/multi-user.md @@ -42,10 +42,10 @@ Pipeline definitions are not isolated right now, and are shared across all names How to connect Pipelines SDK to Kubeflow Pipelines will depend on __what kind__ of Kubeflow deployment you have, and __from where you are running your code__. 
-* [Full Kubeflow (from inside cluster)](/docs/components/pipelines/user-guides/core-functions/connect-api/#full-kubeflow-subfrom-inside-clustersub) -* [Full Kubeflow (from outside cluster)](/docs/components/pipelines/user-guides/core-functions/connect-api/#full-kubeflow-subfrom-outside-clustersub) -* [Standalone Kubeflow Pipelines (from inside cluster)](/docs/components/pipelines/user-guides/core-functions/connect-api/#standalone-kubeflow-pipelines-subfrom-inside-clustersub) -* [Standalone Kubeflow Pipelines (from outside cluster)](/docs/components/pipelines/user-guides/core-functions/connect-api/#standalone-kubeflow-pipelines-subfrom-outside-clustersub) +* [Full Kubeflow (from inside cluster)](/docs/components/pipelines/user-guides/core-functions/connect-api/#kubeflow-platform---inside-the-cluster) +* [Full Kubeflow (from outside cluster)](/docs/components/pipelines/user-guides/core-functions/connect-api/#kubeflow-platform---outside-the-cluster) +* [Standalone Kubeflow Pipelines (from inside cluster)](/docs/components/pipelines/user-guides/core-functions/connect-api/#standalone-kfp---inside-the-cluster) +* [Standalone Kubeflow Pipelines (from outside cluster)](/docs/components/pipelines/user-guides/core-functions/connect-api/#standalone-kfp---outside-the-cluster) The following Python code will create an experiment (and associated run) from a Pod inside a full Kubeflow cluster. diff --git a/content/en/docs/components/pipelines/user-guides/components/_index.md b/content/en/docs/components/pipelines/user-guides/components/_index.md index 13e805a51e..aab6bcf234 100644 --- a/content/en/docs/components/pipelines/user-guides/components/_index.md +++ b/content/en/docs/components/pipelines/user-guides/components/_index.md @@ -1,5 +1,6 @@ +++ title = "Create components" +description = "Create pipelines with reusable components." weight = 3 +++ diff --git a/content/en/docs/components/pipelines/user-guides/core-functions/_index.md b/content/en/docs/components/pipelines/user-guides/core-functions/_index.md index b5dc8ab738..8129818b4c 100644 --- a/content/en/docs/components/pipelines/user-guides/core-functions/_index.md +++ b/content/en/docs/components/pipelines/user-guides/core-functions/_index.md @@ -1,5 +1,5 @@ +++ title = "Core Functions" -description = "Documentation for users of Kubeflow Pipelines." +description = "Learn about the core functions of Kubeflow Pipelines." weight = 2 +++ diff --git a/content/en/docs/components/pipelines/user-guides/core-functions/build-advanced-pipeline.md b/content/en/docs/components/pipelines/user-guides/core-functions/build-advanced-pipeline.md index 5f3c43af18..e331905b85 100644 --- a/content/en/docs/components/pipelines/user-guides/core-functions/build-advanced-pipeline.md +++ b/content/en/docs/components/pipelines/user-guides/core-functions/build-advanced-pipeline.md @@ -1,6 +1,7 @@ +++ title = "Build a More Advanced ML Pipeline" -weight = 6 +description = "Create a more advanced pipeline that leverages additional KFP features." +weight = 199 +++ {{% kfp-v2-keywords %}} diff --git a/content/en/docs/components/pipelines/user-guides/core-functions/caching.md b/content/en/docs/components/pipelines/user-guides/core-functions/caching.md index 297f569fcf..b954bdc050 100644 --- a/content/en/docs/components/pipelines/user-guides/core-functions/caching.md +++ b/content/en/docs/components/pipelines/user-guides/core-functions/caching.md @@ -1,7 +1,7 @@ +++ title = "Use Caching" -description = "How to use caching in Kubeflow Pipelines." 
-weight = 5 +description = "Learn about caching in Kubeflow Pipelines." +weight = 104 +++ Kubeflow Pipelines support caching to eliminate redundant executions and improve @@ -26,7 +26,7 @@ be marked with a green "arrow from cloud" icon. ## How to use caching Caching is enabled by default for all components in KFP. You can disable caching -for a component by calling `.set_caching_options(False)` on a task object. +for a component by calling [`.set_caching_options(enable_caching=False)`](https://kubeflow-pipelines.readthedocs.io/en/latest/source/dsl.html#kfp.dsl.PipelineTask.set_caching_options) on a task object. ```python from kfp import dsl diff --git a/content/en/docs/components/pipelines/user-guides/core-functions/cli.md b/content/en/docs/components/pipelines/user-guides/core-functions/cli.md index b781ca3c7a..62eb9deae4 100644 --- a/content/en/docs/components/pipelines/user-guides/core-functions/cli.md +++ b/content/en/docs/components/pipelines/user-guides/core-functions/cli.md @@ -1,6 +1,7 @@ +++ -title = "Interact with KFP via the CLI" -weight = 4 +title = "Use the KFP CLI" +description = "Learn how to interact with Kubeflow Pipelines using the KFP CLI." +weight = 203 +++ {{% kfp-v2-keywords %}} diff --git a/content/en/docs/components/pipelines/user-guides/core-functions/compile-a-pipeline.md b/content/en/docs/components/pipelines/user-guides/core-functions/compile-a-pipeline.md index ba9774ea2c..485366d815 100644 --- a/content/en/docs/components/pipelines/user-guides/core-functions/compile-a-pipeline.md +++ b/content/en/docs/components/pipelines/user-guides/core-functions/compile-a-pipeline.md @@ -1,16 +1,20 @@ +++ title = "Compile a Pipeline" -description = "Compile pipelines and components to YAML" -weight = 2 +description = "Define and compile a basic pipeline using the KFP SDK." +weight = 101 +++ {{% kfp-v2-keywords %}} -To submit a pipeline for execution, you must compile it to YAML with the KFP SDK compiler: +## Overview + +To [submit a pipeline for execution](/docs/components/pipelines/user-guides/core-functions/run-a-pipeline/), you must compile it to YAML with the KFP SDK compiler. + +In the following example, the compiler creates a file called `pipeline.yaml`, which contains a hermetic representation of your pipeline. +The output is called an [Intermediate Representation (IR) YAML](#ir-yaml), which is a serialized [`PipelineSpec`][pipeline-spec] protocol buffer message. ```python -from kfp import dsl -from kfp import compiler +from kfp import compiler, dsl @dsl.component def comp(message: str) -> str: @@ -25,9 +29,19 @@ def my_pipeline(message: str) -> str: compiler.Compiler().compile(my_pipeline, package_path='pipeline.yaml') ``` -In this example, the compiler creates a file called `pipeline.yaml`, which contains a hermetic representation of your pipeline. The output is called intermediate representation (IR) YAML. You can view an example of IR YAML on [GitHub][compiled-output-example]. The contents of the file is the serialized [`PipelineSpec`][pipeline-spec] protocol buffer message and is not intended to be human-readable. +Because components are actually pipelines, you may also compile them to IR YAML: + +```python +@dsl.component +def comp(message: str) -> str: + print(message) + return message + +compiler.Compiler().compile(comp, package_path='component.yaml') +``` -You can find human-readable information about the pipeline in the comments at the top of the compiled YAML: +You can view an [example of IR YAML][compiled-output-example] on GitHub. 
+The contents of the file are not intended to be human-readable, however the comments at the top of the file provide a summary of the pipeline: ```yaml # PIPELINE DEFINITION @@ -40,16 +54,21 @@ You can find human-readable information about the pipeline in the comments at th ... ``` -You can also compile components, as opposed to pipelines, to IR YAML: +## Type checking -```python -@dsl.component -def comp(message: str) -> str: - print(message) - return message +By default, the DSL compiler statically type checks your pipeline to ensure type consistency between components that pass data between one another. +Static type checking helps identify component I/O inconsistencies without having to run the pipeline, shortening development iterations. -compiler.Compiler().compile(comp, package_path='component.yaml') -``` +Specifically, the type checker checks for type equality between the type of data a component input expects and the type of the data provided. +See [Data Types][data-types] for more information about KFP data types. + +For example, for parameters, a list input may only be passed to parameters with a `typing.List` annotation. +Similarly, a float may only be passed to parameters with a `float` annotation. + +Input data types and annotations must also match for artifacts, with one exception: the `Artifact` type is compatible with all other artifact types. +In this sense, the `Artifact` type is both the default artifact type and an artifact "any" type. + +As described in the following section, you can disable type checking. ## Compiler arguments @@ -63,25 +82,14 @@ The [`Compiler.compile`][compiler-compile] method accepts the following argument | `pipeline_parameters` | `Dict[str, Any]` | _Optional_
Map of parameter names to argument values. This lets you provide default values for pipeline or component parameters. You can override these default values during pipeline submission. | `type_check` | `bool` | _Optional_
Indicates whether static type checking is enabled during compilation.
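For example, a single `compile` call that sets several of these arguments might look like the following sketch (the custom pipeline name and the default value for `message` are illustrative assumptions, building on the `my_pipeline` example above):

```python
from kfp import compiler

# a sketch: compile `my_pipeline` with a custom name, a default value
# for its `message` parameter, and static type checking disabled
compiler.Compiler().compile(
    pipeline_func=my_pipeline,
    package_path='pipeline.yaml',
    pipeline_name='my-renamed-pipeline',
    pipeline_parameters={'message': 'Hello, World!'},
    type_check=False,
)
```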
-## Type checking - -By default, the DSL compiler statically type checks your pipeline to ensure type consistency between components that pass data between one another. Static type checking helps identify component I/O inconsistencies without having to run the pipeline, shortening development iterations. - -Specifically, the type checker checks for type equality between the type of data a component input expects and the type of the data provided. See [Data Types][data-types] for more information about KFP data types. - -For example, for parameters, a list input may only be passed to parameters with a `typing.List` annotation. Similarly, a float may only be passed to parameters with a `float` annotation. - -Input data types and annotations must also match for artifacts, with one exception: the `Artifact` type is compatible with all other artifact types. In this sense, the `Artifact` type is both the default artifact type and an artifact "any" type. - -As described in the following section, you can disable type checking. - ## IR YAML -The IR YAML is an intermediate representation of a compiled pipeline or component. It is an instance of the [`PipelineSpec`][pipeline-spec] protocol buffer message type, which is a platform-agnostic pipeline representation protocol. It is considered an intermediate representation because the KFP backend compiles `PipelineSpec` to [Argo Workflow][argo-workflow] YAML as the final pipeline definition for execution. +The IR YAML is an intermediate representation of a compiled pipeline or component. +It is an instance of the [`PipelineSpec`][pipeline-spec] protocol buffer message type, which is a platform-agnostic pipeline representation protocol. +It is considered an intermediate representation because the KFP backend compiles `PipelineSpec` to [Argo Workflow][argo-workflow] YAML as the final pipeline definition for execution. Unlike the v1 component YAML, the IR YAML is not intended to be written directly. - -While IR YAML is not intended to be easily human readable, you can still inspect it if you know a bit about its contents: +While IR YAML is not intended to be easily human-readable, you can still inspect it if you know a bit about its contents: | Section | Description | Example | |-------|-------------|---------| diff --git a/content/en/docs/components/pipelines/user-guides/core-functions/connect-api.md b/content/en/docs/components/pipelines/user-guides/core-functions/connect-api.md index 4a8553f4df..c3ee6f312e 100644 --- a/content/en/docs/components/pipelines/user-guides/core-functions/connect-api.md +++ b/content/en/docs/components/pipelines/user-guides/core-functions/connect-api.md @@ -1,50 +1,49 @@ +++ -title = "Connect the Pipelines SDK to Kubeflow Pipelines" -description = "How to connect the Pipelines SDK to Kubeflow Pipelines in various ways" -weight = 7 +title = "Connect the SDK to the API" +description = "Learn how to connect the Kubeflow Pipelines SDK to the API." +weight = 201 +++ -How to connect Pipelines SDK to Kubeflow Pipelines will depend on __what kind__ of Kubeflow deployment you have, and __from where you are running your code__. 
+## Overview -* [Full Kubeflow (from inside cluster)](#full-kubeflow-subfrom-inside-clustersub) -* [Full Kubeflow (from outside cluster)](#full-kubeflow-subfrom-outside-clustersub) -* [Standalone Kubeflow Pipelines (from inside cluster)](#standalone-kubeflow-pipelines-subfrom-inside-clustersub) -* [Standalone Kubeflow Pipelines (from outside cluster)](#standalone-kubeflow-pipelines-subfrom-outside-clustersub) +The [Kubeflow Pipelines SDK](https://kubeflow-pipelines.readthedocs.io/en/stable/) provides a Python interface to interact with the Kubeflow Pipelines API. +This guide will show you how to connect the SDK to the Pipelines API in various scenarios. -{{% alert title="Tip" color="info" %}} -Before you begin, you will need to: -* [Deploy Kubeflow Pipelines](/docs/components/pipelines/legacy-v1/overview/) -* [Install the Kubeflow Pipelines SDK](/docs/components/pipelines/legacy-v1/sdk/install-sdk/) -{{% /alert %}} +## Kubeflow Platform -## Full Kubeflow (from inside cluster) +When running Kubeflow Pipelines as part of a multi-user [Kubeflow Platform](/docs/started/introduction/#what-is-kubeflow-platform), how you authenticate the Pipelines SDK will depend on whether you are running your code __inside__ or __outside__ the cluster. + +### **Kubeflow Platform - Inside the Cluster**
Click to expand
-When running the Pipelines SDK inside a multi-user Kubeflow cluster, a [ServiceAccount token volume](https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/#service-account-token-volume-projection) -can be mounted to the Pod, the Kubeflow Pipelines SDK can use this token to authenticate itself with the Kubeflow Pipelines API. +A [ServiceAccount token volume](https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/#service-account-token-volume-projection) can be mounted to a Pod running in the same cluster as Kubeflow Pipelines. +The Kubeflow Pipelines SDK can use this token to authenticate itself with the Kubeflow Pipelines API. -The following code creates a `kfp.Client()` using a ServiceAccount token for authentication. +The following Python code will create a `kfp.Client()` using a ServiceAccount token for authentication: ```python import kfp -# the namespace in which you deployed Kubeflow Pipelines -namespace = "kubeflow" +# by default, when run from inside a Kubernetes cluster: +# - the token is read from the `KF_PIPELINES_SA_TOKEN_PATH` path +# - the host is set to `http://ml-pipeline-ui.kubeflow.svc.cluster.local` +kfp_client = kfp.Client() -# the KF_PIPELINES_SA_TOKEN_PATH environment variable is used when no `path` is set -# the default KF_PIPELINES_SA_TOKEN_PATH is /var/run/secrets/kubeflow/pipelines/token -credentials = kfp.auth.ServiceAccountTokenVolumeCredentials(path=None) +# test the client by listing experiments +experiments = kfp_client.list_experiments(namespace="my-profile") +print(experiments) +``` -client = kfp.Client(host=f"http://ml-pipeline-ui.{namespace}", credentials=credentials) +#### ServiceAccount Token Volume -print(client.list_experiments()) -``` +To use the preceding code, you will need to run it from a Pod that has a ServiceAccount token volume mounted. +You may manually add a `volume` and `volumeMount` to your PodSpec or use Kubeflow's [`PodDefaults`](https://github.com/kubeflow/kubeflow/tree/master/components/admission-webhook) to inject the required volume. -The following Pod demonstrates mounting a ServiceAccount token volume. +__Option 1 - manually add a volume to your PodSpec:__ ```yaml apiVersion: v1 @@ -75,7 +74,7 @@ spec: audience: pipelines.kubeflow.org ``` -You may use Kubeflow's [`PodDefaults`](https://github.com/kubeflow/kubeflow/tree/master/components/admission-webhook) to inject the required ServiceAccount token volume into your Pods: +__Option 2 - use a `PodDefault` to inject the volume:__ ```yaml apiVersion: kubeflow.org/v1alpha1 @@ -113,7 +112,7 @@ spec: * The Notebook Spawner UI will be aware of any `PodDefaults` in the user's namespace (they are selectable under the "configurations" section). {{% /alert %}} -### RBAC Authorization +#### RBAC Authorization The Kubeflow Pipelines API respects Kubernetes RBAC, and will check RoleBindings assigned to the ServiceAccount before allowing it to take Pipelines API actions. @@ -142,177 +141,251 @@ subjects: for a list of some important `pipelines.kubeflow.org` RBAC verbs. * Kubeflow Notebooks pods run as the `default-editor` ServiceAccount by default, so the RoleBindings for `default-editor` apply to them and give them access to submit pipelines in their own namespace. +* For more information about profiles, see the [Manage Profile Contributors](/docs/components/central-dash/profiles/#manage-profile-contributors) guide. {{% /alert %}}
-## Full Kubeflow (from outside cluster) +### **Kubeflow Platform - Outside the Cluster**
Click to expand
-The process to authenticate the Pipelines SDK from outside the cluster in multi-user mode will vary by distribution: - -* [Kubeflow on Google Cloud](/docs/distributions/gke/pipelines/authentication-sdk/#connecting-to-kubeflow-pipelines-in-a-full-kubeflow-deployment) -* [Kubeflow on AWS](/docs/distributions/aws/pipeline/#authenticate-kubeflow-pipeline-using-sdk-outside-cluster) -* [Kubeflow on Azure](https://awslabs.github.io/kubeflow-manifests/docs/component-guides/pipelines/) -* [Kubeflow on IBM Cloud](/docs/distributions/ibm/pipelines/#2-authenticating-multi-user-kubeflow-pipelines-with-the-sdk) +{{% alert title="Kubeflow Notebooks" color="warning" %}} +As Kubeflow Notebooks run on Pods _inside the cluster_, they can NOT use the following method to authenticate the Pipelines SDK, see the [inside the cluster](#kubeflow-platform---inside-the-cluster) method. +{{% /alert %}} -### Example for Dex +The precise method to authenticate from _outside the cluster_ will depend on how you [deployed Kubeflow Platform](/docs/started/installing-kubeflow/#kubeflow-platform). +Because most distributions use [Dex](https://dexidp.io/) as their identity provider, this example will show you how to authenticate with Dex using a Python script. -For the deployments that use [Dex](https://dexidp.io/) as their identity provider, this example demonstrates how to authenticate the Pipelines SDK from outside the cluster. +You will need to make the Kubeflow Pipelines API accessible on the remote machine. +If your Kubeflow Istio gateway is already exposed, skip this step and use that URL directly. -__Step 1:__ expose your `istio-ingressgateway` service locally (if your Kubeflow Istio gateway is not already exposed on a domain) +The following command will expose the `istio-ingressgateway` service on `localhost:8080`: ```bash -# `svc/istio-ingressgateway` may be called something else, or use different ports +# TIP: svc/istio-ingressgateway may be called something else, +# or use different ports in your distribution kubectl port-forward --namespace istio-system svc/istio-ingressgateway 8080:80 ``` -__Step 2:__ this Python code defines a `get_istio_auth_session()` function that returns a session cookie by authenticating with dex +The following Python code defines a `KFPClientManager()` class that creates an authenticated `kfp.Client()` by interacting with Dex: ```python import re +from urllib.parse import urlsplit, urlencode + +import kfp import requests -from urllib.parse import urlsplit +import urllib3 + -def get_istio_auth_session(url: str, username: str, password: str) -> dict: +class KFPClientManager: """ - Determine if the specified URL is secured by Dex and try to obtain a session cookie. - WARNING: only Dex `staticPasswords` and `LDAP` authentication are currently supported - (we default default to using `staticPasswords` if both are enabled) - - :param url: Kubeflow server URL, including protocol - :param username: Dex `staticPasswords` or `LDAP` username - :param password: Dex `staticPasswords` or `LDAP` password - :return: auth session information + A class that creates `kfp.Client` instances with Dex authentication. 
""" - # define the default return object - auth_session = { - "endpoint_url": url, # KF endpoint URL - "redirect_url": None, # KF redirect URL, if applicable - "dex_login_url": None, # Dex login URL (for POST of credentials) - "is_secured": None, # True if KF endpoint is secured - "session_cookie": None # Resulting session cookies in the form "key1=value1; key2=value2" - } - - # use a persistent session (for cookies) - with requests.Session() as s: - - ################ - # Determine if Endpoint is Secured - ################ - resp = s.get(url, allow_redirects=True) - if resp.status_code != 200: - raise RuntimeError( - f"HTTP status code '{resp.status_code}' for GET against: {url}" + + def __init__( + self, + api_url: str, + dex_username: str, + dex_password: str, + dex_auth_type: str = "local", + skip_tls_verify: bool = False, + ): + """ + Initialize the KfpClient + + :param api_url: the Kubeflow Pipelines API URL + :param skip_tls_verify: if True, skip TLS verification + :param dex_username: the Dex username + :param dex_password: the Dex password + :param dex_auth_type: the auth type to use if Dex has multiple enabled, one of: ['ldap', 'local'] + """ + self._api_url = api_url + self._skip_tls_verify = skip_tls_verify + self._dex_username = dex_username + self._dex_password = dex_password + self._dex_auth_type = dex_auth_type + self._client = None + + # disable SSL verification, if requested + if self._skip_tls_verify: + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + # ensure `dex_default_auth_type` is valid + if self._dex_auth_type not in ["ldap", "local"]: + raise ValueError( + f"Invalid `dex_auth_type` '{self._dex_auth_type}', must be one of: ['ldap', 'local']" ) - auth_session["redirect_url"] = resp.url + def _get_session_cookies(self) -> str: + """ + Get the session cookies by authenticating against Dex + :return: a string of session cookies in the form "key1=value1; key2=value2" + """ - # if we were NOT redirected, then the endpoint is UNSECURED - if len(resp.history) == 0: - auth_session["is_secured"] = False - return auth_session + # use a persistent session (for cookies) + s = requests.Session() + + # GET the api_url, which should redirect to Dex + resp = s.get( + self._api_url, allow_redirects=True, verify=not self._skip_tls_verify + ) + if resp.status_code == 200: + pass + elif resp.status_code == 403: + # if we get 403, we might be at the oauth2-proxy sign-in page + # the default path to start the sign-in flow is `/oauth2/start?rd=` + url_obj = urlsplit(resp.url) + url_obj = url_obj._replace( + path="/oauth2/start", query=urlencode({"rd": url_obj.path}) + ) + resp = s.get( + url_obj.geturl(), allow_redirects=True, verify=not self._skip_tls_verify + ) else: - auth_session["is_secured"] = True - - ################ - # Get Dex Login URL - ################ - redirect_url_obj = urlsplit(auth_session["redirect_url"]) - - # if we are at `/auth?=xxxx` path, we need to select an auth type - if re.search(r"/auth$", redirect_url_obj.path): - - ####### - # TIP: choose the default auth type by including ONE of the following - ####### - - # OPTION 1: set "staticPasswords" as default auth type - redirect_url_obj = redirect_url_obj._replace( - path=re.sub(r"/auth$", "/auth/local", redirect_url_obj.path) + raise RuntimeError( + f"HTTP status code '{resp.status_code}' for GET against: {self._api_url}" ) - # OPTION 2: set "ldap" as default auth type - # redirect_url_obj = redirect_url_obj._replace( - # path=re.sub(r"/auth$", "/auth/ldap", redirect_url_obj.path) - # ) - - 
# if we are at `/auth/xxxx/login` path, then no further action is needed (we can use it for login POST) - if re.search(r"/auth/.*/login$", redirect_url_obj.path): - auth_session["dex_login_url"] = redirect_url_obj.geturl() - - # else, we need to be redirected to the actual login page + + # if we were NOT redirected, then the endpoint is unsecured + if len(resp.history) == 0: + # no cookies are needed + return "" + + # if we are at `../auth` path, we need to select an auth type + url_obj = urlsplit(resp.url) + if re.search(r"/auth$", url_obj.path): + url_obj = url_obj._replace( + path=re.sub(r"/auth$", f"/auth/{self._dex_auth_type}", url_obj.path) + ) + + # if we are at `../auth/xxxx/login` path, then we are at the login page + if re.search(r"/auth/.*/login$", url_obj.path): + dex_login_url = url_obj.geturl() else: - # this GET should redirect us to the `/auth/xxxx/login` path - resp = s.get(redirect_url_obj.geturl(), allow_redirects=True) + # otherwise, we need to follow a redirect to the login page + resp = s.get( + url_obj.geturl(), allow_redirects=True, verify=not self._skip_tls_verify + ) if resp.status_code != 200: raise RuntimeError( - f"HTTP status code '{resp.status_code}' for GET against: {redirect_url_obj.geturl()}" + f"HTTP status code '{resp.status_code}' for GET against: {url_obj.geturl()}" ) + dex_login_url = resp.url - # set the login url - auth_session["dex_login_url"] = resp.url - - ################ - # Attempt Dex Login - ################ + # attempt Dex login resp = s.post( - auth_session["dex_login_url"], - data={"login": username, "password": password}, - allow_redirects=True + dex_login_url, + data={"login": self._dex_username, "password": self._dex_password}, + allow_redirects=True, + verify=not self._skip_tls_verify, ) + if resp.status_code != 200: + raise RuntimeError( + f"HTTP status code '{resp.status_code}' for POST against: {dex_login_url}" + ) + + # if we were NOT redirected, then the login credentials were probably invalid if len(resp.history) == 0: raise RuntimeError( - f"Login credentials were probably invalid - " - f"No redirect after POST to: {auth_session['dex_login_url']}" + f"Login credentials are probably invalid - " + f"No redirect after POST to: {dex_login_url}" + ) + + # if we are at `../approval` path, we need to approve the login + url_obj = urlsplit(resp.url) + if re.search(r"/approval$", url_obj.path): + dex_approval_url = url_obj.geturl() + + # approve the login + resp = s.post( + dex_approval_url, + data={"approval": "approve"}, + allow_redirects=True, + verify=not self._skip_tls_verify, ) + if resp.status_code != 200: + raise RuntimeError( + f"HTTP status code '{resp.status_code}' for POST against: {url_obj.geturl()}" + ) + + return "; ".join([f"{c.name}={c.value}" for c in s.cookies]) + + def _create_kfp_client(self) -> kfp.Client: + try: + session_cookies = self._get_session_cookies() + except Exception as ex: + raise RuntimeError(f"Failed to get Dex session cookies") from ex - # store the session cookies in a "key1=value1; key2=value2" string - auth_session["session_cookie"] = "; ".join([f"{c.name}={c.value}" for c in s.cookies]) + # monkey patch the kfp.Client to support disabling SSL verification + # kfp only added support in v2: https://github.com/kubeflow/pipelines/pull/7174 + original_load_config = kfp.Client._load_config - return auth_session + def patched_load_config(client_self, *args, **kwargs): + config = original_load_config(client_self, *args, **kwargs) + config.verify_ssl = not self._skip_tls_verify + return config + + 
patched_kfp_client = kfp.Client + patched_kfp_client._load_config = patched_load_config + + return patched_kfp_client( + host=self._api_url, + cookies=session_cookies, + ) + + def create_kfp_client(self) -> kfp.Client: + """Get a newly authenticated Kubeflow Pipelines client.""" + return self._create_kfp_client() ``` -__Step 3:__ this Python code uses the above `get_istio_auth_session()` function to create a `kfp.Client()` +The following Python code shows how to use the `KFPClientManager()` class to create a `kfp.Client()`: ```python -import kfp +# initialize a KFPClientManager +kfp_client_manager = KFPClientManager( + api_url="http://localhost:8080/pipeline", + skip_tls_verify=True, -KUBEFLOW_ENDPOINT = "http://localhost:8080" -KUBEFLOW_USERNAME = "user@example.com" -KUBEFLOW_PASSWORD = "12341234" + dex_username="user@example.com", + dex_password="12341234", -auth_session = get_istio_auth_session( - url=KUBEFLOW_ENDPOINT, - username=KUBEFLOW_USERNAME, - password=KUBEFLOW_PASSWORD + # can be 'ldap' or 'local' depending on your Dex configuration + dex_auth_type="local", ) -client = kfp.Client(host=f"{KUBEFLOW_ENDPOINT}/pipeline", cookies=auth_session["session_cookie"]) -print(client.list_experiments()) +# get a newly authenticated KFP client +# TIP: long-lived sessions might need to get a new client when their session expires +kfp_client = kfp_client_manager.create_kfp_client() + +# test the client by listing experiments +experiments = kfp_client.list_experiments(namespace="my-profile") +print(experiments) ```
-## Standalone Kubeflow Pipelines (from inside cluster) +## Standalone Kubeflow Pipelines + +When running Kubeflow Pipelines in [standalone mode](/docs/components/pipelines/operator-guides/installation/), there is no multi-user authentication or RBAC. +The specific steps will depend on whether you are running your code __inside__ or __outside__ the cluster. + +### **Standalone KFP - Inside the Cluster**
Click to expand
-{{% alert title="Warning" color="warning" %}} -This information only applies to _Standalone Kubeflow Pipelines_. -{{% /alert %}} - When running inside the Kubernetes cluster, you may connect Pipelines SDK directly to the `ml-pipeline-ui` service via [cluster-internal service DNS resolution](https://kubernetes.io/docs/concepts/services-networking/service/#discovering-services). -{{% alert title="Tip" color="info" %}} +{{% alert title="Warning" color="warning" %}} In [standalone deployments](/docs/components/pipelines/operator-guides/installation/) of Kubeflow Pipelines, there is no authentication enforced on the `ml-pipeline-ui` service. {{% /alert %}} -For example, when running in the __same namespace__ as Kubeflow: +When running in the __same namespace__ as Kubeflow: ```python import kfp @@ -322,7 +395,7 @@ client = kfp.Client(host="http://ml-pipeline-ui:80") print(client.list_experiments()) ``` -For example, when running in a __different namespace__ to Kubeflow: +When running in a __different namespace__ to Kubeflow: ```python import kfp @@ -337,19 +410,15 @@ print(client.list_experiments())
-## Standalone Kubeflow Pipelines (from outside cluster) +### **Standalone KFP - Outside the Cluster**
Click to expand
-{{% alert title="Warning" color="warning" %}} -This information only applies to _Standalone Kubeflow Pipelines_. -{{% /alert %}} - When running outside the Kubernetes cluster, you may connect Pipelines SDK to the `ml-pipeline-ui` service by using [kubectl port-forwarding](https://kubernetes.io/docs/tasks/access-application-cluster/port-forward-access-application-cluster/). -{{% alert title="Tip" color="info" %}} +{{% alert title="Warning" color="warning" %}} In [standalone deployments](/docs/components/pipelines/operator-guides/installation/) of Kubeflow Pipelines, there is no authentication enforced on the `ml-pipeline-ui` service. {{% /alert %}} @@ -372,9 +441,4 @@ print(client.list_experiments())
- -## Next Steps - -* [Using the Kubeflow Pipelines SDK](/docs/components/pipelines/legacy-v1/tutorials/sdk-examples/) -* [Kubeflow Pipelines SDK Reference](https://kubeflow-pipelines.readthedocs.io/en/stable/) -* [Experiment with the Kubeflow Pipelines API](/docs/components/pipelines/legacy-v1/tutorials/api-pipelines/) +
\ No newline at end of file diff --git a/content/en/docs/components/pipelines/user-guides/core-functions/control-flow.md b/content/en/docs/components/pipelines/user-guides/core-functions/control-flow.md index fd9b435366..0647140096 100644 --- a/content/en/docs/components/pipelines/user-guides/core-functions/control-flow.md +++ b/content/en/docs/components/pipelines/user-guides/core-functions/control-flow.md @@ -1,25 +1,54 @@ +++ -title = "Create pipelines with control flow" -weight = 9 +title = "Control Flow" +description = "Use control flow such as conditionals, loops, and exit handling in Kubeflow Pipelines." +weight = 103 +++ {{% kfp-v2-keywords %}} -Although a KFP pipeline decorated with the `@dsl.pipeline` decorator looks like a normal Python function, it is actually an expression of pipeline topology and control flow semantics, constructed using the KFP domain-specific language (DSL). [Pipeline Basics][pipeline-basics] covered how data passing expresses [pipeline topology through task dependencies][data-passing]. This section describes how to use control flow in your pipelines using the KFP DSL. The DSL features three types of control flow, each implemented by a Python context manager: +## Overview -1. Conditions -2. Looping -3. Exit handling +Although a KFP pipeline decorated with the `@dsl.pipeline` decorator looks like a normal Python function, it is actually an expression of pipeline topology and control flow semantics, constructed using the KFP domain-specific language (DSL). -### Conditions (dsl.If, dsl.Elif, dsl.Else) +The [components guide][pipeline-basics] shows how pipeline topology is expressed by [data passing and task dependencies][data-passing]. +This section describes how to introduce control flow in your pipelines to create more complex workflows. -The [`dsl.If`][dsl-if] context manager enables conditional execution of tasks within its scope based on the output of an upstream task or pipeline input parameter. The context manager takes two arguments: a required `condition` and an optional `name`. The `condition` is a comparative expression where at least one of the two operands is an output from an upstream task or a pipeline input parameter. +The core types of control flow in KFP pipelines are: + +1. [__Conditions__](#conditions) +2. [__Loops__](#loops) +3. [__Exit handling__](#exit-handling) + +## Conditions + +Kubeflow Pipelines supports common conditional control flow constructs. +You can use these constructs to conditionally execute tasks based on the output of an upstream task or pipeline input parameter. + +{{% alert title="Deprecated" color="warning" %}} +The `dsl.Condition` is deprecated in favor of the functionally identical `dsl.If`, which is concise, Pythonic, and consistent with the `dsl.Elif` and `dsl.Else` objects. +{{% /alert %}} + +### **dsl.If** / **dsl.Elif** / **dsl.Else** + +The [`dsl.If`][dsl-if] context manager enables conditional execution of tasks within its scope based on the output of an upstream task or pipeline input parameter. + +The context manager takes two arguments: a required `condition` and an optional `name`. +The `condition` is a comparative expression where at least one of the two operands is an output from an upstream task or a pipeline input parameter. In the following pipeline, `conditional_task` only executes if `coin_flip_task` has the output `'heads'`. 
```python from kfp import dsl +@dsl.component +def flip_coin() -> str: + import random + return random.choice(['heads', 'tails']) + +#@dsl.component +#def my_comp(): +# print('Conditional task executed!') + @dsl.pipeline def my_pipeline(): coin_flip_task = flip_coin() @@ -32,6 +61,15 @@ You may also use [`dsl.Elif`][dsl-elif] and [`dsl.Else`][dsl-else] context manag ```python from kfp import dsl +@dsl.component +def flip_three_sided_coin() -> str: + import random + return random.choice(['heads', 'tails', 'draw']) + +@dsl.component +def print_comp(text: str): + print(text) + @dsl.pipeline def my_pipeline(): coin_flip_task = flip_three_sided_coin() @@ -43,17 +81,33 @@ def my_pipeline(): print_comp(text='Draw!') ``` -{{% alert title="Deprecated" color="warning" %}} -dsl.Condition is deprecated in favor of the functionally identical dsl.If, which is concise, Pythonic, and consistent with the dsl.Elif and dsl.Else objects. -{{% /alert %}} +### **dsl.OneOf** -#### dsl.OneOf +[`dsl.OneOf`][dsl-oneof] can be used to gather outputs from mutually exclusive branches into a single task output which can be consumed by a downstream task or outputted from a pipeline. +Branches are mutually exclusive if exactly one will be executed. +To enforce this, the KFP SDK compiler requires `dsl.OneOf` consume from tasks within a logically associated group of conditional branches and that one of the branches is a `dsl.Else` branch. -[`dsl.OneOf`][dsl-oneof] can be used to gather outputs from mutually exclusive branches into a single task output which can be consumed by a downstream task or outputted from a pipeline. Branches are mutually exclusive if exactly one will be executed. To enforce this, the KFP SDK compiler requires `dsl.OneOf` consume from taksks within a logically associated group of conditional branches and that one of the branches is a `dsl.Else` branch. +{{% oss-be-unsupported feature_name="`dsl.OneOf`" %}} + +For example, the following pipeline uses `dsl.OneOf` to gather outputs from mutually exclusive branches: ```python from kfp import dsl +@dsl.component +def flip_three_sided_coin() -> str: + import random + return random.choice(['heads', 'tails', 'draw']) + +@dsl.component +def print_and_return(text: str) -> str: + print(text) + return text + +@dsl.component +def announce_result(result: str): + print(f'The result is: {result}') + @dsl.pipeline def my_pipeline() -> str: coin_flip_task = flip_three_sided_coin() @@ -69,20 +123,35 @@ def my_pipeline() -> str: return oneof ``` -You should provide task outputs to the `dsl.OneOf` using `.output` or `.outputs[]`, just as you would pass an output to a downstream task. The outputs provided to `dsl.OneOf` must be of the same type and cannot be other instances of `dsl.OneOf` or [`dsl.Collected`][dsl-collected]. +You should provide task outputs to the `dsl.OneOf` using `.output` or `.outputs[]`, just as you would pass an output to a downstream task. +The outputs provided to `dsl.OneOf` must be of the same type and cannot be other instances of `dsl.OneOf` or [`dsl.Collected`][dsl-collected]. -{{% oss-be-unsupported feature_name="`dsl.OneOf`" %}} +## Loops -### Parallel looping (dsl.ParallelFor) +Kubeflow Pipelines supports loops which cause fan-out and fan-in of tasks. -The [`dsl.ParallelFor`][dsl-parallelfor] context manager allows parallel execution of tasks over a static set of items. The context manager takes three arguments: a required `items`, an optional `parallelism`, and an optional `name`. 
`items` is the static set of items to loop over and `parallelism` is the maximum number of concurrent iterations permitted while executing the `dsl.ParallelFor` group. `parallelism=0` indicates unconstrained parallelism. +### **dsl.ParallelFor** +The [`dsl.ParallelFor`][dsl-parallelfor] context manager allows parallel execution of tasks over a static set of items. + +The context manager takes three arguments: + +- `items`: static set of items to loop over +- `name` (optional): is the name of the loop context +- `parallelism` (optional): is the maximum number of concurrent iterations while executing the `dsl.ParallelFor` group + - note, `parallelism=0` indicates unconstrained parallelism + +{{% oss-be-unsupported feature_name="Setting `parallelism`" gh_issue_link=https://github.com/kubeflow/pipelines/issues/8718 %}} In the following pipeline, `train_model` will train a model for 1, 5, 10, and 25 epochs, with no more than two training tasks running at one time: ```python from kfp import dsl +#@dsl.component +#def train_model(epochs: int) -> Model: +# ... + @dsl.pipeline def my_pipeline(): with dsl.ParallelFor( @@ -91,45 +160,74 @@ def my_pipeline(): ) as epochs: train_model(epochs=epochs) ``` -{{% oss-be-unsupported feature_name="Setting `parallelism`" gh_issue_link=https://github.com/kubeflow/pipelines/issues/8718 %}} -#### dsl.Collected +### **dsl.Collected** + +Use [`dsl.Collected`][dsl-collected] with `dsl.ParallelFor` to gather outputs from a parallel loop of tasks. + +{{% oss-be-unsupported feature_name="`dsl.Collected`" gh_issue_link=https://github.com/kubeflow/pipelines/issues/6161 %}} -Use [`dsl.Collected`][dsl-collected] with `dsl.ParallelFor` to gather outputs from a parallel loop of tasks: +#### **Example:** Using `dsl.Collected` as an input to a downstream task + +Downstream tasks might consume `dsl.Collected` outputs via an input annotated with a `List` of parameters or a `List` of artifacts. + +For example, in the following pipeline, `max_accuracy` has the input `models` with type `Input[List[Model]]`, and will find the model with the highest accuracy from the models trained in the parallel loop: ```python from kfp import dsl +from kfp.dsl import Model, Input + +#def score_model(model: Model) -> float: +# return ... + +#@dsl.component +#def train_model(epochs: int) -> Model: +# ... + +@dsl.component +def max_accuracy(models: Input[List[Model]]) -> float: + return max(score_model(model) for model in models) @dsl.pipeline def my_pipeline(): + + # Train a model for 1, 5, 10, and 25 epochs with dsl.ParallelFor( items=[1, 5, 10, 25], ) as epochs: train_model_task = train_model(epochs=epochs) - max_accuracy(models=dsl.Collected(train_model_task.outputs['model'])) + + # Find the model with the highest accuracy + max_accuracy( + models=dsl.Collected(train_model_task.outputs['model']) + ) ``` -Downstream tasks might consume `dsl.Collected` outputs via an input annotated with a `List` of parameters or a `List` of artifacts. For example, `select_best` in the preceding example has the input `models` with type `Input[List[Model]]`, as shown by the following component definition: +#### **Example:** Nested lists of parameters -```python -from kfp import dsl -from kfp.dsl import Model, Input +You can use `dsl.Collected` to collect outputs from nested loops in a *nested list* of parameters. 
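As a rough sketch of what this can look like (the components and the `List[List[int]]` annotation are illustrative assumptions, not part of the original example), parameters produced inside two nested loops can be gathered and consumed by a downstream task; the paragraphs that follow describe the resulting structure:

```python
from typing import List

from kfp import dsl

@dsl.component
def multiply(a: int, b: int) -> int:
    return a * b

@dsl.component
def print_matrix(rows: List[List[int]]):
    # two nested dsl.ParallelFor groups produce a two-level nested list of parameters
    print(rows)

@dsl.pipeline
def my_pipeline():
    with dsl.ParallelFor(items=[1, 2, 3]) as a:
        with dsl.ParallelFor(items=[10, 20]) as b:
            multiply_task = multiply(a=a, b=b)

    # gather the outputs from both loop levels into one nested list
    print_matrix(rows=dsl.Collected(multiply_task.output))
```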
-@dsl.component -def select_best(models: Input[List[Model]]) -> float: - return max(score_model(model) for model in models) -``` - -You can use `dsl.Collected` to collect outputs from nested loops in a *nested list* of parameters. For example, output parameters from two nested `dsl.ParallelFor` groups are collected in a multilevel nested list of parameters, where each nested list contains the output parameters from one of the `dsl.ParallelFor` groups. The number of nested levels is based on the number of nested `dsl.ParallelFor` contexts. +For example, output parameters from two nested `dsl.ParallelFor` groups are collected in a multilevel nested list of parameters, where each nested list contains the output parameters from one of the `dsl.ParallelFor` groups. +The number of nested levels is based on the number of nested `dsl.ParallelFor` contexts. By comparison, *artifacts* created in nested loops are collected in a *flat* list. -You can also return a `dsl.Collected` from a pipeline. Use a `List` of parameters or a `List` of artifacts in the return annotation, as shown in the following example: +#### **Example:** Returning `dsl.Collected` from a pipeline + +You can also return a `dsl.Collected` from a pipeline. + +Use a `List` of parameters or a `List` of artifacts in the return annotation, as shown in the following example: ```python +from typing import List + from kfp import dsl from kfp.dsl import Model +#@dsl.component +#def train_model(epochs: int) -> Model: +# ... + @dsl.pipeline def my_pipeline() -> List[Model]: with dsl.ParallelFor( @@ -139,16 +237,38 @@ def my_pipeline() -> List[Model]: return dsl.Collected(train_model_task.outputs['model']) ``` -{{% oss-be-unsupported feature_name="`dsl.Collected`" gh_issue_link=https://github.com/kubeflow/pipelines/issues/6161 %}} +## Exit handling + +Kubeflow Pipelines supports exit handlers for implementing cleanup and error handling tasks that run after the main pipeline tasks finish execution. + +### **dsl.ExitHandler** -### Exit handling (dsl.ExitHandler) +The [`dsl.ExitHandler`][dsl-exithandler] context manager allows pipeline authors to specify an exit task which will run after the tasks within the context manager's scope finish execution, even if one of those tasks fails. -The [`dsl.ExitHandler`][dsl-exithandler] context manager allows pipeline authors to specify an exit task which will run after the tasks within the context manager's scope finish execution, even if one of those tasks fails. This is analogous to using a `try:` block followed by a `finally:` block in normal Python, where the exit task is in the `finally:` block. The context manager takes two arguments: a required `exit_task` and an optional `name`. `exit_task` accepts an instantiated [`PipelineTask`][dsl-pipelinetask]. +This construct is analogous to using a `try:` block followed by a `finally:` block in normal Python, where the exit task is in the `finally:` block. +The context manager takes two arguments: a required `exit_task` and an optional `name`. `exit_task` accepts an instantiated [`PipelineTask`][dsl-pipelinetask]. -In the following pipeline, `clean_up_task` will execute after both `create_dataset` and `train_and_save_models` finish or either of them fail: +#### **Example:** Basic cleanup task + +The most common use case for `dsl.ExitHandler` is to run a cleanup task after the main pipeline tasks finish execution. 
+ +In the following pipeline, `clean_up_task` will execute after both `create_dataset` and `train_and_save_models` finish (regardless of whether they succeed or fail): ```python from kfp import dsl +from kfp.dsl import Dataset + +#@dsl.component +#def clean_up_resources(): +# ... + +#@dsl.component +#def create_datasets(): +# ... + +#@dsl.component +#def train_and_save_models(dataset: Dataset): +# ... @dsl.pipeline def my_pipeline(): @@ -158,7 +278,13 @@ def my_pipeline(): train_task = train_and_save_models(dataset=dataset_task.output) ``` -The task you use as an exit task may use a special input that provides access to pipeline and task status metadata, including pipeline failure or success status. You can use this special input by annotating your exit task with the [`dsl.PipelineTaskFinalStatus`][dsl-pipelinetaskfinalstatus] annotation. The argument for this parameter will be provided by the backend automatically at runtime. You should not provide any input to this annotation when you instantiate your exit task. +### **Example:** Accessing pipeline and task status metadata + +The task you use as an exit task may use a special input that provides access to pipeline and task status metadata, including pipeline failure or success status. + +You can use this special input by annotating your exit task with the [`dsl.PipelineTaskFinalStatus`][dsl-pipelinetaskfinalstatus] annotation. +The argument for this parameter will be provided by the backend automatically at runtime. +You should not provide any input to this annotation when you instantiate your exit task. The following pipeline uses `dsl.PipelineTaskFinalStatus` to obtain information about the pipeline and task failure, even after `fail_op` fails: @@ -166,6 +292,9 @@ The following pipeline uses `dsl.PipelineTaskFinalStatus` to obtain information from kfp import dsl from kfp.dsl import PipelineTaskFinalStatus +@dsl.component +def print_op(message: str): + print(message) @dsl.component def exit_op(user_input: str, status: PipelineTaskFinalStatus): @@ -184,29 +313,43 @@ def fail_op(): @dsl.pipeline def my_pipeline(): - print_op() + print_op(message='Starting pipeline...') print_status_task = exit_op(user_input='Task execution status:') with dsl.ExitHandler(exit_task=print_status_task): fail_op() ``` -#### Ignore upstream failure +#### **Example:** Ignoring upstream task failures -The [`.ignore_upstream_failure()`][ignore-upstream-failure] task method on [`PipelineTask`][dsl-pipelinetask] enables another approach to author pipelines with exit handling behavior. Calling this method on a task causes the task to ignore failures of any specified upstream tasks (as established by data exchange or by use of [`.after()`][dsl-pipelinetask-after]). If the task has no upstream tasks, this method has no effect. +The [`.ignore_upstream_failure()`][ignore-upstream-failure] task method on [`PipelineTask`][dsl-pipelinetask] enables another approach to author pipelines with exit handling behavior. +Calling this method on a task causes the task to ignore failures of any specified upstream tasks (as established by data exchange or by use of [`.after()`][dsl-pipelinetask-after]). +If the task has no upstream tasks, this method has no effect. 
-In the following pipeline definition, `clean_up_task` is executed after `fail_op`, regardless of whether `fail_op` succeeds: +In the following pipeline definition, `clean_up_task` is executed after `fail_task`, regardless of whether `fail_op` succeeds: ```python from kfp import dsl +@dsl.component +def cleanup_op(message: str = 'Cleaning up...'): + print(message) + +@dsl.component +def fail_op(message: str): + print(message) + raise ValueError('Task failed!') + @dsl.pipeline() def my_pipeline(text: str = 'message'): - task = fail_op(message=text) - clean_up_task = print_op( - message=task.output).ignore_upstream_failure() + fail_task = fail_op(message=text) + clean_up_task = cleanup_op( + message=fail_task.output + ).ignore_upstream_failure() ``` -Note that the component used for the caller task (`print_op` in the example above) requires a default value for all inputs it consumes from an upstream task. The default value is applied if the upstream task fails to produce the outputs that are passed to the caller task. Specifying default values ensures that the caller task always succeeds, regardless of the status of the upstream task. +Note that the component used for the caller task (`cleanup_op` in the example above) requires a default value for all inputs it consumes from an upstream task. +The default value is applied if the upstream task fails to produce the outputs that are passed to the caller task. +Specifying default values ensures that the caller task always succeeds, regardless of the status of the upstream task. [data-passing]: /docs/components/pipelines/user-guides/components/compose-components-into-pipelines#data-passing-and-task-dependencies [pipeline-basics]: /docs/components/pipelines/user-guides/components/compose-components-into-pipelines diff --git a/content/en/docs/components/pipelines/user-guides/core-functions/execute-kfp-pipelines-locally.md b/content/en/docs/components/pipelines/user-guides/core-functions/execute-kfp-pipelines-locally.md index 08b9e16796..b1bb2f486d 100644 --- a/content/en/docs/components/pipelines/user-guides/core-functions/execute-kfp-pipelines-locally.md +++ b/content/en/docs/components/pipelines/user-guides/core-functions/execute-kfp-pipelines-locally.md @@ -1,16 +1,32 @@ +++ title = "Execute KFP pipelines locally" -weight = 3 +description = "Learn how to run Kubeflow Pipelines locally." +weight = 200 +++ {{% kfp-v2-keywords %}} ## Overview + KFP supports executing components and pipelines locally, enabling a tight development loop before running your code remotely. -Executing components and pipelines locally is easy. Simply initialize a local session using `local.init()`, then call the component or pipeline like a normal Python function. KFP will log information about the execution. Once execution completes, you can access the task outputs just as you would when composing a pipeline; the only difference is that the outputs are now materialized values, not references to future outputs. +Executing components and pipelines locally is easy. Simply initialize a local session using `local.init()`, then call the component or pipeline like a normal Python function. +KFP will log information about the execution. +Once execution completes, you can access the task outputs just as you would when composing a pipeline; the only difference is that the outputs are now materialized values, not references to future outputs. 
+ +## Limitations + +Local execution is designed to help quickly *test* components and pipelines locally before testing in a remote environment. -In the following example, we use the `DockerRunner` type. Runner types are covered in more detail below. +Local execution comes with several limitations: + +- Local execution does not feature optimizations and additional features such as caching, retry, etc. While these feature are important for production pipelines, they are less critical for a local testing environment. You will find that task methods like `.set_retry`, `.set_caching_options`, etc. have no effect locally. +- Local execution makes simple assumptions about the resources available on your machine. Local execution does not support specifying resource requests/limits/affinities related to memory, cores, accelerators, etc. You will find that task methods like `.set_memory_limit`, `.set_memory_request`, `.set_accelerator_type` etc. have no effect locally. +- While local pipeline execution has full support for sequential and nested pipelines, it does not yet support `dsl.Condition`, `dsl.ParallelFor`, or `dsl.ExitHandler`. + +## Basic Example + +In the following example, we use the `DockerRunner` type, the [runner types](#runner-types) are covered in more detail below. ```python from kfp import local @@ -59,9 +75,11 @@ def add(a: int, b: int, out_artifact: Output[Artifact]): task = add(a=1, b=2) + # can read artifact contents with open(task.outputs['out_artifact'].path) as f: contents = f.read() + assert json.loads(contents) == 3 assert task.outputs['out_artifact'].metadata['operation'] == 'addition' ``` @@ -74,13 +92,17 @@ local.init(runner=..., pipeline_root='~/my/component/outputs') ``` -## Local runners +## Runner Types + +Kubeflow pipelines has two local runners that you can use to execute your components and pipelines locally: `DockerRunner` and `SubprocessRunner`. -You can choose from two local runner types which indicate how and where your component should be executed: `DockerRunner` and `SubprocessRunner`. +__We strongly recommended using `DockerRunner` whenever possible.__ -### Recommended: DockerRunner +### **Runner:** DockerRunner -When invoking components locally using the `DockerRunner`, the task will execute in a container. +The `DockerRunner` requires [Docker to be installed](https://docs.docker.com/engine/install/), but requires essentially no knowledge of Docker to use. + +For example, to use the `DockerRunner`: ```python from kfp import local @@ -89,19 +111,19 @@ local.init(runner=local.DockerRunner()) ``` Since the local `DockerRunner` executes each task in a separate container, the `DockerRunner`: + - Offers the strongest form of local runtime environment isolation - Is most faithful to the remote runtime environment - Allows execution of all component types: [Lightweight Python Component][lightweight-python-component], [Containerized Python Components][containerized-python-components], and [Container Components][container-components] -**It is recommended to use `DockerRunner` whenever possible.** +When you use the `DockerRunner`, KFP mounts your local pipeline root to the container to write outputs outside of the container. +This means that your component outputs will still be available for inspection even after the container exits. -When you use the `DockerRunner`, KFP mounts your local pipeline root to the container to write outputs outside of the container. 
This means that your component outputs will still be available for inspection even after the container exits. - -The `DockerRunner` requires [Docker to be installed](https://docs.docker.com/engine/install/), but requires essentially no knowledge of Docker to use. +### **Runner:** SubprocessRunner -### Alternative: SubprocessRunner +The `SubprocessRunner` is only recommended where Docker cannot be installed, such as in some notebook environments. -The `SubprocessRunner` is recommended when executing components in local environments where Docker cannot be installed, such as in some notebook environments. +For example, to use the `SubprocessRunner`: ```python from kfp import local @@ -114,23 +136,17 @@ Since `SubprocessRunner` runs your code in a subprocess, the `SubprocessRunner`: - Does not support custom images or easily support tasks with complex environment dependencies - Only allows execution of [Lightweight Python Component][lightweight-python-component] -By default, the `SubprocessRunner` will install your dependencies into a virtual environment. This is recommended, but can be disabled by setting `use_venv=False`: +{{% alert title="Tip" color="info" %}} +By default, the `SubprocessRunner` will install your dependencies into a virtual environment. + +This is recommended, but can be disabled by setting `use_venv=False`: ```python from kfp import local local.init(runner=local.SubprocessRunner(use_venv=False)) ``` - -## Limitations -Local execution is designed to help quickly *test* components and pipelines locally before testing in a remote environment. - -Local execution comes with several limitations: -- Local execution does not feature optimizations and additional features such as caching, retry, etc. While these feature are important for production pipelines, they are less critical for a local testing environment. You will find that task methods like `.set_retry`, `.set_caching_options`, etc. have no effect locally. -- Local execution makes simple assumptions about the resources available on your machine. Local execution does not support specifying resource requests/limits/affinities related to memory, cores, accelerators, etc. You will find that task methods like `.set_memory_limit`, `.set_memory_request`, `.set_accelerator_type` etc. have no effect locally. - -While local pipeline execution has full support for sequential and nested pipelines, it does not yet support `dsl.Condition`, `dsl.ParallelFor`, or `dsl.ExitHandler`. - +{{% /alert %}} [lightweight-python-component]: /docs/components/pipelines/user-guides/components/lightweight-python-components/ [containerized-python-components]: /docs/components/pipelines/user-guides/components/containerized-python-components diff --git a/content/en/docs/components/pipelines/user-guides/core-functions/platform-specific-features.md b/content/en/docs/components/pipelines/user-guides/core-functions/platform-specific-features.md index 4555d5106f..b59a5c9cdb 100644 --- a/content/en/docs/components/pipelines/user-guides/core-functions/platform-specific-features.md +++ b/content/en/docs/components/pipelines/user-guides/core-functions/platform-specific-features.md @@ -1,35 +1,44 @@ +++ -title = "Author Tasks with Platform-Specific Functionality" -weight = 10 +title = "Use Platform-Specific Features" +description = "Learn how to use platform-specific features in Kubeflow Pipelines." +weight = 105 +++ +## Overview -One of the benefits of KFP is cross-platform portability. 
The KFP SDK compiles pipeline definitions to [IR YAML][ir-yaml] which can be read and executed by different backends, including the [Kubeflow Pipelines open source backend][oss-be] and [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction). +One of the benefits of KFP is cross-platform portability. +The KFP SDK compiles pipeline definitions to [IR YAML][ir-yaml] which can be read and executed by different backends, including the Kubeflow Pipelines [open source backend][oss-be] and [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction). For cases where features are not portable across platforms, users may author pipelines with platform-specific functionality via KFP SDK platform-specific plugin libraries. +In general, platform-specific plugin libraries provide functions that act on tasks similarly to [task-level configuration methods][task-level-config-methods] provided by the KFP SDK directly. -In general, platform-specific plugin libraries provide functions that act on tasks similarly to [task-level configuration methods][task-level-config-methods] provided by the KFP SDK directly. Platform-specific plugin libraries may also provide pre-baked components. + +## kfp-kubernetes +Currently, the only KFP SDK platform-specific plugin library is [`kfp-kubernetes`][kfp-kubernetes-pypi], which is supported by the Kubeflow Pipelines [open source backend][oss-be] and enables direct access to some Kubernetes resources and functionality. - +For more information, see the [`kfp-kubernetes` documentation ][kfp-kubernetes-docs]. + +### **Kubernetes PersistentVolumeClaims** -## Example: Read/write to a Kubernetes PVC using kfp-kubernetes -Currently the only KFP SDK platform-specific plugin library is [`kfp-kubernetes`][kfp-kubernetes-pypi], which is supported by the [Kubeflow Pipelines open source backend][oss-be] and enables direct access to some Kubernetes resources and functionality. +In this example we will use `kfp-kubernetes` to create a [PersistentVolumeClaim (PVC)][persistent-volume], use the PVC to pass data between tasks, and then delete the PVC. -The following uses `kfp-kubernetes` to demonstrate typical usage of a plugin library. Specifically, we will use `kfp-kubernetes` to create a [PersistentVolumeClaim (PVC)][persistent-volume], use the PVC to pass data between tasks, and delete the PVC after using it. See the [`kfp-kubernetes` documentation for more information][kfp-kubernetes-docs]. +We will assume you have basic familiarity with `PersistentVolume` and `PersistentVolumeClaim` resources in Kubernetes, in addition to [authoring components][authoring-components], and [authoring pipelines][authoring-pipelines] in KFP. -The following assumes basic familiarity with [PersistentVolume and PersistentVolumeClaim][persistent-volume] concepts in Kubernetes, [authoring components][authoring-components], and [authoring pipelines][authoring-pipelines]. +#### **Step 1:** Install the `kfp-kubernetes` library -### Step 1: Install the platform-specific plugin library with the KFP SDK +Run the following command to install the `kfp-kubernetes` library: ```sh pip install kfp[kubernetes] ``` -### Step 2: Create components that read/write to the mount path +#### **Step 2:** Create components that read/write to the mount path -Create two simple components that read and write to a file. In a later step, we will mount the associated volume to the `/data` directory. +Create two simple components that read and write to a file in the `/data` directory. 
+ +In a later step, we will mount a PVC volume to the `/data` directory. ```python from kfp import dsl @@ -51,9 +60,11 @@ def consumer() -> str: return content ``` -### Step 3: Dynamically provision a PVC using CreatePVC +#### **Step 3:** Dynamically provision a PVC using CreatePVC + +Now that we have our components, we can begin constructing a pipeline. -Now that we have our components, we can begin constructing a pipeline. First, we need a PVC to mount. We'll use the `kubernetes.CreatePVC` pre-baked component to dynamically provision a PVC. +We need a PVC to mount, so we will create one using the `kubernetes.CreatePVC` pre-baked component: ```python from kfp import kubernetes @@ -69,12 +80,14 @@ def my_pipeline(): ) ``` -This component provisions a 5GB PVC from the [StorageClass][storage-class] `'standard'` with the `ReadWriteMany` [access mode][access-mode]. The PVC will be named after the underlying Argo workflow that creates it, concatenated with the suffix `-my-pvc`. The `CreatePVC` component returns this name as the output `'name'`. +This component provisions a 5GB PVC from the [StorageClass][storage-class] `'standard'` with the `ReadWriteMany` [access mode][access-mode]. +The PVC will be named after the underlying Argo workflow that creates it, concatenated with the suffix `-my-pvc`. The `CreatePVC` component returns this name as the output `'name'`. +#### **Step 4:** Read and write data to the PVC -### Step 4: Read and write data to the PVC +Next, we'll use the `mount_pvc` task modifier with the `producer` and `consumer` components. -Next, we'll use the `mount_pvc` task modifier with the `producer` and `consumer` components. We'll also schedule `task2` to run after `task1` to prevent the components from writing and reading to the PVC at the same time. +We schedule `task2` to run after `task1` so the components don't read and write to the PVC at the same time. ```python # write to the PVC @@ -95,13 +108,14 @@ Next, we'll use the `mount_pvc` task modifier with the `producer` and `consumer` task2.after(task1) ``` -### Step 5: Delete the PVC +#### **Step 5:** Delete the PVC Finally, we can schedule deletion of the PVC after `task2` finishes to clean up the Kubernetes resources we created. ```python delete_pvc1 = kubernetes.DeletePVC( - pvc_name=pvc1.outputs['name']).after(task2) + pvc_name=pvc1.outputs['name'] + ).after(task2) ``` For the full pipeline and more information, see a [similar example][full-example] in the [`kfp-kubernetes` documentation][kfp-kubernetes-docs]. diff --git a/content/en/docs/components/pipelines/user-guides/core-functions/run-a-pipeline.md b/content/en/docs/components/pipelines/user-guides/core-functions/run-a-pipeline.md index c5d866d8a3..7bea71445e 100644 --- a/content/en/docs/components/pipelines/user-guides/core-functions/run-a-pipeline.md +++ b/content/en/docs/components/pipelines/user-guides/core-functions/run-a-pipeline.md @@ -1,83 +1,150 @@ +++ title = "Run a Pipeline" description = "Execute a pipeline on the KFP backend" -weight = 1 +weight = 100 +++ {{% kfp-v2-keywords %}} -The KFP offers three ways to run a pipeline. +## Overview + +Kubeflow Pipelines (KFP) provides several ways to trigger a pipeline run: + +1. [__KFP Dashboard__](#run-pipeline---kfp-dashboard) +2. [__KFP SDK Client__](#run-pipeline---kfp-sdk-client) +3. [__KFP CLI__](#run-pipeline---kfp-cli) + +{{% alert title="Tip" color="info" %}} +This guide only covers how to trigger an immediate pipeline run. 
+For more advanced scheduling options please see the "Recurring Runs" section of the dashboard and API. +{{% /alert %}} + +## Run Pipeline - KFP Dashboard -## 1. Run from the KFP Dashboard The first and easiest way to run a pipeline is by submitting it via the KFP dashboard. -To submit a pipeline to the KFP Dashboard: +To submit a pipeline for an immediate run: + +1. [Compile a pipeline][compile-a-pipeline] to IR YAML. -1. [Compile the pipeline][compile-a-pipeline] to IR YAML. +2. From the "Pipelines" tab in the dashboard, select `+ Upload pipeline`: -2. From the Dashboard, select "+ Upload pipeline". + Upload pipeline button -Upload pipeline button +3. Upload the pipeline `.yaml`, `.zip` or `.tar.gz` file, populate the upload form, then click `Create`. -3. Upload the pipeline IR YAML file or an archived pipeline as a .zip or .tar.gz file, populate the upload pipeline form, and click “Create”. + Upload pipeline screen -Upload pipeline screen +4. From the "Runs" tab, select `+ Create run`: -4. From the Runs tab, select "+ Create run": + Create run button -Create run button +5. Select the pipeline you want to run, populate the run form, then click `Start`: -5. Choose the pipeline you uploaded, provide a name, any run parameters, and click "Start". -Start a run screen + Start a run screen +## Run Pipeline - KFP SDK Client -## 2. Run from the KFP SDK client -You may also programatically submit pipeline runs from the KFP SDK client. The client supports two ways of submitting runs: from IR YAML or from a Python pipeline function. For either approach, start by instantiating a `Client` using the `host` URL of your KFP instance: +You may also programmatically submit pipeline runs from the KFP SDK client. +The client supports two ways of submitting runs: _from IR YAML_ or _from a Python pipeline function_. + +{{% alert title="Note" color="dark" %}} +See the [Connect the SDK to the API](/docs/components/pipelines/user-guides/core-functions/connect-api/) guide for more information about creating a KFP client. 
+{{% /alert %}} + +For either approach, start by instantiating a [`kfp.Client()`][kfp-client]: ```python -from kfp.client import Client -client = Client(host='') +import kfp + +# TIP: you may need to authenticate with the KFP instance +kfp_client = kfp.Client() ``` -To submit IR YAML for execution use the `.create_run_from_pipeline_package` method: +To submit __IR YAML__ for execution use the [`.create_run_from_pipeline_package()`][kfp-client-create_run_from_pipeline_package] method: ```python -client.create_run_from_pipeline_package('pipeline.yaml', arguments={'param': 'a', 'other_param': 2}) +#from kfp import compiler, dsl +# +#@dsl.component +#def add(a: float, b: float) -> float: +# return a + b +# +#@dsl.pipeline(name="Add two Numbers") +#def add_pipeline(a: float, b: float): +# add_task = add(a=a, b=b) +# +#compiler.Compiler().compile( +# add_pipeline, +# package_path="./add-pipeline.yaml" +#) + +kfp_client.create_run_from_pipeline_package( + "./add-pipeline.yaml", + arguments={ + "a": 1, + "b": 2, + } +) ``` -To submit a Python pipeline function for execution use the `.create_run_from_pipeline_func` convenience method, which wraps compilation and run submission into one method: +To submit a python __pipeline function__ for execution use the [`.create_run_from_pipeline_func()`][kfp-client-create_run_from_pipeline_func] convenience method, which wraps compilation and run submission into one method: ```python -client.create_run_from_pipeline_func(pipeline_func, arguments={'param': 'a', 'other_param': 2}) +#from kfp import dsl +# +#@dsl.component +#def add(a: float, b: float) -> float: +# return a + b +# +#@dsl.pipeline(name="Add two Numbers") +#def add_pipeline(a: float, b: float): +# add_task = add(a=a, b=b) + +kfp_client.create_run_from_pipeline_func( + add_pipeline, + arguments={ + "a": 1, + "b": 2, + } +) ``` -See the [KFP SDK Client reference documentation][kfp-sdk-api-ref-client] for a detailed description of the `Client` constructor and method parameters. +## Run Pipeline - KFP CLI -## 3. Run from the KFP SDK CLI -The `kfp run create` command allows you to submit a pipeline from the command line. `kfp run create --help` shows that this command takes the form: +The [`kfp run create`][kfp-cli-run-create] command allows you to submit a pipeline from the command line. + +Here is the output of `kfp run create --help`: ```shell kfp run create [OPTIONS] [ARGS]... ``` -For example, the following command submits the `path/to/pipeline.yaml` IR YAML to the KFP backend: +For example, the following command submits the `./path/to/pipeline.yaml` IR YAML to the KFP backend: ```shell -kfp run create --experiment-name my-experiment --package-file path/to/pipeline.yaml +kfp run create \ + --experiment-name "my-experiment" \ + --package-file "./path/to/pipeline.yaml" ``` -For more information about the `kfp run create` command, see [Command Line Interface][kfp-run-create-reference-docs] in the [KFP SDK reference documentation][kfp-sdk-api-ref]. For a summary of the available commands in the KFP CLI, see [Command-line Interface][kfp-cli]. 
+For more information about the `kfp` CLI, please see: + +- [Use the CLI][use-the-cli] +- [CLI Reference][kfp-cli] -[kfp-sdk-api-ref]: https://kubeflow-pipelines.readthedocs.io/en/master/index.html [compile-a-pipeline]: /docs/components/pipelines/user-guides/core-functions/compile-a-pipeline/ -[kfp-sdk-api-ref-client]: https://kubeflow-pipelines.readthedocs.io/en/master/source/client.html -[kfp-cli]: /docs/components/pipelines/user-guides/core-functions/cli/ -[kfp-run-create-reference-docs]: https://kubeflow-pipelines.readthedocs.io/en/master/source/cli.html#kfp-run-create +[use-the-cli]: /docs/components/pipelines/user-guides/core-functions/cli/ +[kfp-cli]: https://kubeflow-pipelines.readthedocs.io/en/latest/source/cli.html +[kfp-cli-run-create]: https://kubeflow-pipelines.readthedocs.io/en/latest/source/cli.html#kfp-run-create +[kfp-client]: https://kubeflow-pipelines.readthedocs.io/en/latest/source/client.html#kfp.client.Client +[kfp-client-create_run_from_pipeline_func]: https://kubeflow-pipelines.readthedocs.io/en/latest/source/client.html#kfp.client.Client.create_run_from_pipeline_func +[kfp-client-create_run_from_pipeline_package]: https://kubeflow-pipelines.readthedocs.io/en/latest/source/client.html#kfp.client.Client.create_run_from_pipeline_package \ No newline at end of file diff --git a/content/en/docs/components/pipelines/user-guides/data-handling/_index.md b/content/en/docs/components/pipelines/user-guides/data-handling/_index.md index 0384499c99..b56b7b0f09 100644 --- a/content/en/docs/components/pipelines/user-guides/data-handling/_index.md +++ b/content/en/docs/components/pipelines/user-guides/data-handling/_index.md @@ -1,4 +1,5 @@ +++ title = "Data Handling" +description = "Learn how to handle data in Kubeflow Pipelines." weight = 4 +++ diff --git a/content/en/docs/components/pipelines/user-guides/data-handling/artifacts.md b/content/en/docs/components/pipelines/user-guides/data-handling/artifacts.md index 3a32ca3f1d..7f040ebd30 100644 --- a/content/en/docs/components/pipelines/user-guides/data-handling/artifacts.md +++ b/content/en/docs/components/pipelines/user-guides/data-handling/artifacts.md @@ -245,7 +245,7 @@ On the [KFP open source][oss-be] UI, `ClassificationMetrics`, `SlicedClassificat [python-components]: /docs/components/pipelines/user-guides/components/lightweight-python-components [dsl-parallelfor]: https://kubeflow-pipelines.readthedocs.io/en/latest/source/dsl.html#kfp.dsl.ParallelFor [dsl-collected]: https://kubeflow-pipelines.readthedocs.io/en/latest/source/dsl.html#kfp.dsl.Collected -[parallel-looping]: /docs/components/pipelines/user-guides/core-functions/control-flow/#parallel-looping-dslparallelfor +[parallel-looping]: /docs/components/pipelines/user-guides/core-functions/control-flow/#dslparallelfor [traditional-artifact-syntax]: /docs/components/pipelines/user-guides/data-handling/artifacts/#traditional-artifact-syntax [multiple-outputs]: /docs/components/pipelines/user-guides/data-handling/parameters/#multiple-output-parameters [pythonic-artifact-syntax]: /docs/components/pipelines/user-guides/data-handling/artifacts/#new-pythonic-artifact-syntax \ No newline at end of file diff --git a/content/en/docs/components/pipelines/user-guides/migration.md b/content/en/docs/components/pipelines/user-guides/migration.md index b78c74f831..b86575d382 100644 --- a/content/en/docs/components/pipelines/user-guides/migration.md +++ b/content/en/docs/components/pipelines/user-guides/migration.md @@ -1,258 +1,77 @@ +++ -title = "Migrate from KFP SDK v1" 
-description = "v1 to v2 migration instructions and breaking changes" +title = "Migrate to Kubeflow Pipelines v2" +description = "Migrate to the new Kubeflow Pipelines v2 backend and SDK." weight = 1 +++ {{% kfp-v2-keywords %}} -If you have existing KFP pipelines, either compiled to [Argo Workflow][argo] (using the SDK v1 main namespace) or to [IR YAML][ir-YAML] (using the SDK v1 v2-namespace), you can run these pipelines on the new [KFP v2 backend][oss-be-v2] without any changes. +## Overview -If you wish to author new pipelines, there are some recommended and required steps to migrate your pipeline authoring code to the KFP SDK v2. +Kubeflow Pipelines V2 is a significant update to the Kubeflow Pipelines (KFP) platform. -## Terminology - -- **SDK v1**: The `1.x.x` versions of the KFP SDK. `1.8` is the highest [minor version][semver-minor-version] of the v1 KFP SDK that will be released. -- **SDK v1 v2-namespace**: Refers to the v1 KFP SDK's `v2` module (i.e., `from kfp.v2 import *`), which permits access to v2 authoring syntax and compilation to [IR YAML][ir-yaml] from the v1 SDK. You should assume that references to the v1 SDK *do not* refer to the v2-namespace unless explicitly stated. Until the release of the [v2 KFP OSS backend][oss-be-v2], these pipelines were only executable on [Google Cloud Vertex AI Pipelines][vertex-pipelines]. -- **SDK v2**: The `2.x.x` versions of the KFP SDK. Uses the v2 authoring syntax and compiles to IR YAML. - -There are two common migration paths: - -1. If your existing `kfp==1.x.x` code imports from the `v2` namespace (i.e., `from kfp.v2 import *`), follow the [SDK v1 v2-namespace to SDK v2](#sdk-v1-v2-namespace-to-sdk-v2) migration instructions. This migration path only affects v1 SDK users that were running pipelines on [Google Cloud Vertex AI Pipelines][vertex-pipelines]. -1. If your existing `kfp==1.x.x` code imports from the main namespace (i.e., `from kfp import *`), follow the [SDK v1 to SDK v2](#sdk-v1-to-sdk-v2) migration instructions. - -## SDK v1 v2-namespace to SDK v2 - -With few exceptions, KFP SDK v2 is backward compatible with user code that uses the KFP SDK v1 v2-namespace. - -### Non-breaking changes - -This section documents non-breaking changes in SDK v2 relative to the SDK v1 v2-namespace. We suggest you migrate your code to the "New usage", even though the "Previous usage" will still work with warnings. - -#### Import namespace - -KFP SDK v1 v2-namespace imports (`from kfp.v2 import *`) should be converted to imports from the primary namespace (`from kfp import *`). - -**Change:** Remove the `.v2` module from any KFP SDK v1 v2-namespace imports. - - - - - - - - - - - - -
Previous usageNew usage
- -```python -from kfp.v2 import dsl -from kfp.v2 import compiler - -@dsl.pipeline(name='my-pipeline') -def pipeline(): - ... - -compiler.Compiler().compile(...) -``` - - - -```python -from kfp import dsl -from kfp import compiler - -@dsl.pipeline(name='my-pipeline') -def pipeline(): - ... - -compiler.Compiler().compile(...) -``` - -
- -#### output_component_file parameter - -In KFP SDK v2, components can be [compiled][compile] to and [loaded][load] from [IR YAML][ir-yaml] in the same way as pipelines. - -KFP SDK v1 v2-namespace supported compiling components via the [`@dsl.component`][dsl-component] decorator's `output_component_file` parameter. This is deprecated in KFP SDK v2. If you choose to still use this parameter, your pipeline will be compiled to [IR YAML][ir-yaml] instead of v1 component YAML. - -**Change:** Remove uses of `output_component_file`. Replace with a call to [`Compiler().compile()`][compiler-compile]. - - - - - - - - - - -
Previous usageNew usage
- -```python -from kfp.v2.dsl import component - -@component(output_component_file='my_component.yaml') -def my_component(input: str): - ... -``` - - - -```python -from kfp.dsl import component -from kfp import compiler - -@component() -def my_component(input: str): - ... - -compiler.Compiler().compile(my_component, 'my_component.yaml') -``` - -
- -#### Pipeline package file extension - -The KFP compiler will compile your pipeline according to the extension provided to the compiler (`.yaml` or `.json`). - -In KFP SDK v2, YAML is the preferred serialization format. - -**Change:** Convert `package_path` arguments that use a `.json` extension to use a `.yaml` extension. - - - - - - - - - - -
Previous usageNew usage
- -```python -from kfp.v2 import compiler -# .json extension, deprecated format -compiler.Compiler().compile(pipeline, package_path='my_pipeline.json') -``` - - - -```python -from kfp import compiler -# .yaml extension, preferred format -compiler.Compiler().compile(pipeline, package_path='my_pipeline.yaml') -``` - -
+The key features introduced by KFP V2 are: -### Breaking changes +- A more pythonic SDK - use decorators like ([`@dsl.pipeline`][dsl-pipeline], [`@dsl.component`][dsl-component], [`@dsl.container_component`][dsl-container-component]) +- Decouple from Argo Workflows - compile pipelines to a generic [IR YAML][ir-yaml] rather than Argo `Workflow` YAML +- Enhanced Workflow GUI - visualize pipelines, sub-DAGs (nested pipelines), loops, and artifacts (datasets, models, and metrics) to understand and debug your pipelines -There are only a few subtle breaking changes in SDK v2 relative to the SDK v1 v2-namespace. +## Version Matrix -#### Drop support for Python 3.6 +The first version of [Kubeflow Platform](/docs/started/introduction/#what-is-kubeflow-platform) to include the Kubeflow Pipelines V2 backend was [Kubeflow 1.8](/docs/releases/kubeflow-1.8/). -KFP SDK v1 supported Python 3.6. KFP SDK v2 supports Python >=3.7.0,\<3.12.0. - -#### CLI output change - -The v2 [KFP CLI][cli] is more consistent, readable, and parsable. Code that parsed the v1 CLI output may fail to parse the v2 CLI output. - -#### .after referencing upstream task in a dsl.ParallelFor loop - -The following pipeline cannot be compiled in KFP SDK v2: - -```python -with dsl.ParallelFor(...): - t1 = comp() -t2 = comp().after(t1) -``` - -This usage was primarily used by KFP SDK v1 users who implemented a custom `dsl.ParallelFor` fan-in. KFP SDK v2 natively supports fan-in from [`dsl.ParallelFor`][dsl-parallelfor] using [`dsl.Collected`][dsl-collected]. See [Control Flow][parallelfor-control-flow] user docs for instructions. - -#### Importer component import statement - -The location of the `importer_node` object has changed. - -**Change:** Import from `kfp.dsl`. - - - - - - - - - - -
Previous usageNew usage
- -```python -from kfp.components import importer_node -``` +The following table shows which versions of KFP backend are included in each version of Kubeflow Platform: - +Release Date | Kubeflow Platform Version | KFP Backend Version | SDK Mode: [`v1`](/docs/components/pipelines/legacy-v1/sdk/) | SDK Mode: [`v2`](/docs/components/pipelines/user-guides/core-functions/compile-a-pipeline/) | SDK Mode: [`v2-compatible`](https://v1-7-branch.kubeflow.org/docs/components/pipelines/v1/sdk-v2/) +--- | --- | --- | --- | --- | --- +2024-07-22 | [Kubeflow 1.9](/docs/releases/kubeflow-1.9/) | [2.2.0](https://github.com/kubeflow/pipelines/releases/tag/2.2.0) | | | +2023-11-01 | [Kubeflow 1.8](/docs/releases/kubeflow-1.8/) | [2.0.3](https://github.com/kubeflow/pipelines/releases/tag/2.0.3) | | | +2023-03-29 | [Kubeflow 1.7](/docs/releases/kubeflow-1.7/) | [2.0.0-alpha.7](https://github.com/kubeflow/pipelines/releases/tag/2.0.0-alpha.7) | | | +2022-10-10 | [Kubeflow 1.6](/docs/releases/kubeflow-1.6/) | [2.0.0-alpha.5](https://github.com/kubeflow/pipelines/releases/tag/2.0.0-alpha.5) | | | +2022-06-15 | [Kubeflow 1.5](/docs/releases/kubeflow-1.5/) | [1.8.2](https://github.com/kubeflow/pipelines/releases/tag/1.8.2) | | | -```python -from kfp.dsl import importer_node -``` +## Backward Compatibility -
+If you have existing KFP Pipelines that you compiled with the V1 SDK, you can run them on the new KFP V2 backend without any changes. +If you wish to author new pipelines, there are some recommended and required steps to migrate which are detailed below. -#### Adding node selector constraint/accelerator +{{% alert title="Warning" color="warning" %}} +Running V1 pipelines on KFP V2 requires that you compile and submit them using the V1 SDK. +The last version of the V1 SDK was [`kfp==1.8.22`](https://pypi.org/project/kfp/1.8.22/), there will be no further releases. +{{% /alert %}} -The task method `.add_node_selector_constraint` is deprecated in favor of `.add_node_selector_constraint`. Compared to the previous implementation of `.add_node_selector_constraint`, both methods have the `label_name` parameter removed and the `value` parameter is replaced by the parameter `accelerator`. +## Terminology -**Change:** Use `task.set_accelerator_type(accelerator=...)`. Provide the previous `value` argument to the `accelerator` parameter. Omit the `label_name`. +Term | Definition +--- | --- +SDK v1 | The `1.x.x` versions of the [`kfp`](https://pypi.org/project/kfp/1.8.22/) Python SDK. +SDK v2 | The `2.x.x` versions of the [`kfp`](https://pypi.org/project/kfp/) Python SDK. +SDK v1 (v2-namespace) | The preview V2 module that was available in the V1 SDK (e.g. `from kfp.v2 import *`).
_Only ever used by [Google Cloud Vertex AI Pipelines][vertex-pipelines] users._ - - - - - - - - - -
Previous usageNew usage
+## Migration Paths -```python -@dsl.pipeline -def my_pipeline(): - task.add_node_selector_constraint( - label_name='cloud.google.com/gke-accelerator', - value='NVIDIA_TESLA_A100', - ) -``` +How you migrate to KFP V2 will depend on your current SDK version and usage. - +There are two common migration paths: -```python -@dsl.pipeline -def my_pipeline(): - task.set_accelerator_type(accelerator="NVIDIA_TESLA_K80") -``` +1. [__SDK v1__ → __SDK v2__](#migrate-from-sdk-v1-to-sdk-v2) +2. [__SDK v1 (v2-namespace) → SDK v2__](#migrate-from-sdk-v1-v2-namespace-to-sdk-v2) -
+
-## SDK v1 to SDK v2 +### **Migrate from 'SDK v1' to 'SDK v2'** KFP SDK v2 is generally not backward compatible with user code that uses the KFP SDK v1 main namespace. This section describes some of the important breaking changes and migration steps to upgrade to KFP SDK v2. We indicate whether each breaking change affects [KFP OSS backend][oss-be-v1] users or [Google Cloud Vertex AI Pipelines][vertex-pipelines] users. -### Breaking changes +#### **Breaking changes** + +
+Click to expand +
-#### create_component_from_func and func_to_container_op support +##### **create_component_from_func and func_to_container_op support** **Affects:** KFP OSS users and Vertex AI Pipelines users @@ -323,7 +142,9 @@ def pipeline(): -#### Keyword arguments required +--- + +##### **Keyword arguments required** **Affects:** KFP OSS users and Vertex AI Pipelines users @@ -356,8 +177,9 @@ def my_pipeline(): +--- -#### ContainerOp support +##### **ContainerOp support** **Affects:** KFP OSS users @@ -403,7 +225,9 @@ def flip_coin(rand: int, result: dsl.OutputPath(str)): -#### VolumeOp and ResourceOp support +--- + +##### **VolumeOp and ResourceOp support** **Affects:** KFP OSS users @@ -411,7 +235,9 @@ def flip_coin(rand: int, result: dsl.OutputPath(str)): KFP v2 enables support for [platform-specific features](/docs/components/pipelines/user-guides/core-functions/platform-specific-features/) via KFP SDK extension libraries. Kubernetes-specific features are supported in KFP v2 via the [`kfp-kubernetes`](https://kfp-kubernetes.readthedocs.io/) extension library. -#### v1 component YAML support +--- + +##### **v1 component YAML support** **Affects:** KFP OSS users and Vertex AI Pipelines users @@ -423,7 +249,9 @@ KFP v2 will continue to support loading existing v1 component YAML using the [`c **Change:** To author components via custom image, command, and args, use the [`@dsl.container_component`][dsl-container-component] decorator as described in [Container Components][container-components]. Note that unlike when authoring v1 component YAML, Container Components do not support setting environment variables on the component itself. Environment variables should be set on the task instantiated from the component within a pipeline definition using the [`.set_env_variable`][dsl-pipelinetask-set-env-variable] task [configuration method][task-configuration-methods]. -#### v1 lightweight component types InputTextFile, InputBinaryFile, OutputTextFile and OutputBinaryFile support +--- + +##### **v1 lightweight component types InputTextFile, InputBinaryFile, OutputTextFile and OutputBinaryFile support** **Affects:** KFP OSS users and Vertex AI Pipelines users @@ -433,7 +261,9 @@ KFP SDK v2 does not support authoring with these types since users can easily do **Change:** Component authors should inputs and outputs using KFP's [artifact][artifacts] and [parameter][parameters] types. -#### AIPlatformClient support +--- + +##### **AIPlatformClient support** **Affects:** Vertex AI Pipelines users @@ -489,7 +319,9 @@ job.submit() -#### run_as_aiplatform_custom_job support +--- + +##### **run_as_aiplatform_custom_job support** **Affects:** Vertex AI Pipelines users @@ -541,7 +373,9 @@ def pipeline(): -#### Typecasting behavior change +--- + +##### **Typecasting behavior change** **Affects:** KFP OSS users and Vertex AI Pipelines users @@ -571,9 +405,274 @@ def training_pipeline(number_of_epochs: int = 1): **Change:** We recommend updating your components and pipelines to use types strictly. +--- + +
+ +
+
+ +### **Migrate from 'SDK v1 (v2-namespace)' to 'SDK v2'** + +With few exceptions, KFP SDK v2 is backward compatible with user code that uses the KFP SDK v1 v2-namespace. + +{{% alert title="Note" color="dark" %}} +This migration path ONLY affects v1 SDK users that were running pipelines on Google Cloud's Vertex AI Pipelines. +{{% /alert %}} + +#### **Non-breaking changes** + +This section documents non-breaking changes in SDK v2 relative to the SDK v1 v2-namespace. +We suggest you migrate your code to the "New usage", even though the "Previous usage" will still work with warnings. + +
+Click to expand +
+ +##### **Import namespace** + +KFP SDK v1 v2-namespace imports (`from kfp.v2 import *`) should be converted to imports from the primary namespace (`from kfp import *`). + +**Change:** Remove the `.v2` module from any KFP SDK v1 v2-namespace imports. + + + + + + + + + + + + +
Previous usageNew usage
+ +```python +from kfp.v2 import dsl +from kfp.v2 import compiler + +@dsl.pipeline(name='my-pipeline') +def pipeline(): + ... + +compiler.Compiler().compile(...) +``` + + + +```python +from kfp import dsl +from kfp import compiler + +@dsl.pipeline(name='my-pipeline') +def pipeline(): + ... + +compiler.Compiler().compile(...) +``` + +
+ +--- + +##### **output_component_file parameter** + +In KFP SDK v2, components can be [compiled][compile] to and [loaded][load] from [IR YAML][ir-yaml] in the same way as pipelines. + +KFP SDK v1 v2-namespace supported compiling components via the [`@dsl.component`][dsl-component] decorator's `output_component_file` parameter. This is deprecated in KFP SDK v2. If you choose to still use this parameter, your pipeline will be compiled to [IR YAML][ir-yaml] instead of v1 component YAML. + +**Change:** Remove uses of `output_component_file`. Replace with a call to [`Compiler().compile()`][compiler-compile]. + + + + + + + + + + +
Previous usageNew usage
+ +```python +from kfp.v2.dsl import component + +@component(output_component_file='my_component.yaml') +def my_component(input: str): + ... +``` + + + +```python +from kfp.dsl import component +from kfp import compiler + +@component() +def my_component(input: str): + ... + +compiler.Compiler().compile(my_component, 'my_component.yaml') +``` + +
+ +--- + +##### **Pipeline package file extension** + +The KFP compiler will compile your pipeline according to the extension provided to the compiler (`.yaml` or `.json`). + +In KFP SDK v2, YAML is the preferred serialization format. + +**Change:** Convert `package_path` arguments that use a `.json` extension to use a `.yaml` extension. + + + + + + + + + + +
Previous usageNew usage
+ +```python +from kfp.v2 import compiler +# .json extension, deprecated format +compiler.Compiler().compile(pipeline, package_path='my_pipeline.json') +``` + + + +```python +from kfp import compiler +# .yaml extension, preferred format +compiler.Compiler().compile(pipeline, package_path='my_pipeline.yaml') +``` + +
+ +--- + +
+ +#### **Breaking changes** + +There are only a few subtle breaking changes in SDK v2 relative to the SDK v1 v2-namespace. + +
+Click to expand +
+ +##### **Drop support for Python 3.6** + +KFP SDK v1 supported Python 3.6. KFP SDK v2 supports Python >=3.7.0,\<3.12.0. + +--- + +##### **CLI output change** + +The v2 [KFP CLI][cli] is more consistent, readable, and parsable. Code that parsed the v1 CLI output may fail to parse the v2 CLI output. + +--- + +##### **.after referencing upstream task in a dsl.ParallelFor loop** + +The following pipeline cannot be compiled in KFP SDK v2: + +```python +with dsl.ParallelFor(...): + t1 = comp() +t2 = comp().after(t1) +``` + +This usage was primarily used by KFP SDK v1 users who implemented a custom `dsl.ParallelFor` fan-in. KFP SDK v2 natively supports fan-in from [`dsl.ParallelFor`][dsl-parallelfor] using [`dsl.Collected`][dsl-collected]. See [Control Flow][parallelfor-control-flow] user docs for instructions. + +--- + +##### **Importer component import statement** + +The location of the `importer_node` object has changed. + +**Change:** Import from `kfp.dsl`. + + + + + + + + + + +
Previous usageNew usage
+ +```python +from kfp.components import importer_node +``` + + + +```python +from kfp.dsl import importer_node +``` + +
+
+---
+
+##### **Adding node selector constraint/accelerator**
+
+The task method `.add_node_selector_constraint` is deprecated in favor of `.set_accelerator_type`. Compared to the previous implementation of `.add_node_selector_constraint`, both methods have the `label_name` parameter removed and the `value` parameter is replaced by the parameter `accelerator`.
+
+**Change:** Use `task.set_accelerator_type(accelerator=...)`. Provide the previous `value` argument to the `accelerator` parameter. Omit the `label_name`.
+
+
+
+
+
+
+
+
+
+
Previous usageNew usage
+ +```python +@dsl.pipeline +def my_pipeline(): + task.add_node_selector_constraint( + label_name='cloud.google.com/gke-accelerator', + value='NVIDIA_TESLA_A100', + ) +``` + + + +```python +@dsl.pipeline +def my_pipeline(): + task.set_accelerator_type(accelerator="NVIDIA_TESLA_K80") +``` + +
+ +--- + +
+ +
+ ## Did we miss something? -If you believe we missed a breaking change or an important migration step, please [create an issue][new-issue] describing the change in the [kubeflow/pipelines repository][pipelines-repo]. +If you believe we missed a breaking change or an important migration step, please [create an issue][new-issue] describing the change in the [`kubeflow/pipelines` repository][pipelines-repo]. [artifacts]: /docs/components/pipelines/user-guides/data-handling/artifacts [cli]: /docs/components/pipelines/user-guides/core-functions/cli/ @@ -586,6 +685,7 @@ If you believe we missed a breaking change or an important migration step, pleas [dsl-collected]: https://kubeflow-pipelines.readthedocs.io/en/stable/source/dsl.html#kfp.dsl.Collected [dsl-component]: https://kubeflow-pipelines.readthedocs.io/en/stable/source/dsl.html#kfp.dsl.component [dsl-container-component]: https://kubeflow-pipelines.readthedocs.io/en/stable/source/dsl.html#kfp.dsl.container_component +[dsl-pipeline]: https://kubeflow-pipelines.readthedocs.io/en/stable/source/dsl.html#kfp.dsl.pipeline [dsl-parallelfor]: https://kubeflow-pipelines.readthedocs.io/en/stable/source/dsl.html#kfp.dsl.ParallelFor [gcpc]: https://cloud.google.com/vertex-ai/docs/pipelines/components-introduction [ir-yaml]: /docs/components/pipelines/user-guides/core-functions/compile-a-pipeline/#ir-yaml @@ -594,7 +694,7 @@ If you believe we missed a breaking change or an important migration step, pleas [new-issue]: https://github.com/kubeflow/pipelines/issues/new [oss-be-v1]: /docs/components/pipelines/legacy-v1/ [oss-be-v2]: /docs/components/pipelines/operator-guides/installation/ -[parallelfor-control-flow]: /docs/components/pipelines/user-guides/core-functions/control-flow/#parallel-looping-dslparallelfor +[parallelfor-control-flow]: /docs/components/pipelines/user-guides/core-functions/control-flow/#dslparallelfor [parameters]: /docs/components/pipelines/user-guides/data-handling/parameters [pipelines-repo]: https://github.com/kubeflow/pipelines [semver-minor-version]: https://semver.org/#:~:text=MINOR%20version%20when%20you%20add%20functionality%20in%20a%20backwards%20compatible%20manner @@ -602,6 +702,5 @@ If you believe we missed a breaking change or an important migration step, pleas [vertex-customjob]: https://cloud.google.com/vertex-ai/docs/training/create-custom-job [vertex-pipelines]: https://cloud.google.com/vertex-ai/docs/pipelines/introduction [vertex-sdk]: https://cloud.google.com/vertex-ai/docs/pipelines/run-pipeline#vertex-ai-sdk-for-python -[argo]: https://argoproj.github.io/argo-workflows/ [dsl-pipelinetask-set-env-variable]: https://kubeflow-pipelines.readthedocs.io/en/2.0.0b13/source/dsl.html#kfp.dsl.PipelineTask.set_env_variable [task-configuration-methods]: /docs/components/pipelines/user-guides/components/compose-components-into-pipelines/#task-configurations \ No newline at end of file