Support versioning of the underlying dataset with PartitionedDataset #521

Closed
wants to merge 62 commits

62 commits
00e1783
Support versioning of the underlying dataset
deepyaman Sep 20, 2020
61dba32
Remove outdated "Raises" clause
deepyaman Sep 20, 2020
6f347ec
Merge branch 'master' into feature/partition-versioning
deepyaman Sep 21, 2020
a80a799
Merge branch 'master' into feature/partition-versioning
deepyaman Sep 29, 2020
fb6992d
Merge branch 'master' into feature/partition-versioning
deepyaman Oct 1, 2020
eceeb50
Merge branch 'master' into feature/partition-versioning
deepyaman Oct 2, 2020
7ce6bbc
Update kedro/io/partitioned_data_set.py
deepyaman Oct 2, 2020
70ed18d
Merge branch 'master' into feature/partition-versioning
deepyaman Oct 6, 2020
1b8c7fc
Use `Path.parents[1]` instead of `.parent` twice
deepyaman Oct 9, 2020
1232568
Merge branch 'master' into feature/partition-versioning
deepyaman Oct 12, 2020
91ecc39
Use PurePosixPath instead of Path
deepyaman Oct 21, 2020
cc50803
Make meaningful error for malformed versioned path
deepyaman Oct 21, 2020
e34f58b
Call `.as_posix()` for Windows test
deepyaman Oct 21, 2020
f757d26
Merge branch 'master' into feature/partition-versioning
deepyaman Oct 21, 2020
49c1214
Update RELEASE.md
deepyaman Oct 21, 2020
0758334
Rename variable in comprehension due to Pylint bug
deepyaman Oct 21, 2020
f3adbec
Merge branch 'master' into feature/partition-versioning
deepyaman Oct 21, 2020
a8c7bf7
Merge branch 'master' into feature/partition-versioning
deepyaman Oct 22, 2020
10a6afc
Merge branch 'master' into feature/partition-versioning
deepyaman Oct 22, 2020
b486a15
Merge branch 'master' into feature/partition-versioning
deepyaman Oct 23, 2020
5fbf972
Merge branch 'master' into feature/partition-versioning
deepyaman Oct 26, 2020
8e94bbf
Merge branch 'master' into feature/partition-versioning
deepyaman Nov 3, 2020
e39032c
Merge branch 'master' into feature/partition-versioning
deepyaman Nov 9, 2020
b56d201
Merge branch 'master' into feature/partition-versioning
deepyaman Nov 19, 2020
b4390ce
Merge branch 'master' into feature/partition-versioning
deepyaman Nov 23, 2020
5dc8c0f
Merge branch 'master' into feature/partition-versioning
deepyaman Nov 30, 2020
0aeea98
Refactor is-versioned check into a variable :tada:
deepyaman Dec 2, 2020
423c7be
Keep f-string prefix even for unformatted portions
deepyaman Dec 2, 2020
7d7e4c5
Augment error message with expected path structure
deepyaman Dec 2, 2020
bcfe409
Use that VERSION_KEY is only in versioned datasets
deepyaman Dec 2, 2020
207dfb4
Update tests/io/test_partitioned_dataset.py
deepyaman Dec 2, 2020
71e847e
Revert "Augment error message with expected path structure"
deepyaman Dec 2, 2020
519ef7d
Merge branch 'master' into feature/partition-versioning
deepyaman Dec 2, 2020
8ea477f
Update RELEASE.md
deepyaman Dec 3, 2020
8b57e46
Merge branch 'master' into feature/partition-versioning
deepyaman Dec 8, 2020
11965e4
Merge branch 'master' into feature/partition-versioning
deepyaman Dec 11, 2020
e8f5020
Merge branch 'develop' into feature/partition-versioning
DmitriiDeriabinQB Dec 14, 2020
9b5cd46
update IncrementalDataSet
DmitriiDeriabinQB Dec 14, 2020
224343c
Merge branch 'develop' into feature/partition-versioning
DmitriiDeriabinQB Dec 14, 2020
2665e46
Merge branch 'develop' into feature/partition-versioning
deepyaman Mar 8, 2021
7001adb
Merge branch 'develop' into feature/partition-versioning
deepyaman Mar 11, 2021
514a07e
Merge branch 'develop' into feature/partition-versioning
deepyaman Mar 15, 2021
b82c9f6
Merge branch 'develop' into feature/partition-versioning
deepyaman Mar 15, 2021
8619e24
Merge branch 'develop' into feature/partition-versioning
deepyaman Mar 18, 2021
865ad81
Merge branch 'develop' into feature/partition-versioning
deepyaman Apr 5, 2021
1eac08a
Merge branch 'develop' into feature/partition-versioning
deepyaman Apr 15, 2021
841361d
Merge branch 'develop' into feature/partition-versioning
deepyaman Apr 19, 2021
e08a8e5
Merge branch 'develop' into feature/partition-versioning
deepyaman Apr 20, 2021
6d3f5ea
Merge branch 'develop' into feature/partition-versioning
deepyaman Apr 22, 2021
691d9d6
Merge branch 'develop' into feature/partition-versioning
deepyaman May 3, 2021
0cbd9c2
Merge branch 'develop' into feature/partition-versioning
deepyaman May 17, 2021
aa90f92
Merge branch 'develop' into feature/partition-versioning
deepyaman May 31, 2021
364572f
Merge branch 'develop' into feature/partition-versioning
deepyaman Jun 9, 2021
3eb5b6c
Merge branch 'develop' into feature/partition-versioning
deepyaman Jun 21, 2021
1c64f64
Merge branch 'develop' into feature/partition-versioning
deepyaman Jul 9, 2021
aace8ae
Merge branch 'develop' into feature/partition-versioning
deepyaman Jul 15, 2021
de8eb4b
Merge branch 'develop' into feature/partition-versioning
deepyaman Aug 18, 2021
3b801ed
Merge branch 'develop' into feature/partition-versioning
deepyaman Aug 1, 2023
d651cee
Fix PartitionedDataSet->PartitionedDataset in test
deepyaman Aug 1, 2023
8beebf8
Fix deprecated references and blacken missed files
deepyaman Aug 2, 2023
92a9c51
Fix broken link to Prefect website in deploy guide
deepyaman Aug 2, 2023
89dbfc4
Fix more unchanged references to deprecated names
deepyaman Aug 2, 2023
4 changes: 2 additions & 2 deletions RELEASE.md
@@ -1,6 +1,7 @@
# Upcoming Release 0.19.0

## Major features and improvements
* `PartitionedDataSet` and `IncrementalDataSet` now both support versioning of the underlying dataset.

## Bug fixes and other changes

@@ -900,8 +901,7 @@ from kedro.framework.session import KedroSession
* We've added a `DeprecationWarning` to the Transformers API when adding a transformer to the catalog. These will be removed in release 0.18.0. Use Hooks to customise the `load` and `save` methods.

## Thanks for supporting contributions
[Deepyaman Datta](https://github.com/deepyaman),
[Zach Schuster](https://github.com/zschuster)
[Deepyaman Datta](https://github.com/deepyaman), [Zach Schuster](https://github.com/zschuster)

## Migration guide from Kedro 0.16.* to 0.17.*

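The release-note bullet above is terse; the on-disk layout that "versioning of the underlying dataset" implies (one timestamp directory per partition, matching this PR's tests — the version string below is illustrative) can be sketched without Kedro:

```python
import tempfile
from pathlib import Path

save_version = "2020-01-01T00.00.00.000Z"  # illustrative version string
root = Path(tempfile.mkdtemp())

# A versioned underlying dataset saves each partition as
# <partition>/<version>/<partition> under the PartitionedDataset path.
for partition in ("p1.csv", "p2.csv"):
    leaf = root / partition / save_version / partition
    leaf.parent.mkdir(parents=True)
    leaf.write_text("col1,col2\n1,2\n")

# Listing leaf files and taking each grandparent recovers the partition ids.
leaves = [p for p in root.rglob("*") if p.is_file()]
partition_ids = sorted(str(p.parents[1].relative_to(root)) for p in leaves)
print(partition_ids)  # → ['p1.csv', 'p2.csv']
```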
2 changes: 1 addition & 1 deletion docs/source/deployment/prefect.md
@@ -1,6 +1,6 @@
# Prefect

This page explains how to run your Kedro pipeline using [Prefect 2.0](https://www.prefect.io/products/core/), an open-source workflow management system.
This page explains how to run your Kedro pipeline using [Prefect 2.0](https://www.prefect.io/opensource), an open-source workflow management system.

The scope of this documentation is the deployment to a self hosted [Prefect Server](https://docs.prefect.io/2.10.17/host/), which is an open-source backend that makes it easy to monitor and execute your Prefect flows and automatically extends Prefect 2.0. We will use an [Agent that dequeues submitted flow runs from a Work Queue](https://docs.prefect.io/2.10.17/tutorial/deployments/#why-workpools-and-workers).

33 changes: 21 additions & 12 deletions kedro/io/partitioned_dataset.py
@@ -6,6 +6,7 @@
import operator
import warnings
from copy import deepcopy
from pathlib import PurePosixPath
from typing import Any, Callable
from urllib.parse import urlparse

@@ -36,6 +37,18 @@
IncrementalDataSet: type[IncrementalDataset]


def _grandparent(path: str) -> str:
path_obj = PurePosixPath(path)
grandparent = path_obj.parents[1]
if grandparent.name != path_obj.name:
last_three_parts = path_obj.relative_to(*path_obj.parts[:-3])
raise DatasetError(
f"`{path}` is not a well-formed versioned path ending with "
f"`filename/timestamp/filename` (got `{last_three_parts}`)."
)
return str(grandparent)


class PartitionedDataset(AbstractDataSet):
# noqa: too-many-instance-attributes,protected-access
"""``PartitionedDataset`` loads and saves partitioned file-like data using the
@@ -182,7 +195,7 @@ def __init__( # noqa: too-many-arguments
load_args: Keyword arguments to be passed into ``find()`` method of
the filesystem implementation.
fs_args: Extra arguments to pass into underlying filesystem class constructor
(e.g. `{"project": "my-project"}` for ``GCSFileSystem``)
(e.g. `{"project": "my-project"}` for ``GCSFileSystem``).
overwrite: If True, any existing partitions will be removed.
metadata: Any arbitrary metadata.
This is ignored by Kedro, but may be consumed by users or external plugins.
@@ -204,12 +217,6 @@

dataset = dataset if isinstance(dataset, dict) else {"type": dataset}
self._dataset_type, self._dataset_config = parse_dataset_definition(dataset)
if VERSION_KEY in self._dataset_config:
raise DatasetError(
f"'{self.__class__.__name__}' does not support versioning of the "
f"underlying dataset. Please remove '{VERSIONED_FLAG_KEY}' flag from "
f"the dataset definition."
)

if credentials:
if CREDENTIALS_KEY in self._dataset_config:
@@ -260,8 +267,9 @@ def _normalized_path(self) -> str:

@cachedmethod(cache=operator.attrgetter("_partition_cache"))
def _list_partitions(self) -> list[str]:
dataset_is_versioned = VERSION_KEY in self._dataset_config
return [
path
_grandparent(path) if dataset_is_versioned else path
for path in self._filesystem.find(self._normalized_path, **self._load_args)
if path.endswith(self._filename_suffix)
]
@@ -442,7 +450,7 @@ def __init__( # noqa: too-many-arguments
This is ignored by Kedro, but may be consumed by users or external plugins.

Raises:
DatasetError: If versioning is enabled for the underlying dataset.
DatasetError: If versioning is enabled for the checkpoint dataset.
"""

super().__init__(
@@ -502,6 +510,7 @@ def _list_partitions(self) -> list[str]:
checkpoint_path = self._filesystem._strip_protocol( # noqa: protected-access
self._checkpoint_config[self._filepath_arg]
)
dataset_is_versioned = VERSION_KEY in self._dataset_config

def _is_valid_partition(partition) -> bool:
if not partition.endswith(self._filename_suffix):
@@ -515,9 +524,9 @@ def _is_valid_partition(partition) -> bool:
return self._comparison_func(partition_id, checkpoint)

return sorted(
part
for part in self._filesystem.find(self._normalized_path, **self._load_args)
if _is_valid_partition(part)
_grandparent(path) if dataset_is_versioned else path
for path in self._filesystem.find(self._normalized_path, **self._load_args)
if _is_valid_partition(path)
)

@property
66 changes: 65 additions & 1 deletion tests/io/test_incremental_dataset.py
@@ -255,11 +255,75 @@ def test_checkpoint_type(
),
],
)
def test_version_not_allowed(self, tmp_path, checkpoint_config, error_pattern):
def test_checkpoint_versioning_not_allowed(
self, tmp_path, checkpoint_config, error_pattern
):
"""Test that invalid checkpoint configurations raise expected errors"""
with pytest.raises(DatasetError, match=re.escape(error_pattern)):
IncrementalDataset(str(tmp_path), DATASET, checkpoint=checkpoint_config)

@pytest.mark.parametrize("dataset_config", [{"type": DATASET, "versioned": True}])
@pytest.mark.parametrize(
"suffix,expected_num_parts", [("", 5), (".csv", 5), ("bad", 0)]
)
def test_versioned_dataset_save_and_load(
self,
mocker,
tmp_path,
partitioned_data_pandas,
dataset_config,
suffix,
expected_num_parts,
):
"""Test that saved and reloaded data matches the original one for
the versioned data set."""
save_version = "2020-01-01T00.00.00.000Z"
mock_ts = mocker.patch(
"kedro.io.core.generate_timestamp", return_value=save_version
)
IncrementalDataset(str(tmp_path), dataset_config).save(partitioned_data_pandas)
mock_ts.assert_called_once()

dataset = IncrementalDataset(
str(tmp_path), dataset_config, filename_suffix=suffix
)
loaded_partitions = dataset.load()

assert len(loaded_partitions) == expected_num_parts

actual_save_versions = set()
for part in loaded_partitions:
partition_dir = tmp_path / (part + suffix)
actual_save_versions |= {each.name for each in partition_dir.iterdir()}
assert partition_dir.is_dir()
assert_frame_equal(
loaded_partitions[part], partitioned_data_pandas[part + suffix]
)

if expected_num_parts:
# all partitions were saved using the same version string
assert actual_save_versions == {save_version}

def test_malformed_versioned_path(self, tmp_path):
local_dir = tmp_path / "files"
local_dir.mkdir()

path = local_dir / "path/to/folder/new/partition/version/partition/file"
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text("content")

dataset = IncrementalDataset(
str(local_dir / "path/to/folder"),
{"type": "pandas.CSVDataSet", "versioned": True},
)

pattern = re.escape(
f"`{path.as_posix()}` is not a well-formed versioned path ending with "
f"`filename/timestamp/filename` (got `version/partition/file`)."
)
with pytest.raises(DatasetError, match=pattern):
dataset.load()

@pytest.mark.parametrize(
"pds_config,fs_creds,dataset_creds,checkpoint_creds",
[
74 changes: 65 additions & 9 deletions tests/io/test_partitioned_dataset.py
@@ -27,7 +27,7 @@ def partitioned_data_pandas():

@pytest.fixture
def local_csvs(tmp_path, partitioned_data_pandas):
local_dir = Path(str(tmp_path / "csvs"))
local_dir = tmp_path / "csvs"
local_dir.mkdir()

for k, data in partitioned_data_pandas.items():
@@ -37,6 +37,11 @@ def local_csvs(tmp_path, partitioned_data_pandas):
return local_dir


@pytest.fixture
def filepath_csvs(tmp_path):
return str(tmp_path / "csvs")


LOCAL_DATASET_DEFINITION = [
"pandas.CSVDataSet",
"kedro.extras.datasets.pandas.CSVDataSet",
@@ -278,17 +283,68 @@ def test_invalid_dataset_config(self, dataset_config, error_pattern):
@pytest.mark.parametrize(
"dataset_config",
[
{"type": CSVDataSet, "versioned": True},
{"type": "pandas.CSVDataSet", "versioned": True},
{**ds_config, "versioned": True}
for ds_config in LOCAL_DATASET_DEFINITION
if isinstance(ds_config, dict)
],
)
def test_versioned_dataset_not_allowed(self, dataset_config):
pattern = (
"'PartitionedDataset' does not support versioning of the underlying "
"dataset. Please remove 'versioned' flag from the dataset definition."
@pytest.mark.parametrize(
"suffix,expected_num_parts", [("", 5), (".csv", 3), ("p4", 1)]
)
def test_versioned_dataset_save_and_load(
self,
mocker,
filepath_csvs,
dataset_config,
suffix,
expected_num_parts,
partitioned_data_pandas,
): # pylint: disable=too-many-locals
"""Test that saved and reloaded data matches the original one for
the versioned data set."""
save_version = "2020-01-01T00.00.00.000Z"
mock_ts = mocker.patch(
"kedro.io.core.generate_timestamp", return_value=save_version
)
with pytest.raises(DatasetError, match=re.escape(pattern)):
PartitionedDataset(str(Path.cwd()), dataset_config)
PartitionedDataset(filepath_csvs, dataset_config).save(partitioned_data_pandas)
mock_ts.assert_called_once()

pds = PartitionedDataset(filepath_csvs, dataset_config, filename_suffix=suffix)
loaded_partitions = pds.load()

assert len(loaded_partitions) == expected_num_parts
actual_save_versions = set()
for partition_id, load_func in loaded_partitions.items():
partition_dir = Path(filepath_csvs, partition_id + suffix)
actual_save_versions |= {each.name for each in partition_dir.iterdir()}
df = load_func()
assert_frame_equal(df, partitioned_data_pandas[partition_id + suffix])
if suffix:
assert not partition_id.endswith(suffix)

if expected_num_parts:
# all partitions were saved using the same version string
assert actual_save_versions == {save_version}

def test_malformed_versioned_path(self, tmp_path):
local_dir = tmp_path / "files"
local_dir.mkdir()

path = local_dir / "path/to/folder/new/partition/version/partition/file"
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text("content")

pds = PartitionedDataset(
str(local_dir / "path/to/folder"),
{"type": "pandas.CSVDataSet", "versioned": True},
)

pattern = re.escape(
f"`{path.as_posix()}` is not a well-formed versioned path ending with "
f"`filename/timestamp/filename` (got `version/partition/file`)."
)
with pytest.raises(DatasetError, match=pattern):
pds.load()

def test_no_partitions(self, tmpdir):
pds = PartitionedDataset(str(tmpdir), "pandas.CSVDataSet")