Commit 29dbee8: Naming read_deltalake / to_deltalake.

j-bennet committed Jul 12, 2023 (1 parent: 574039f)

Showing 6 changed files with 60 additions and 40 deletions.
10 changes: 5 additions & 5 deletions README.md
@@ -36,13 +36,13 @@ pip install dask-deltatable
import dask_deltatable as ddt

# read delta table
ddt.read_delta_table("delta_path")
ddt.read_deltalake("delta_path")

# with specific version
ddt.read_delta_table("delta_path", version=3)
ddt.read_deltalake("delta_path", version=3)

# with specific datetime
ddt.read_delta_table("delta_path", datetime="2018-12-19T16:39:57-08:00")
ddt.read_deltalake("delta_path", datetime="2018-12-19T16:39:57-08:00")
```

### Accessing remote file systems
@@ -54,7 +54,7 @@ or config files. For AWS, you may need `~/.aws/credential`; for gcsfs,
to configure these.

```python
ddt.read_delta_table("s3://bucket_name/delta_path", version=3)
ddt.read_deltalake("s3://bucket_name/delta_path", version=3)
```

### Accessing AWS Glue catalog
@@ -67,7 +67,7 @@ environment variables, and if those are not available, fall back to
Example:

```python
-ddt.read_delta_table(catalog="glue", database_name="science", table_name="physics")
+ddt.read_deltalake(catalog="glue", database_name="science", table_name="physics")
```

### Inspecting Delta Table history
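The README examples above rely on ambient AWS credentials. A minimal sketch of how the renamed reader picks them up (the key values and bucket path are placeholders, mirroring the env-var pattern used in `test_catalog` further down):

```python
import os

import dask_deltatable as ddt

# Placeholder credentials; in practice these come from the environment
# or from config files such as ~/.aws/credentials.
os.environ["AWS_ACCESS_KEY_ID"] = "..."
os.environ["AWS_SECRET_ACCESS_KEY"] = "..."

# Remote read pinned to a version, and a Glue catalog read.
df = ddt.read_deltalake("s3://bucket_name/delta_path", version=3)
df = ddt.read_deltalake(catalog="glue", database_name="science", table_name="physics")
```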
11 changes: 9 additions & 2 deletions dask_deltatable/__init__.py
@@ -1,8 +1,15 @@
from __future__ import annotations

__all__ = ["read_delta_history", "read_delta_table", "to_delta_table", "vacuum"]
__all__ = [
"read_deltalake",
"to_deltalake",
"read_delta_table",
"read_delta_history",
"vacuum",
]

from .core import read_delta_history as read_delta_history
from .core import read_delta_table as read_delta_table
+from .core import read_deltalake as read_deltalake
from .core import vacuum as vacuum
-from .write import to_delta_table as to_delta_table
+from .write import to_deltalake as to_deltalake
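The rename is asymmetric: the read path keeps a deprecated alias, while the write path does not (note that `to_delta_table` is absent from the new `__all__`). A sketch of the resulting public surface, assuming a table at the placeholder path `delta_path`:

```python
import dask_deltatable as ddt

df = ddt.read_deltalake("delta_path")    # new name
df = ddt.read_delta_table("delta_path")  # deprecated alias; emits a warning

ddt.to_deltalake("delta_path", df)       # write path exists under the new name only
```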
17 changes: 15 additions & 2 deletions dask_deltatable/core.py
@@ -2,6 +2,7 @@

import json
import os
+import warnings
from functools import partial
from typing import Any
from urllib.parse import urlparse
@@ -217,7 +218,7 @@ def _read_from_catalog(
    return df


-def read_delta_table(
+def read_deltalake(
    path: str | None = None,
    catalog: str | None = None,
    database_name: str | None = None,
@@ -292,7 +293,8 @@ def read_delta_table(
    Examples
    --------
-    >>> df = dd.read_delta_table('s3://bucket/my-delta-table') # doctest: +SKIP
+    >>> import dask_deltatable as ddt
+    >>> df = ddt.read_deltalake('s3://bucket/my-delta-table') # doctest: +SKIP
    """
    if catalog is not None:
@@ -319,6 +321,17 @@
    return resultdf


+def read_delta_table(*args, **kwargs):
+    warnings.warn(
+        "`read_delta_table` was renamed, use `read_deltalake` instead",
+        category=DeprecationWarning,
+    )
+    return read_deltalake(*args, **kwargs)
+
+
+read_delta_table.__doc__ = read_deltalake.__doc__


def read_delta_history(
    path: str,
    limit: int | None = None,
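For callers migrating gradually, the shim above means the old spelling keeps working while surfacing the rename. A minimal sketch of observing the warning (the table path is a placeholder):

```python
import warnings

import dask_deltatable as ddt

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    df = ddt.read_delta_table("delta_path")  # delegates to read_deltalake

# The shim emits DeprecationWarning, which Python hides by default
# outside of __main__ and test runners; recording makes it visible here.
assert any(issubclass(w.category, DeprecationWarning) for w in caught)
```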
2 changes: 1 addition & 1 deletion dask_deltatable/write.py
@@ -33,7 +33,7 @@
from ._schema import pyarrow_to_deltalake, validate_compatible


-def to_delta_table(
+def to_deltalake(
    table_or_uri: str | Path | DeltaTable,
    df: dd.DataFrame,
    *,
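`to_deltalake` takes the target URI first and the Dask DataFrame second; as the updated `test_roundtrip` further down suggests, it returns a lazy result and writes nothing until computed. A sketch under that assumption (the target path is a placeholder):

```python
import dask.dataframe as dd
import pandas as pd

from dask_deltatable.write import to_deltalake

# A small two-partition frame to write.
ddf = dd.from_pandas(
    pd.DataFrame({"id": range(10), "value": range(10)}), npartitions=2
)

out = to_deltalake("/tmp/example_delta_table", ddf)  # placeholder path
out.compute()  # files appear only after compute
```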
50 changes: 25 additions & 25 deletions tests/test_core.py
@@ -69,24 +69,24 @@ def vacuum_table(tmpdir):


def test_read_delta(simple_table):
-    df = ddt.read_delta_table(simple_table)
+    df = ddt.read_deltalake(simple_table)

    assert df.columns.tolist() == ["id", "count", "temperature", "newColumn"]
    assert df.compute().shape == (200, 4)


def test_read_delta_with_different_versions(simple_table):
    print(simple_table)
-    df = ddt.read_delta_table(simple_table, version=0)
+    df = ddt.read_deltalake(simple_table, version=0)
    assert df.compute().shape == (100, 3)

-    df = ddt.read_delta_table(simple_table, version=1)
+    df = ddt.read_deltalake(simple_table, version=1)
    assert df.compute().shape == (200, 4)


def test_row_filter(simple_table):
    # row filter
-    df = ddt.read_delta_table(
+    df = ddt.read_deltalake(
        simple_table,
        version=0,
        filter=[("count", ">", 30)],
@@ -95,17 +95,17 @@


def test_different_columns(simple_table):
-    df = ddt.read_delta_table(simple_table, columns=["count", "temperature"])
+    df = ddt.read_deltalake(simple_table, columns=["count", "temperature"])
    assert df.columns.tolist() == ["count", "temperature"]


def test_different_schema(simple_table):
    # testing schema evolution

-    df = ddt.read_delta_table(simple_table, version=0)
+    df = ddt.read_deltalake(simple_table, version=0)
    assert df.columns.tolist() == ["id", "count", "temperature"]

-    df = ddt.read_delta_table(simple_table, version=1)
+    df = ddt.read_deltalake(simple_table, version=1)
    assert df.columns.tolist() == ["id", "count", "temperature", "newColumn"]


@@ -121,7 +121,7 @@ def test_different_schema(simple_table):
)
def test_partition_filter(partition_table, kwargs, shape):
    """partition filter"""
-    df = ddt.read_delta_table(partition_table, **kwargs)
+    df = ddt.read_deltalake(partition_table, **kwargs)
    filter_expr = pq.filters_to_expression(kwargs["filter"])
    dt = DeltaTable(partition_table, version=kwargs.get("version"))
    expected_partitions = len(
@@ -132,38 +132,38 @@ def test_partition_filter(partition_table, kwargs, shape):


def test_empty(empty_table1, empty_table2):
-    df = ddt.read_delta_table(empty_table1, version=4)
+    df = ddt.read_deltalake(empty_table1, version=4)
    assert df.compute().shape == (0, 2)

-    df = ddt.read_delta_table(empty_table1, version=0)
+    df = ddt.read_deltalake(empty_table1, version=0)
    assert df.compute().shape == (5, 2)

    with pytest.raises(RuntimeError):
        # No Parquet files found
-        _ = ddt.read_delta_table(empty_table2)
+        _ = ddt.read_deltalake(empty_table2)


def test_checkpoint(checkpoint_table):
-    df = ddt.read_delta_table(checkpoint_table, checkpoint=0, version=4)
+    df = ddt.read_deltalake(checkpoint_table, checkpoint=0, version=4)
    assert df.compute().shape[0] == 25

-    df = ddt.read_delta_table(checkpoint_table, checkpoint=10, version=12)
+    df = ddt.read_deltalake(checkpoint_table, checkpoint=10, version=12)
    assert df.compute().shape[0] == 65

-    df = ddt.read_delta_table(checkpoint_table, checkpoint=20, version=22)
+    df = ddt.read_deltalake(checkpoint_table, checkpoint=20, version=22)
    assert df.compute().shape[0] == 115

    with pytest.raises(Exception):
        # Parquet file with the given checkpoint 30 does not exists:
        # File {checkpoint_path} not found"
-        _ = ddt.read_delta_table(checkpoint_table, checkpoint=30, version=33)
+        _ = ddt.read_deltalake(checkpoint_table, checkpoint=30, version=33)


def test_out_of_version_error(simple_table):
    # Cannot time travel Delta table to version 4 , Available versions for given
    # checkpoint 0 are [0,1]
    with pytest.raises(Exception):
-        _ = ddt.read_delta_table(simple_table, version=4)
+        _ = ddt.read_deltalake(simple_table, version=4)


def test_load_with_datetime(simple_table2):
@@ -179,21 +179,21 @@ def test_load_with_datetime(simple_table2):
    file_path = os.path.join(log_dir, file_name)
    os.utime(file_path, (dt_epoch, dt_epoch))

-    expected = ddt.read_delta_table(simple_table2, version=0).compute()
-    result = ddt.read_delta_table(
+    expected = ddt.read_deltalake(simple_table2, version=0).compute()
+    result = ddt.read_deltalake(
        simple_table2, datetime="2020-05-01T00:47:31-07:00"
    ).compute()
    assert expected.equals(result)
    # assert_frame_equal(expected,result)

-    expected = ddt.read_delta_table(simple_table2, version=1).compute()
-    result = ddt.read_delta_table(
+    expected = ddt.read_deltalake(simple_table2, version=1).compute()
+    result = ddt.read_deltalake(
        simple_table2, datetime="2020-05-02T22:47:31-07:00"
    ).compute()
    assert expected.equals(result)

-    expected = ddt.read_delta_table(simple_table2, version=4).compute()
-    result = ddt.read_delta_table(
+    expected = ddt.read_deltalake(simple_table2, version=4).compute()
+    result = ddt.read_deltalake(
        simple_table2, datetime="2020-05-25T22:47:31-07:00"
    ).compute()
    assert expected.equals(result)
@@ -243,13 +243,13 @@ def test_vacuum(vacuum_table):

def test_read_delta_with_error():
    with pytest.raises(ValueError) as exc_info:
-        ddt.read_delta_table()
+        ddt.read_deltalake()
    assert str(exc_info.value) == "Please Provide Delta Table path"


def test_catalog_with_error():
    with pytest.raises(ValueError) as exc_info:
-        ddt.read_delta_table(catalog="glue")
+        ddt.read_deltalake(catalog="glue")
    assert (
        str(exc_info.value)
        == "Since Catalog was provided, please provide Database and table name"
@@ -267,7 +267,7 @@ def delta_mock(**kwargs):
    with patch("deltalake.DeltaTable.from_data_catalog", side_effect=delta_mock):
        os.environ["AWS_ACCESS_KEY_ID"] = "apple"
        os.environ["AWS_SECRET_ACCESS_KEY"] = "greatsecret"
-        df = ddt.read_delta_table(
+        df = ddt.read_deltalake(
            catalog="glue", database_name="stores", table_name="orders"
        )
        assert df.compute().shape == (200, 3)
10 changes: 5 additions & 5 deletions tests/test_write.py
@@ -8,8 +8,8 @@
from dask.dataframe.utils import assert_eq
from dask.datasets import timeseries

-from dask_deltatable import read_delta_table
-from dask_deltatable.write import to_delta_table
+from dask_deltatable import read_deltalake
+from dask_deltatable.write import to_deltalake


@pytest.mark.parametrize(
@@ -48,12 +48,12 @@ def test_roundtrip(tmpdir, with_index, freq, partition_freq):
    if with_index:
        ddf = ddf.set_index("timestamp")

-    out = to_delta_table(tmpdir, ddf, compute=False)
+    out = to_deltalake(tmpdir, ddf)
    assert not os.listdir(tmpdir)
    out.compute()
    assert len(os.listdir(tmpdir)) > 0

-    ddf_read = read_delta_table(tmpdir)
+    ddf_read = read_deltalake(tmpdir)
    # FIXME: The index is not recovered
    if with_index:
        ddf = ddf.reset_index()
@@ -74,6 +74,6 @@ def test_datetime(tmpdir, unit):
    ddf = dd.from_pandas(df, npartitions=2)
    out = to_deltalake(tmpdir, ddf)
    out.compute()
-    ddf_read = read_delta_table(tmpdir)
+    ddf_read = read_deltalake(tmpdir)
    # arrow reads back with ns
    assert ddf_read.ts.dtype == "datetime64[ns]"
