Support versioning of the underlying dataset with PartitionedDataset #521

Closed
wants to merge 62 commits into from
Changes from 24 commits
Commits
00e1783
Support versioning of the underlying dataset
deepyaman Sep 20, 2020
61dba32
Remove outdated "Raises" clause
deepyaman Sep 20, 2020
6f347ec
Merge branch 'master' into feature/partition-versioning
deepyaman Sep 21, 2020
a80a799
Merge branch 'master' into feature/partition-versioning
deepyaman Sep 29, 2020
fb6992d
Merge branch 'master' into feature/partition-versioning
deepyaman Oct 1, 2020
eceeb50
Merge branch 'master' into feature/partition-versioning
deepyaman Oct 2, 2020
7ce6bbc
Update kedro/io/partitioned_data_set.py
deepyaman Oct 2, 2020
70ed18d
Merge branch 'master' into feature/partition-versioning
deepyaman Oct 6, 2020
1b8c7fc
Use `Path.parents[1]` instead of `.parent` twice
deepyaman Oct 9, 2020
1232568
Merge branch 'master' into feature/partition-versioning
deepyaman Oct 12, 2020
91ecc39
Use PurePosixPath instead of Path
deepyaman Oct 21, 2020
cc50803
Make meaningful error for malformed versioned path
deepyaman Oct 21, 2020
e34f58b
Call `.as_posix()` for Windows test
deepyaman Oct 21, 2020
f757d26
Merge branch 'master' into feature/partition-versioning
deepyaman Oct 21, 2020
49c1214
Update RELEASE.md
deepyaman Oct 21, 2020
0758334
Rename variable in comprehension due to Pylint bug
deepyaman Oct 21, 2020
f3adbec
Merge branch 'master' into feature/partition-versioning
deepyaman Oct 21, 2020
a8c7bf7
Merge branch 'master' into feature/partition-versioning
deepyaman Oct 22, 2020
10a6afc
Merge branch 'master' into feature/partition-versioning
deepyaman Oct 22, 2020
b486a15
Merge branch 'master' into feature/partition-versioning
deepyaman Oct 23, 2020
5fbf972
Merge branch 'master' into feature/partition-versioning
deepyaman Oct 26, 2020
8e94bbf
Merge branch 'master' into feature/partition-versioning
deepyaman Nov 3, 2020
e39032c
Merge branch 'master' into feature/partition-versioning
deepyaman Nov 9, 2020
b56d201
Merge branch 'master' into feature/partition-versioning
deepyaman Nov 19, 2020
b4390ce
Merge branch 'master' into feature/partition-versioning
deepyaman Nov 23, 2020
5dc8c0f
Merge branch 'master' into feature/partition-versioning
deepyaman Nov 30, 2020
0aeea98
Refactor is-versioned check into a variable :tada:
deepyaman Dec 2, 2020
423c7be
Keep f-string prefix even for unformatted portions
deepyaman Dec 2, 2020
7d7e4c5
Augment error message with expected path structure
deepyaman Dec 2, 2020
bcfe409
Use that VERSION_KEY is only in versioned datasets
deepyaman Dec 2, 2020
207dfb4
Update tests/io/test_partitioned_dataset.py
deepyaman Dec 2, 2020
71e847e
Revert "Augment error message with expected path structure"
deepyaman Dec 2, 2020
519ef7d
Merge branch 'master' into feature/partition-versioning
deepyaman Dec 2, 2020
8ea477f
Update RELEASE.md
deepyaman Dec 3, 2020
8b57e46
Merge branch 'master' into feature/partition-versioning
deepyaman Dec 8, 2020
11965e4
Merge branch 'master' into feature/partition-versioning
deepyaman Dec 11, 2020
e8f5020
Merge branch 'develop' into feature/partition-versioning
DmitriiDeriabinQB Dec 14, 2020
9b5cd46
update IncrementalDataSet
DmitriiDeriabinQB Dec 14, 2020
224343c
Merge branch 'develop' into feature/partition-versioning
DmitriiDeriabinQB Dec 14, 2020
2665e46
Merge branch 'develop' into feature/partition-versioning
deepyaman Mar 8, 2021
7001adb
Merge branch 'develop' into feature/partition-versioning
deepyaman Mar 11, 2021
514a07e
Merge branch 'develop' into feature/partition-versioning
deepyaman Mar 15, 2021
b82c9f6
Merge branch 'develop' into feature/partition-versioning
deepyaman Mar 15, 2021
8619e24
Merge branch 'develop' into feature/partition-versioning
deepyaman Mar 18, 2021
865ad81
Merge branch 'develop' into feature/partition-versioning
deepyaman Apr 5, 2021
1eac08a
Merge branch 'develop' into feature/partition-versioning
deepyaman Apr 15, 2021
841361d
Merge branch 'develop' into feature/partition-versioning
deepyaman Apr 19, 2021
e08a8e5
Merge branch 'develop' into feature/partition-versioning
deepyaman Apr 20, 2021
6d3f5ea
Merge branch 'develop' into feature/partition-versioning
deepyaman Apr 22, 2021
691d9d6
Merge branch 'develop' into feature/partition-versioning
deepyaman May 3, 2021
0cbd9c2
Merge branch 'develop' into feature/partition-versioning
deepyaman May 17, 2021
aa90f92
Merge branch 'develop' into feature/partition-versioning
deepyaman May 31, 2021
364572f
Merge branch 'develop' into feature/partition-versioning
deepyaman Jun 9, 2021
3eb5b6c
Merge branch 'develop' into feature/partition-versioning
deepyaman Jun 21, 2021
1c64f64
Merge branch 'develop' into feature/partition-versioning
deepyaman Jul 9, 2021
aace8ae
Merge branch 'develop' into feature/partition-versioning
deepyaman Jul 15, 2021
de8eb4b
Merge branch 'develop' into feature/partition-versioning
deepyaman Aug 18, 2021
3b801ed
Merge branch 'develop' into feature/partition-versioning
deepyaman Aug 1, 2023
d651cee
Fix PartitionedDataSet->PartitionedDataset in test
deepyaman Aug 1, 2023
8beebf8
Fix deprecated references and blacken missed files
deepyaman Aug 2, 2023
92a9c51
Fix broken link to Prefect website in deploy guide
deepyaman Aug 2, 2023
89dbfc4
Fix more unchanged references to deprecated names
deepyaman Aug 2, 2023
1 change: 1 addition & 0 deletions RELEASE.md
@@ -18,6 +18,7 @@
## Bug fixes and other changes
* Fixed `TypeError` when converting dict inputs to a node made from a wrapped `partial` function.
* `PartitionedDataSet` improvements:
- Supported versioning of the underlying dataset.
- Supported passing arguments to the underlying filesystem.
* Improved handling of non-ASCII word characters in dataset names.
- For example, a dataset named `jalapeño` will be accessible as `DataCatalog.datasets.jalapeño` rather than `DataCatalog.datasets.jalape__o`.
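The `jalapeño` note above can be illustrated with a small sketch. Both regular expressions here are assumptions chosen to reproduce the two names quoted in the release note; they are not taken from this PR, and the helper names are hypothetical.

```python
import re


def ascii_only_name(name: str) -> str:
    # Assumed "before" behaviour: anything outside [0-9a-zA-Z_] becomes "__".
    return re.sub(r"[^0-9a-zA-Z_]+", "__", name)


def unicode_aware_name(name: str) -> str:
    # Assumed "after" behaviour: \W is Unicode-aware for str patterns in
    # Python 3, so a precomposed "ñ" counts as a word character.
    return re.sub(r"\W+", "__", name)


name = "jalape\u00f1o"  # "jalapeño" with a precomposed ñ
print(ascii_only_name(name))     # jalape__o
print(unicode_aware_name(name))  # jalapeño
```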
25 changes: 14 additions & 11 deletions kedro/io/partitioned_data_set.py
@@ -31,6 +31,7 @@
"""
import operator
from copy import deepcopy
from pathlib import PurePosixPath
from typing import Any, Callable, Dict, List, Tuple, Type, Union
from urllib.parse import urlparse
from warnings import warn
@@ -58,6 +59,18 @@
S3_PROTOCOLS = ("s3", "s3a", "s3n")


def _grandparent(path: str) -> str:
path_obj = PurePosixPath(path)
grandparent = path_obj.parents[1]
if grandparent.name != path_obj.name:
last_three_parts = path_obj.relative_to(*path_obj.parts[:-3])
raise DataSetError(
f"`{path}` is not a well-formed versioned path ending with "
f"`filename/timestamp/filename` (got `{last_three_parts}`)."
Contributor:

(Optional) Maybe also add a hint about what was expected? Something like:

f"`filename/timestamp/filename` (got `{last_three_parts}`, "
f"expected `{path_obj.name}/<version>/{path_obj.name}`)."

deepyaman marked this conversation as resolved.
)
return str(grandparent)
DmitriiDeriabinQB marked this conversation as resolved.
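For readers who want to try the path check outside Kedro, here is a minimal standalone sketch of what `_grandparent` appears to do. It makes two assumptions that differ from the diff: `ValueError` stands in for `DataSetError` so the snippet runs without Kedro installed, and the last three path parts are rebuilt with `PurePosixPath(*parts[-3:])` instead of the multi-argument `relative_to` call. The sample paths are hypothetical.

```python
from pathlib import PurePosixPath


def grandparent(path: str) -> str:
    """Map `<partition>/<version>/<partition>` to `<partition>`."""
    path_obj = PurePosixPath(path)
    candidate = path_obj.parents[1]  # same as path_obj.parent.parent
    if candidate.name != path_obj.name:
        # Rebuild the trailing three components for the error message.
        last_three_parts = PurePosixPath(*path_obj.parts[-3:])
        raise ValueError(  # stand-in for DataSetError
            f"`{path}` is not a well-formed versioned path ending with "
            f"`filename/timestamp/filename` (got `{last_three_parts}`)."
        )
    return str(candidate)


# Well-formed: the file name matches the grandparent directory name.
print(grandparent("data/p1.csv/2020-09-20T00.00.00.000Z/p1.csv"))  # data/p1.csv

# Malformed: the grandparent is `version`, the file name is `file`.
try:
    grandparent("new/partition/version/partition/file")
except ValueError as err:
    print(err)
```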


class PartitionedDataSet(AbstractDataSet):
Contributor:

Hey @deepyaman! I believe that if we want this merged before 0.17, these changes may also need to be implemented for IncrementalDataSet 🤔 (since the incremental one's _list_partitions method isn't inherited from PartitionedDataSet). Please let me know whether that makes sense 🙂

Contributor @lorenabalan, Dec 14, 2020:

Also should this be merged in develop instead?
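The concern about IncrementalDataSet raised in this thread can be sketched with two toy classes. `Partitioned` and `Incremental` are hypothetical stand-ins, not Kedro's classes, and the path handling is deliberately simplified; the point is only that a subclass with its own `_list_partitions` does not pick up logic added to the parent's version.

```python
from typing import List


class Partitioned:
    """Hypothetical stand-in for PartitionedDataSet."""

    def __init__(self, paths: List[str], versioned: bool = False):
        self._paths = paths
        self._versioned = versioned

    def _list_partitions(self) -> List[str]:
        # Versioned paths collapse to their grandparent directory.
        return [
            p.rsplit("/", 2)[0] if self._versioned else p for p in self._paths
        ]


class Incremental(Partitioned):
    """Hypothetical stand-in for IncrementalDataSet."""

    def _list_partitions(self) -> List[str]:
        # Own implementation: the parent's versioning logic never runs here.
        return sorted(self._paths)


paths = ["data/p1.csv/v1/p1.csv"]
print(Partitioned(paths, versioned=True)._list_partitions())  # ['data/p1.csv']
print(Incremental(paths, versioned=True)._list_partitions())  # ['data/p1.csv/v1/p1.csv']
```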

# pylint: disable=too-many-instance-attributes,protected-access
"""``PartitionedDataSet`` loads and saves partitioned file-like data using the
@@ -144,9 +157,6 @@ def __init__( # pylint: disable=too-many-arguments
the filesystem implementation.
fs_args: Extra arguments to pass into underlying filesystem class constructor
(e.g. `{"project": "my-project"}` for ``GCSFileSystem``)

Raises:
DataSetError: If versioning is enabled for the underlying dataset.
"""
# pylint: disable=import-outside-toplevel
from fsspec.utils import infer_storage_options # for performance reasons
@@ -160,13 +170,6 @@ def __init__( # pylint: disable=too-many-arguments

dataset = dataset if isinstance(dataset, dict) else {"type": dataset}
self._dataset_type, self._dataset_config = parse_dataset_definition(dataset)
if VERSION_KEY in self._dataset_config:
raise DataSetError(
"`{}` does not support versioning of the underlying dataset. "
"Please remove `{}` flag from the dataset definition.".format(
self.__class__.__name__, VERSIONED_FLAG_KEY
)
)

self._credentials, dataset_credentials = _split_credentials(credentials)
if dataset_credentials:
@@ -217,7 +220,7 @@ def _normalized_path(self) -> str:
@cachedmethod(cache=operator.attrgetter("_partition_cache"))
def _list_partitions(self) -> List[str]:
return [
deepyaman marked this conversation as resolved.
path
_grandparent(path) if self._dataset_config.get(VERSION_KEY) else path
deepyaman marked this conversation as resolved.
for path in self._filesystem.find(self._normalized_path, **self._load_args)
if path.endswith(self._filename_suffix)
]
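As a rough illustration of the changed comprehension, the sketch below applies the same two steps to a plain list of paths: filter by filename suffix, then collapse each versioned file path to its partition directory. The function name, its parameters, and the sample paths are hypothetical, and the well-formedness check from `_grandparent` is left out for brevity.

```python
from pathlib import PurePosixPath
from typing import Iterable, List


def list_partitions(found: Iterable[str], suffix: str, versioned: bool) -> List[str]:
    """Mimic the comprehension: suffix filter, then optional collapse."""
    return [
        # For versioned datasets the partition is two levels above the file.
        str(PurePosixPath(path).parents[1]) if versioned else path
        for path in found
        if path.endswith(suffix)
    ]


found = [
    "csvs/p1/data.csv/v1/data.csv",
    "csvs/p2.csv/v1/p2.csv",
    "csvs/notes.txt/v1/notes.txt",
]
print(list_partitions(found, ".csv", versioned=True))
# ['csvs/p1/data.csv', 'csvs/p2.csv']
```

Because the file name repeats the partition name in a versioned layout, filtering on the file path's suffix gives the same result as filtering on the partition path.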
64 changes: 54 additions & 10 deletions tests/io/test_partitioned_dataset.py
@@ -52,7 +52,7 @@ def partitioned_data_pandas():

@pytest.fixture
def local_csvs(tmp_path, partitioned_data_pandas):
local_dir = Path(str(tmp_path / "csvs"))
local_dir = tmp_path / "csvs"
local_dir.mkdir()

for k, data in partitioned_data_pandas.items():
@@ -62,6 +62,11 @@ def local_csvs(tmp_path, partitioned_data_pandas):
return local_dir


@pytest.fixture
def filepath_csvs(tmp_path):
return str(tmp_path / "csvs")


LOCAL_DATASET_DEFINITION = [
"pandas.CSVDataSet",
"kedro.extras.datasets.pandas.CSVDataSet",
@@ -286,17 +291,56 @@ def test_invalid_dataset_config(self, dataset_config, error_pattern):
@pytest.mark.parametrize(
"dataset_config",
[
{"type": CSVDataSet, "versioned": True},
{"type": "pandas.CSVDataSet", "versioned": True},
{**ds_config, "versioned": True}
for ds_config in LOCAL_DATASET_DEFINITION
if isinstance(ds_config, dict)
],
)
def test_versioned_dataset_not_allowed(self, dataset_config):
pattern = (
"`PartitionedDataSet` does not support versioning of the underlying "
"dataset. Please remove `versioned` flag from the dataset definition."
@pytest.mark.parametrize(
"suffix,expected_num_parts", [("", 5), (".csv", 3), ("p4", 1)]
)
def test_versioned_dataset_save_and_load(
self,
filepath_csvs,
dataset_config,
suffix,
expected_num_parts,
partitioned_data_pandas,
):
"""Test that saved and reloaded data matches the original one for
the versioned data set."""
PartitionedDataSet(filepath_csvs, dataset_config).save(partitioned_data_pandas)

pds = PartitionedDataSet(filepath_csvs, dataset_config, filename_suffix=suffix)
loaded_partitions = pds.load()

assert len(loaded_partitions.keys()) == expected_num_parts
for partition_id, load_func in loaded_partitions.items():
df = load_func()
assert_frame_equal(df, partitioned_data_pandas[partition_id + suffix])
if suffix:
assert not partition_id.endswith(suffix)

def test_malformed_versioned_path(self, tmp_path):
local_dir = tmp_path / "files"
local_dir.mkdir()

path = local_dir / "path/to/folder/new/partition/version/partition/file"
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text("content")

pds = PartitionedDataSet(
str(local_dir / "path/to/folder"),
{"type": "pandas.CSVDataSet", "versioned": True},
)
with pytest.raises(DataSetError, match=re.escape(pattern)):
PartitionedDataSet(str(Path.cwd()), dataset_config)

pattern = re.escape(
f"`{path.as_posix()}` is not a well-formed versioned path ending with "
"`filename/timestamp/filename` (got `version/partition/"
"file`)."
deepyaman marked this conversation as resolved.
)
with pytest.raises(DataSetError, match=pattern):
pds.load()

def test_no_partitions(self, tmpdir):
pds = PartitionedDataSet(str(tmpdir), "pandas.CSVDataSet")
@@ -328,7 +372,7 @@ def test_no_partitions(self, tmpdir):
def test_filepath_arg_warning(self, pds_config, filepath_arg):
pattern = (
f"`{filepath_arg}` key must not be specified in the dataset definition as it "
f"will be overwritten by partition path"
Contributor:
We usually keep an f prefix for all string parts even if they aren't formatted (at least this is how PyCharm does it by default)

"will be overwritten by partition path"
deepyaman marked this conversation as resolved.
)
with pytest.warns(UserWarning, match=re.escape(pattern)):
PartitionedDataSet(**pds_config)
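On the review note about the `f` prefix in this test: the two spellings produce the same string, so the choice is purely stylistic. A quick check, using a hypothetical value for `filepath_arg`:

```python
filepath_arg = "filepath"  # hypothetical value, for illustration only

all_prefixed = (
    f"`{filepath_arg}` key must not be specified in the dataset definition as it "
    f"will be overwritten by partition path"
)
partly_prefixed = (
    f"`{filepath_arg}` key must not be specified in the dataset definition as it "
    "will be overwritten by partition path"
)

# Adjacent literals are concatenated, so both spellings give the same text.
print(all_prefixed == partly_prefixed)  # True
```

The prefix only changes the result when a part contains literal braces, which must be doubled inside an f-string.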