diff --git a/.readthedocs.yml b/.readthedocs.yml index 4b1c5f4824..2df6853225 100644 --- a/.readthedocs.yml +++ b/.readthedocs.yml @@ -16,6 +16,7 @@ build: post_create_environment: - npm install -g @mermaid-js/mermaid-cli pre_build: + - pip freeze - python -m sphinx -WETan -j auto -D language=en -b linkcheck -d _build/doctrees docs/source _build/linkcheck # Build documentation in the docs/ directory with Sphinx diff --git a/CITATION.cff b/CITATION.cff index 191a56a8f7..256c577eb8 100644 --- a/CITATION.cff +++ b/CITATION.cff @@ -45,6 +45,6 @@ authors: - family-names: Theisen given-names: Merel title: Kedro -version: 0.18.10 -date-released: 2023-06-08 +version: 0.18.11 +date-released: 2023-07-03 url: https://github.com/kedro-org/kedro diff --git a/RELEASE.md b/RELEASE.md index c23eb0ab42..ecf54c43d7 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -9,7 +9,25 @@ ## Migration guide from Kedro 0.18.* to 0.19.* -# Upcoming Release 0.18.11 +# Upcoming Release 0.18.12 + +## Major features and improvements + +## Bug fixes and other changes + +## Documentation changes + +## Breaking changes to the API + +## Upcoming deprecations for Kedro 0.19.0 +* Renamed abstract dataset classes, in accordance with the [Kedro lexicon](https://github.com/kedro-org/kedro/wiki/Kedro-documentation-style-guide#kedro-lexicon). Dataset classes ending with "DataSet" are deprecated and will be removed in 0.19.0. Note that all of the below classes are also importable from `kedro.io`; only the module where they are defined is listed as the location. + +| Type | Deprecated Alias | Location | +| -------------------------- | -------------------------- | --------------- | +| `AbstractDataset` | `AbstractDataSet` | `kedro.io.core` | +| `AbstractVersionedDataset` | `AbstractVersionedDataSet` | `kedro.io.core` | + +# Release 0.18.11 ## Major features and improvements * Added `databricks-iris` as an official starter. @@ -19,12 +37,31 @@ * Make `kedro micropkg package` accept `--verbose`. 
## Documentation changes -* Added documentation for developing a Kedro project using a Databricks workspace. - -## Breaking changes to the API +* Significant improvements to the documentation on working with Databricks and Kedro, including a new page for workspace-only development and a guide to choosing the best workflow for your use case. +* Updated documentation for deploying with Prefect 2.0. ## Upcoming deprecations for Kedro 0.19.0 -* Renamed `AbstractDataSet` and `AbstractVersionedDataSet` to `AbstractDataset` and `AbstractVersionedDataset`, respectively. +* Renamed dataset and error classes, in accordance with the [Kedro lexicon](https://github.com/kedro-org/kedro/wiki/Kedro-documentation-style-guide#kedro-lexicon). Dataset classes ending with "DataSet" and error classes starting with "DataSet" are deprecated and will be removed in 0.19.0. Note that all of the below classes are also importable from `kedro.io`; only the module where they are defined is listed as the location. 
+ +| Type | Deprecated Alias | Location | +| --------------------------- | --------------------------- | ------------------------------ | +| `CachedDataset` | `CachedDataSet` | `kedro.io.cached_dataset` | +| `LambdaDataset` | `LambdaDataSet` | `kedro.io.lambda_dataset` | +| `IncrementalDataset` | `IncrementalDataSet` | `kedro.io.partitioned_dataset` | +| `MemoryDataset` | `MemoryDataSet` | `kedro.io.memory_dataset` | +| `PartitionedDataset` | `PartitionedDataSet` | `kedro.io.partitioned_dataset` | +| `DatasetError` | `DataSetError` | `kedro.io.core` | +| `DatasetAlreadyExistsError` | `DataSetAlreadyExistsError` | `kedro.io.core` | +| `DatasetNotFoundError` | `DataSetNotFoundError` | `kedro.io.core` | + +## Community contributions +Many thanks to the following Kedroids for contributing PRs to this release: + +* [jmalovera10](https://github.com/jmalovera10) +* [debugger24](https://github.com/debugger24) +* [juliushetzel](https://github.com/juliushetzel) +* [jacobweiss2305](https://github.com/jacobweiss2305) +* [eduardoconto](https://github.com/eduardoconto) # Release 0.18.10 diff --git a/docs/source/deployment/prefect.md b/docs/source/deployment/prefect.md index 556097faa6..64d1018984 100644 --- a/docs/source/deployment/prefect.md +++ b/docs/source/deployment/prefect.md @@ -1,40 +1,70 @@ # Prefect -This page explains how to run your Kedro pipeline using [Prefect Core](https://www.prefect.io/products/core/), an open-source workflow management system. +This page explains how to run your Kedro pipeline using [Prefect 2.0](https://www.prefect.io/products/core/), an open-source workflow management system. -In scope of this deployment, we are interested in [Prefect Server](https://docs.prefect.io/orchestration/server/overview.html#what-is-prefect-server), an open-source backend that makes it easy to monitor and execute your Prefect flows and automatically extends the Prefect Core. Prefect Server ships out-of-the-box with a fully featured user interface. 
+The scope of this documentation is the deployment to a self-hosted [Prefect Server](https://docs.prefect.io/2.10.17/host/), which is an open-source backend that makes it easy to monitor and execute your Prefect flows and automatically extends Prefect 2.0. We will use an [Agent that dequeues submitted flow runs from a Work Queue](https://docs.prefect.io/2.10.17/tutorial/deployments/#why-workpools-and-workers). ```{note} -This deployment has been tested using kedro 0.17.6, 0.17.7 and 0.18.2 with prefect version 1.1.0. - -The current implementation has not been tested with prefect 2.0.0. +This deployment has been tested using Kedro 0.18.10 with Prefect version 2.10.17. If you want to deploy with Prefect 1.0, we recommend you review [earlier versions of Kedro's Prefect deployment documentation](https://docs.kedro.org/en/0.18.9/deployment/prefect.html). ``` ## Prerequisites -To use Prefect Core and Prefect Server, ensure you have the following prerequisites in place: +To use Prefect 2.0 and Prefect Server, ensure you have the following prerequisites in place: + +- [Prefect 2.0 is installed](https://docs.prefect.io/2.10.17/getting-started/installation/#installing-the-latest-version) on your machine + +## Setup + +Configure your `PREFECT_API_URL` to point to your local Prefect instance: + +```bash +prefect config set PREFECT_API_URL="http://127.0.0.1:4200/api" +``` + +For each new Kedro project you create, you need to decide whether to opt into [usage analytics](https://github.com/kedro-org/kedro-plugins/tree/main/kedro-telemetry). Your decision is recorded in the `.telemetry` file stored in the project root. + +```{important} +When you run a Kedro project locally, you are prompted for consent the first time you run a `kedro` command for the project. In this deployment there is no interactive prompt, so the project will hang unless the `.telemetry` file already exists. +``` + +Create a `.telemetry` file manually in the **root of your Kedro project** and record your preference to give or decline consent. 
To do this, specify either `true` (to give consent) or `false`. The example given below accepts Kedro's usage analytics. + +```text +consent: true +``` -- [Prefect Core is installed](https://docs.prefect.io/core/getting_started/install.html) on your machine -- [Docker](https://www.docker.com/) and [Docker Compose](https://docs.docker.com/compose/) are installed and Docker Engine is running -- [Prefect Server is up and running](https://docs.prefect.io/orchestration/Server/deploy-local.html) -- `PREFECT__LOGGING__EXTRA_LOGGERS` environment variable is set (this is required to get Kedro logs published): +Run a Prefect Server instance: -```console -export PREFECT__LOGGING__EXTRA_LOGGERS="['kedro']" +```bash +prefect server start ``` -## How to run your Kedro pipeline using Prefect +In a separate terminal, [create a work pool](https://docs.prefect.io/2.10.17/concepts/work-pools/#work-pool-configuration) to organize the work and [create a work queue](https://docs.prefect.io/2.10.17/concepts/work-pools/#work-queues) for your agent to pull from: -### Convert your Kedro pipeline to Prefect flow +```bash +prefect work-pool create --type prefect-agent <work_pool_name> +prefect work-queue create --pool <work_pool_name> <work_queue_name> +``` + +Now run a Prefect Agent that subscribes to a work queue inside the work pool you created: + +```bash +prefect agent start --pool <work_pool_name> --work-queue <work_queue_name> +``` -To build a [Prefect flow](https://docs.prefect.io/core/concepts/flows.html) for your Kedro pipeline programmatically and register it with the Prefect API, use the following Python script, which should be stored in your project’s root directory: +## How to run your Kedro pipeline using Prefect 2.0 + +### Convert your Kedro pipeline to Prefect 2.0 flow + +To build a [Prefect flow](https://docs.prefect.io/core/concepts/flows.html) for your Kedro pipeline programmatically and register it with the Prefect API, use the following Python script, which should be stored in your project’s **root directory**: ```python # /register_prefect_flow.py +import 
click from pathlib import Path -from typing import Any, Dict, List, Tuple, Union +from typing import Dict, List, Union, Callable -import click from kedro.framework.hooks.manager import _create_hook_manager from kedro.framework.project import pipelines from kedro.framework.session import KedroSession @@ -42,204 +72,194 @@ from kedro.framework.startup import bootstrap_project from kedro.io import DataCatalog, MemoryDataSet from kedro.pipeline.node import Node from kedro.runner import run_node -from prefect import Client, Flow, Task -from prefect.exceptions import ClientError + +from prefect import flow, task, get_run_logger +from prefect.deployments import Deployment @click.command() -@click.option("-p", "--pipeline", "pipeline_name", default=None) -@click.option("--env", "-e", type=str, default=None) -@click.option("--package_name", "package_name", default="kedro_prefect") -def prefect_deploy(pipeline_name, env, package_name): +@click.option("-p", "--pipeline", "pipeline_name", default="__default__") +@click.option("--env", "-e", type=str, default="base") +@click.option("--deployment_name", "deployment_name", default="example") +@click.option("--work_pool_name", "work_pool_name", default="default") +@click.option("--work_queue_name", "work_queue_name", default="default") +@click.option("--version", "version", default="1.0") +def prefect_deploy( + pipeline_name, env, deployment_name, work_pool_name, work_queue_name, version +): """Register a Kedro pipeline as a Prefect flow.""" - # Project path and metadata required for session initialization task. + # Pipeline name to execute + pipeline_name = pipeline_name or "__default__" + + # Use standard deployment configuration for local execution. 
If you require a different + # infrastructure, check the API docs for Deployments at: https://docs.prefect.io/latest/api-ref/prefect/deployments/ + deployment = Deployment.build_from_flow( + flow=my_flow, + name=deployment_name, + path=str(Path.cwd()), + version=version, + parameters={ + "pipeline_name": pipeline_name, + "env": env, + }, + infra_overrides={"env": {"PREFECT_LOGGING_LEVEL": "DEBUG"}}, + work_pool_name=work_pool_name, + work_queue_name=work_queue_name, + ) + + deployment.apply() + + +@flow(name="my_flow") +def my_flow(pipeline_name: str, env: str): + logger = get_run_logger() project_path = Path.cwd() + metadata = bootstrap_project(project_path) + logger.info("Project name: %s", metadata.project_name) - pipeline_name = pipeline_name or "__default__" - pipeline = pipelines.get(pipeline_name) + logger.info("Initializing Kedro...") + execution_config = kedro_init( + pipeline_name=pipeline_name, project_path=project_path, env=env + ) + + logger.info("Building execution layers...") + execution_layers = init_kedro_tasks_by_execution_layer( + pipeline_name, execution_config + ) + + for layer in execution_layers: + logger.info("Running layer...") + for node_task in layer: + logger.info("Running node...") + node_task() - tasks = {} - for node, parent_nodes in pipeline.node_dependencies.items(): - # Use a function for task instantiation which avoids duplication of - # tasks - _, tasks = instantiate_task(node, tasks) - parent_tasks = [] - for parent in parent_nodes: - parent_task, tasks = instantiate_task(parent, tasks) - parent_tasks.append(parent_task) +@task() +def kedro_init( + pipeline_name: str, + project_path: Path, + env: str, +): + """ + Initializes a Kedro session and returns the DataCatalog and + KedroSession + """ + # bootstrap project within task / flow scope - tasks[node._unique_key]["parent_tasks"] = parent_tasks + logger = get_run_logger() + logger.info("Bootstrapping project") + bootstrap_project(project_path) - # Below task is used to 
instantiate a KedroSession within the scope of a - # Prefect flow - init_task = KedroInitTask( - pipeline_name=pipeline_name, + session = KedroSession.create( project_path=project_path, - package_name=package_name, env=env, ) + # Note that the Prefect run logger is used for logging inside a task. + logger.info("Session created with ID %s", session.session_id) + pipeline = pipelines.get(pipeline_name) + logger.info("Loading context...") + context = session.load_context() + catalog = context.catalog + logger.info("Registering datasets...") + unregistered_ds = pipeline.data_sets() - set(catalog.list()) # NOQA + for ds_name in unregistered_ds: + catalog.add(ds_name, MemoryDataSet()) + return {"catalog": catalog, "sess_id": session.session_id} + + +def init_kedro_tasks_by_execution_layer( + pipeline_name: str, + execution_config: Union[None, Dict[str, Union[DataCatalog, str]]] = None, +) -> List[List[Callable]]: + """ + Inits the Kedro tasks in topologically ordered groups, meaning that an earlier group + is a dependency of a later one. 
- with Flow(pipeline_name) as flow: - generate_flow(init_task, tasks) - instantiate_client(metadata.project_name) - - # Register the flow with the server - flow.register(project_name=metadata.project_name) - - # Start a local agent that can communicate between the server - # and your flow code - flow.run_agent() - - -class KedroInitTask(Task): - """Task to initialize KedroSession""" - - def __init__( - self, - pipeline_name: str, - package_name: str, - project_path: Union[Path, str] = None, - env: str = None, - extra_params: Dict[str, Any] = None, - *args, - **kwargs, - ): - self.project_path = Path(project_path or Path.cwd()).resolve() - self.extra_params = extra_params - self.pipeline_name = pipeline_name - self.env = env - super().__init__(name=f"{package_name}_init", *args, **kwargs) - - def run(self) -> Dict[str, Union[DataCatalog, str]]: - """ - Initializes a Kedro session and returns the DataCatalog and - KedroSession - """ - # bootstrap project within task / flow scope - bootstrap_project(self.project_path) - - session = KedroSession.create( - project_path=self.project_path, - env=self.env, - extra_params=self.extra_params, # noqa: E501 - ) - # Note that for logging inside a Prefect task self.logger is used. 
- self.logger.info("Session created with ID %s", session.session_id) - pipeline = pipelines.get(self.pipeline_name) - context = session.load_context() - catalog = context.catalog - unregistered_ds = pipeline.data_sets() - set(catalog.list()) # NOQA - for ds_name in unregistered_ds: - catalog.add(ds_name, MemoryDataSet()) - return {"catalog": catalog, "sess_id": session.session_id} - - -class KedroTask(Task): - """Kedro node as a Prefect task.""" - - def __init__(self, node: Node): - self._node = node - super().__init__(name=node.name, tags=node.tags) - - def run(self, task_dict: Dict[str, Union[DataCatalog, str]]): - run_node( - self._node, - task_dict["catalog"], - _create_hook_manager(), - task_dict["sess_id"], - ) - + Args: + pipeline_name (str): The pipeline name to execute + execution_config (Union[None, Dict[str, Union[DataCatalog, str]]], optional): + The required execution config for each node. Defaults to None. -def instantiate_task( - node: Node, - tasks: Dict[str, Dict[str, Union[KedroTask, List[KedroTask]]]], -) -> Tuple[KedroTask, Dict[str, Dict[str, Union[KedroTask, List[KedroTask]]]]]: + Returns: + List[List[Callable]]: A list of topologically ordered task groups """ - Function pulls node task from dictionary. If node task not - available in the function instantiates the tasks and adds - it to . In this way we avoid duplicate instantiations of - the same node task. - Args: - node: Kedro node for which a Prefect task is being created. - tasks: dictionary mapping node names to a dictionary containing - node tasks and parent node tasks. + pipeline = pipelines.get(pipeline_name) - Returns: Prefect task for the passed node and task dictionary. + execution_layers = [] - """ - if tasks.get(node._unique_key) is not None: - node_task = tasks[node._unique_key]["task"] - else: - node_task = KedroTask(node) - tasks[node._unique_key] = {"task": node_task} + # Return a list of the pipeline nodes in topologically ordered groups, + # i.e. 
if node A needs to be run before node B, it will appear in an + # earlier group. + for layer in pipeline.grouped_nodes: + execution_layer = [] + for node in layer: + # Use a function for task instantiation which avoids duplication of + # tasks + task = instantiate_task(node, execution_config) + execution_layer.append(task) + execution_layers.append(execution_layer) - # return tasks as it is mutated. We want to make this obvious to the user. - return node_task, tasks # type: ignore[return-value] + return execution_layers -def generate_flow( - init_task: KedroInitTask, - tasks: Dict[str, Dict[str, Union[KedroTask, List[KedroTask]]]], +def kedro_task( + node: Node, task_dict: Union[None, Dict[str, Union[DataCatalog, str]]] = None ): + run_node( + node, + task_dict["catalog"], + _create_hook_manager(), + task_dict["sess_id"], + ) + + +def instantiate_task( + node: Node, + execution_config: Union[None, Dict[str, Union[DataCatalog, str]]] = None, +) -> Callable: """ - Constructs a Prefect flow given a task dictionary. Task dictionary - maps Kedro node names to a dictionary containing a node task and its - parents. + Function that wraps a Node inside a task for future execution Args: - init_task: Prefect initialisation tasks. Used to instantiate a Kedro - session within the scope of a Prefect flow. - tasks: dictionary mapping Kedro node names to a dictionary - containing a corresponding node task and its parents. + node: Kedro node for which a Prefect task is being created. + execution_config: The configurations required for the node to execute + that includes catalogs and session id + + Returns: Prefect task for the passed node - Returns: None """ - child_task_dict = init_task - for task in tasks.values(): - node_task = task["task"] - if len(task["parent_tasks"]) == 0: - # When a task has no parent only the session init task should - # precede it. - parent_tasks = [init_task] - else: - parent_tasks = task["parent_tasks"] - # Set upstream tasks and bind required kwargs. 
- # Note: Unpacking the return from init tasks will generate two - # sub-tasks in the prefect graph. To avoid this we pass the init - # return on unpacked. - node_task.bind(upstream_tasks=parent_tasks, task_dict=child_task_dict) - - -def instantiate_client(project_name: str): - """Initiates Prefect client""" - client = Client() - try: - client.create_project(project_name=project_name) - except ClientError: - raise + return task(lambda: kedro_task(node, execution_config)).with_options(name=node.name) if __name__ == "__main__": prefect_deploy() ``` -```{note} -The script launches a [local agent](https://docs.prefect.io/orchestration/agents/local.html). Remember to stop the agent with Ctrl-C when you complete. +Then, run the deployment script in another terminal: + +```bash +python register_prefect_flow.py --work_pool_name <work_pool_name> --work_queue_name <work_queue_name> ``` ```{note} +Be sure that your Prefect Server is up and running. Verify that the deployment script arguments match the work pool and work queue names. ``` ### Run Prefect flow -Now, having the flow registered, you can use [Prefect UI](https://docs.prefect.io/orchestration/ui/dashboard.html) to orchestrate and monitor it. +Now that the flow is registered, you can use the [Prefect Server UI](https://docs.prefect.io/2.10.17/host/) to orchestrate and monitor it. -Navigate to http://localhost:8080/default?flows= to see your registered flow. +Navigate to http://localhost:4200/deployments to see your registered flow. -![](../meta/images/prefect_flows.png) +![prefect_2_flow_deployment](../meta/images/prefect_2_flow_deployment.png) -Click on the flow to open it and then trigger your flow using the "RUN"/"QUICK RUN" button. +Click on the flow to open it, then trigger it using the "RUN" > "QUICK RUN" button, leaving the parameters at their default values. If you want to run a specific pipeline, replace the `__default__` value. + +```{note} +Be sure that both your Prefect Server and Agent are up and running. 
+``` -![](../meta/images/prefect_flow_details.png) +![prefect_2_flow_details](../meta/images/prefect_2_flow_details.png) diff --git a/docs/source/development/commands_reference.md b/docs/source/development/commands_reference.md index 1745aee8b9..ae2933e256 100644 --- a/docs/source/development/commands_reference.md +++ b/docs/source/development/commands_reference.md @@ -114,7 +114,7 @@ Returns output similar to the following, depending on the version of Kedro used | |/ / _ \/ _` | '__/ _ \ | < __/ (_| | | | (_) | |_|\_\___|\__,_|_| \___/ -v0.18.10 +v0.18.11 Kedro is a Python framework for creating reproducible, maintainable diff --git a/docs/source/extend_kedro/plugins.md b/docs/source/extend_kedro/plugins.md index a34c28ee3d..81cd139a7c 100644 --- a/docs/source/extend_kedro/plugins.md +++ b/docs/source/extend_kedro/plugins.md @@ -84,7 +84,7 @@ setup( After that you can use this starter with `kedro new --starter=test_plugin_starter`. ```{note} -If your starter lives on a git repository, by default Kedro attempts to use a tag or branch labelled with your version of Kedro, e.g. `0.18.10`. This means that you can host different versions of your starter template on the same repository, and the correct one will automatically be used. If you do not wish to follow this structure, you should override it with the `checkout` flag, e.g. `kedro new --starter=test_plugin_starter --checkout=main`. +If your starter lives on a git repository, by default Kedro attempts to use a tag or branch labelled with your version of Kedro, e.g. `0.18.11`. This means that you can host different versions of your starter template on the same repository, and the correct one will automatically be used. If you do not wish to follow this structure, you should override it with the `checkout` flag, e.g. `kedro new --starter=test_plugin_starter --checkout=main`. 
``` ## Working with `click` diff --git a/docs/source/kedro_datasets.rst b/docs/source/kedro_datasets.rst index b8e9f5d442..b3d3ab328b 100644 --- a/docs/source/kedro_datasets.rst +++ b/docs/source/kedro_datasets.rst @@ -14,6 +14,7 @@ kedro_datasets kedro_datasets.api.APIDataSet kedro_datasets.biosequence.BioSequenceDataSet kedro_datasets.dask.ParquetDataSet + kedro_datasets.databricks.ManagedTableDataSet kedro_datasets.email.EmailMessageDataSet kedro_datasets.geopandas.GeoJSONDataSet kedro_datasets.holoviews.HoloviewsWriter @@ -45,6 +46,7 @@ kedro_datasets kedro_datasets.spark.SparkDataSet kedro_datasets.spark.SparkHiveDataSet kedro_datasets.spark.SparkJDBCDataSet + kedro_datasets.spark.SparkStreamingDataSet kedro_datasets.svmlight.SVMLightDataSet kedro_datasets.tensorflow.TensorFlowModelDataSet kedro_datasets.text.TextDataSet diff --git a/docs/source/meta/images/prefect_2_flow_deployment.png b/docs/source/meta/images/prefect_2_flow_deployment.png new file mode 100644 index 0000000000..b3f725447f Binary files /dev/null and b/docs/source/meta/images/prefect_2_flow_deployment.png differ diff --git a/docs/source/meta/images/prefect_2_flow_details.png b/docs/source/meta/images/prefect_2_flow_details.png new file mode 100644 index 0000000000..c935e974a8 Binary files /dev/null and b/docs/source/meta/images/prefect_2_flow_details.png differ diff --git a/kedro/__init__.py b/kedro/__init__.py index ac4b0c5332..be9febd329 100644 --- a/kedro/__init__.py +++ b/kedro/__init__.py @@ -3,4 +3,4 @@ configuration and pipeline assembly. 
""" -__version__ = "0.18.10" +__version__ = "0.18.11" diff --git a/setup.py b/setup.py index 1d63d779cf..92eca04e5a 100644 --- a/setup.py +++ b/setup.py @@ -104,7 +104,7 @@ def _collect_requirements(requires): "sphinxcontrib-mermaid~=0.7.1", "myst-parser~=1.0.0", "Jinja2<3.1.0", - "kedro-datasets[all]~=1.4.0", + "kedro-datasets[all]~=1.4.2", ], "geopandas": _collect_requirements(geopandas_require), "matplotlib": _collect_requirements(matplotlib_require),
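A note on the deprecation tables in the RELEASE.md hunks above: each old "DataSet" spelling is kept as an alias of the renamed class so existing code keeps working until 0.19.0. The sketch below is a generic illustration of that alias pattern, not Kedro's actual implementation; the class names come from the table, but the `__init_subclass__`-based warning mechanism is an assumption for illustration only.

```python
import warnings


class AbstractDataset:
    """Canonical class name following the Kedro lexicon."""


class AbstractDataSet(AbstractDataset):
    """Deprecated alias, scheduled for removal in 0.19.0 (illustrative sketch)."""

    def __init_subclass__(cls, **kwargs):
        # Warn anyone who subclasses the old spelling, while keeping the
        # subclass fully functional as an AbstractDataset.
        warnings.warn(
            "AbstractDataSet is deprecated and will be removed in 0.19.0; "
            "subclass AbstractDataset instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        super().__init_subclass__(**kwargs)
```

With this shape, a project subclassing the old name still passes `issubclass(..., AbstractDataset)` checks against the new name, but emits a `DeprecationWarning` at class-definition time.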