Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into feature/save-pd-to-…
Browse files Browse the repository at this point in the history
…snowflaketable
  • Loading branch information
tdhooghe committed Oct 15, 2024
2 parents 60d745e + 216a110 commit 03be4d9
Show file tree
Hide file tree
Showing 7 changed files with 708 additions and 175 deletions.
376 changes: 218 additions & 158 deletions kedro-datasets/RELEASE.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions kedro-datasets/docs/source/api/kedro_datasets.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ kedro_datasets
holoviews.HoloviewsWriter
huggingface.HFDataset
huggingface.HFTransformerPipelineDataset
ibis.FileDataset
ibis.TableDataset
json.JSONDataset
matlab.MatlabDataset
Expand Down
4 changes: 3 additions & 1 deletion kedro-datasets/kedro_datasets/ibis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import lazy_loader as lazy

# https://github.com/pylint-dev/pylint/issues/4300#issuecomment-1043601901
FileDataset: Any
TableDataset: Any

__getattr__, __dir__, __all__ = lazy.attach(
__name__, submod_attrs={"table_dataset": ["TableDataset"]}
__name__,
submod_attrs={"file_dataset": ["FileDataset"], "table_dataset": ["TableDataset"]},
)
195 changes: 195 additions & 0 deletions kedro-datasets/kedro_datasets/ibis/file_dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
"""Provide file loading and saving functionality for Ibis's backends."""
from __future__ import annotations

from copy import deepcopy
from pathlib import Path, PurePosixPath
from typing import TYPE_CHECKING, Any, ClassVar

import ibis.expr.types as ir
from kedro.io import AbstractVersionedDataset, DatasetError, Version

if TYPE_CHECKING:
from ibis import BaseBackend


class FileDataset(AbstractVersionedDataset[ir.Table, ir.Table]):
"""``FileDataset`` loads/saves data from/to a specified file format.
Example usage for the
`YAML API <https://docs.kedro.org/en/stable/data/data_catalog_yaml_examples.html>`_:
.. code-block:: yaml
cars:
type: ibis.FileDataset
filepath: data/01_raw/company/cars.csv
file_format: csv
table_name: cars
connection:
backend: duckdb
database: company.db
load_args:
sep: ","
nullstr: "#NA"
save_args:
sep: ","
nullstr: "#NA"
motorbikes:
type: ibis.FileDataset
filepath: s3://your_bucket/data/02_intermediate/company/motorbikes/
file_format: delta
table_name: motorbikes
connection:
backend: polars
Example usage for the
`Python API <https://docs.kedro.org/en/stable/data/\
advanced_data_catalog_usage.html>`_:
.. code-block:: pycon
>>> import ibis
>>> from kedro_datasets.ibis import FileDataset
>>>
>>> data = ibis.memtable({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
>>>
>>> dataset = FileDataset(
... filepath=tmp_path / "test.csv",
... file_format="csv",
... table_name="test",
... connection={"backend": "duckdb", "database": tmp_path / "file.db"},
... )
>>> dataset.save(data)
>>> reloaded = dataset.load()
>>> assert data.execute().equals(reloaded.execute())
"""

DEFAULT_LOAD_ARGS: ClassVar[dict[str, Any]] = {}
DEFAULT_SAVE_ARGS: ClassVar[dict[str, Any]] = {}

_connections: ClassVar[dict[tuple[tuple[str, str]], BaseBackend]] = {}

def __init__( # noqa: PLR0913
self,
filepath: str,
file_format: str = "parquet",
*,
table_name: str | None = None,
connection: dict[str, Any] | None = None,
load_args: dict[str, Any] | None = None,
save_args: dict[str, Any] | None = None,
version: Version | None = None,
metadata: dict[str, Any] | None = None,
) -> None:
"""Creates a new ``FileDataset`` pointing to the given filepath.
``FileDataset`` connects to the Ibis backend object constructed
from the connection configuration. The `backend` key provided in
the config can be any of the `supported backends <https://ibis-\
project.org/install>`_. The remaining dictionary entries will be
passed as arguments to the underlying ``connect()`` method (e.g.
`ibis.duckdb.connect() <https://ibis-project.org/backends/duckdb\
#ibis.duckdb.connect>`_).
The read method corresponding to the given ``file_format`` (e.g.
`read_csv() <https://ibis-project.org/backends/\
duckdb#ibis.backends.duckdb.Backend.read_csv>`_) is used to load
the file with the backend. Note that only the data is loaded; no
link to the underlying file exists past ``FileDataset.load()``.
Args:
filepath: Path to a file to register as a table. Most useful
for loading data into your data warehouse (for testing).
On save, the backend exports data to the specified path.
file_format: String specifying the file format for the file.
Defaults to writing execution results to a Parquet file.
table_name: The name to use for the created table (on load).
connection: Configuration for connecting to an Ibis backend.
load_args: Additional arguments passed to the Ibis backend's
`read_{file_format}` method.
save_args: Additional arguments passed to the Ibis backend's
`to_{file_format}` method.
version: If specified, should be an instance of
``kedro.io.core.Version``. If its ``load`` attribute is
None, the latest version will be loaded. If its ``save``
attribute is None, save version will be autogenerated.
metadata: Any arbitrary metadata. This is ignored by Kedro,
but may be consumed by users or external plugins.
"""
self._file_format = file_format
self._table_name = table_name
self._connection_config = connection
self.metadata = metadata

super().__init__(
filepath=PurePosixPath(filepath),
version=version,
exists_function=lambda filepath: Path(filepath).exists(),
)

# Set load and save arguments, overwriting defaults if provided.
self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
if load_args is not None:
self._load_args.update(load_args)

self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
if save_args is not None:
self._save_args.update(save_args)

@property
def connection(self) -> BaseBackend:
"""The ``Backend`` instance for the connection configuration."""

def hashable(value):
"""Return a hashable key for a potentially-nested object."""
if isinstance(value, dict):
return tuple((k, hashable(v)) for k, v in sorted(value.items()))
if isinstance(value, list):
return tuple(hashable(x) for x in value)
return value

cls = type(self)
key = hashable(self._connection_config)
if key not in cls._connections:
import ibis

config = deepcopy(self._connection_config)
backend_attr = config.pop("backend") if config else None
backend = getattr(ibis, backend_attr)
cls._connections[key] = backend.connect(**config)

return cls._connections[key]

def load(self) -> ir.Table:
load_path = self._get_load_path()
reader = getattr(self.connection, f"read_{self._file_format}")
return reader(load_path, self._table_name, **self._load_args)

def save(self, data: ir.Table) -> None:
save_path = self._get_save_path()
Path(save_path).parent.mkdir(parents=True, exist_ok=True)
writer = getattr(self.connection, f"to_{self._file_format}")
writer(data, save_path, **self._save_args)

def _describe(self) -> dict[str, Any]:
return {
"filepath": self._filepath,
"file_format": self._file_format,
"table_name": self._table_name,
"backend": self._connection_config.get("backend")
if self._connection_config
else None,
"load_args": self._load_args,
"save_args": self._save_args,
"version": self._version,
}

def _exists(self) -> bool:
try:
load_path = self._get_load_path()
except DatasetError:
return False

return Path(load_path).exists()
29 changes: 15 additions & 14 deletions kedro-datasets/kedro_datasets/ibis/table_dataset.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
"""Provide data loading and saving functionality for Ibis's backends."""
from __future__ import annotations

import warnings
from copy import deepcopy
from typing import TYPE_CHECKING, Any, ClassVar

import ibis.expr.types as ir
from kedro.io import AbstractDataset, DatasetError

from kedro_datasets import KedroDeprecationWarning

if TYPE_CHECKING:
from ibis import BaseBackend

Expand All @@ -21,15 +24,10 @@ class TableDataset(AbstractDataset[ir.Table, ir.Table]):
cars:
type: ibis.TableDataset
filepath: data/01_raw/company/cars.csv
file_format: csv
table_name: cars
connection:
backend: duckdb
database: company.db
load_args:
sep: ","
nullstr: "#NA"
save_args:
materialized: table
Expand Down Expand Up @@ -91,12 +89,6 @@ def __init__( # noqa: PLR0913
`ibis.duckdb.connect() <https://ibis-project.org/backends/duckdb\
#ibis.duckdb.connect>`_).
If ``filepath`` and ``file_format`` are given, the corresponding
read method (e.g. `read_csv() <https://ibis-project.org/backends/\
duckdb#ibis.backends.duckdb.Backend.read_csv>`_) is used to load
the file with the backend. Note that only the data is loaded; no
link to the underlying file exists past ``TableDataset.load()``.
If ``table_name`` is given (and ``filepath`` isn't), the dataset
establishes a connection to the relevant table for the execution
backend. Therefore, Ibis doesn't fetch data on load; all compute
Expand All @@ -105,9 +97,6 @@ def __init__( # noqa: PLR0913
is saved, after running code defined across one more more nodes.
Args:
filepath: Path to a file to register as a table. Most useful
for loading data into your data warehouse (for testing).
file_format: Specifies the input file format for `filepath`.
table_name: The name of the table or view to read or create.
connection: Configuration for connecting to an Ibis backend.
load_args: Additional arguments passed to the Ibis backend's
Expand All @@ -125,6 +114,15 @@ def __init__( # noqa: PLR0913
"Must provide at least one of `filepath` or `table_name`."
)

if filepath is not None or file_format is not None:
warnings.warn(
"Use 'FileDataset' to load and save files with an Ibis "
"backend; the functionality will be removed from 'Table"
"Dataset' in Kedro-Datasets 6.0.0",
KedroDeprecationWarning,
stacklevel=2,
)

self._filepath = filepath
self._file_format = file_format
self._table_name = table_name
Expand All @@ -144,7 +142,10 @@ def __init__( # noqa: PLR0913

@property
def connection(self) -> BaseBackend:
"""The ``Backend`` instance for the connection configuration."""

def hashable(value):
"""Return a hashable key for a potentially-nested object."""
if isinstance(value, dict):
return tuple((k, hashable(v)) for k, v in sorted(value.items()))
if isinstance(value, list):
Expand Down
Loading

0 comments on commit 03be4d9

Please sign in to comment.