Finalize top-level API #46

Merged
merged 6 commits on Jul 14, 2023
42 changes: 14 additions & 28 deletions README.md
@@ -21,9 +21,6 @@ pip install dask-deltatable
- Parquet filters
- row filter
- partition filter
5. Query Delta commit info and history
6. API to ``vacuum`` the old / unused parquet files
7. Load different versions of data by timestamp or version.
Contributor: This is still true, isn't it?

### Not supported

@@ -36,13 +33,13 @@ pip install dask-deltatable
import dask_deltatable as ddt

# read delta table
ddt.read_delta_table("delta_path")
ddt.read_deltalake("delta_path")

# with specific version
ddt.read_delta_table("delta_path", version=3)
ddt.read_deltalake("delta_path", version=3)

# with specific datetime
ddt.read_delta_table("delta_path", datetime="2018-12-19T16:39:57-08:00")
ddt.read_deltalake("delta_path", datetime="2018-12-19T16:39:57-08:00")
```

### Accessing remote file systems
@@ -54,7 +51,7 @@ or config files. For AWS, you may need `~/.aws/credential`; for gcsfs,
to configure these.

```python
ddt.read_delta_table("s3://bucket_name/delta_path", version=3)
ddt.read_deltalake("s3://bucket_name/delta_path", version=3)
```

### Accessing AWS Glue catalog
@@ -67,32 +64,21 @@ environment variables, and if those are not available, fall back to
Example:

```python
ddt.read_delta_table(catalog="glue", database_name="science", table_name="physics")
ddt.read_deltalake(catalog="glue", database_name="science", table_name="physics")
```

### Inspecting Delta Table history
### Writing to Delta Lake

One of the features of Delta Lake is preserving the history of changes, which can be useful
for auditing and debugging. `dask-deltatable` provides APIs to read the commit info and history.
To write a Dask dataframe to Delta Lake, use the `to_deltalake` function.

```python
import dask.dataframe as dd
import dask_deltatable as ddt

```python
# read delta complete history
ddt.read_delta_history("delta_path")

# read delta history up to a given limit
ddt.read_delta_history("delta_path", limit=5)
df = dd.read_csv("s3://bucket_name/data.csv")
# do some processing on the dataframe...
ddt.to_deltalake(df, "s3://bucket_name/delta_path")
```

### Managing Delta Tables

Vacuuming a table will delete any files that have been marked for deletion. This
may make some past versions of a table invalid, so this can break time travel.
However, it will save storage space. Vacuum will retain files in a certain
window, by default one week, so time travel will still work in shorter ranges.

```python
# read delta history to delete the files
ddt.vacuum("delta_path", dry_run=False)
```
Writing to Delta Lake is still in development, so be aware that some features
may not work.
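
For reference, a minimal round-trip sketch of the renamed API as it stands after this change; the local path and example data are hypothetical, and the argument order follows the README example above:

```python
import dask.dataframe as dd
import pandas as pd

import dask_deltatable as ddt

# Hypothetical in-memory data instead of the S3 CSV used above.
pdf = pd.DataFrame({"x": range(10), "y": list("abcdefghij")})
df = dd.from_pandas(pdf, npartitions=2)

# Write with the new writer (argument order as in the README example above).
ddt.to_deltalake(df, "/tmp/example_delta_table")

# Read it back with the renamed reader; version= and datetime= work as shown earlier.
result = ddt.read_deltalake("/tmp/example_delta_table")
print(result.compute())
```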
10 changes: 6 additions & 4 deletions dask_deltatable/__init__.py
@@ -1,7 +1,9 @@
from __future__ import annotations

__all__ = ["read_delta_history", "read_delta_table", "vacuum"]
__all__ = [
"read_deltalake",
"to_deltalake",
]

from .core import read_delta_history as read_delta_history
from .core import read_delta_table as read_delta_table
from .core import vacuum as vacuum
from .core import read_deltalake as read_deltalake
from .write import to_deltalake as to_deltalake
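
In other words, after this change the package exports exactly the two renamed entry points; a short sketch of what downstream code can rely on (the old names were removed outright, not deprecated):

```python
import dask_deltatable as ddt

# The finalized top-level API after this PR.
print(ddt.__all__)  # ['read_deltalake', 'to_deltalake']

# Callers of the previous names need to migrate:
#   ddt.read_delta_table(...)   -> ddt.read_deltalake(...)
#   ddt.read_delta_history(...) and ddt.vacuum(...) were removed outright.
```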
149 changes: 3 additions & 146 deletions dask_deltatable/core.py
@@ -1,18 +1,14 @@
from __future__ import annotations

import json
import os
from functools import partial
from typing import Any
from urllib.parse import urlparse

import dask
import dask.dataframe as dd
import pyarrow as pa
import pyarrow.parquet as pq
from dask.base import tokenize
from dask.dataframe.utils import make_meta
from dask.delayed import delayed
from deltalake import DataCatalog, DeltaTable
from fsspec.core import get_fs_token_paths
from packaging.version import Version
@@ -79,75 +75,6 @@ def read_delta_dataset(self, f: str, **kwargs: dict[Any, Any]):
.to_pandas()
)

def _history_helper(self, log_file_name: str):
log = self.fs.cat(log_file_name).decode().split("\n")
for line in log:
if line:
meta_data = json.loads(line)
if "commitInfo" in meta_data:
return meta_data["commitInfo"]

def history(self, limit: int | None = None, **kwargs) -> dd.core.DataFrame:
delta_log_path = str(self.path).rstrip("/") + "/_delta_log"
log_files = self.fs.glob(f"{delta_log_path}/*.json")
if len(log_files) == 0: # pragma no cover
raise RuntimeError(f"No History (logs) found at:- {delta_log_path}/")
log_files = sorted(log_files, reverse=True)
if limit is None:
last_n_files = log_files
else:
last_n_files = log_files[:limit]
parts = [
delayed(
self._history_helper,
name="read-delta-history" + tokenize(self.fs_token, f, **kwargs),
)(f, **kwargs)
for f in list(last_n_files)
]
return dask.compute(parts)[0]

def _vacuum_helper(self, filename_to_delete: str) -> None:
full_path = urlparse(self.path)
if full_path.scheme and full_path.netloc: # pragma no cover
# for different storage backend, delta-rs vacuum gives path to the file
# it will not provide bucket name and scheme s3 or gcfs etc. so adding
# manually
filename_to_delete = (
f"{full_path.scheme}://{full_path.netloc}/{filename_to_delete}"
)
self.fs.rm_file(self.path + "/" + filename_to_delete)

def vacuum(self, retention_hours: int = 168, dry_run: bool = True) -> list[str]:
"""
Run the Vacuum command on the Delta Table: list and delete files no
longer referenced by the Delta table and are older than the
retention threshold.

retention_hours: the retention threshold in hours, if none then
the value from `configuration.deletedFileRetentionDuration` is used
or default of 1 week otherwise.
dry_run: when activated, list only the files, delete otherwise

Returns
-------
the list of files no longer referenced by the Delta Table and are
older than the retention threshold.
"""

tombstones = self.dt.vacuum(retention_hours=retention_hours)
if dry_run:
return tombstones
else:
parts = [
delayed(
self._vacuum_helper,
name="delta-vacuum"
+ tokenize(self.fs_token, f, retention_hours, dry_run),
)(f)
for f in tombstones
]
return dask.compute(parts)[0]

def get_pq_files(self, filter: Filters = None) -> list[str]:
"""
Get the list of parquet files after loading the
@@ -217,7 +144,7 @@ def _read_from_catalog(
return df


def read_delta_table(
def read_deltalake(
path: str | None = None,
catalog: str | None = None,
database_name: str | None = None,
@@ -292,7 +219,8 @@ def read_delta_table(

Examples
--------
>>> df = dd.read_delta_table('s3://bucket/my-delta-table') # doctest: +SKIP
>>> import dask_deltatable as ddt
>>> df = ddt.read_deltalake('s3://bucket/my-delta-table') # doctest: +SKIP

"""
if catalog is not None:
@@ -317,74 +245,3 @@ def read_delta_table(
)
resultdf = dtw.read_delta_table(columns=columns, **kwargs)
return resultdf


def read_delta_history(
path: str,
limit: int | None = None,
storage_options: dict[str, str] | None = None,
delta_storage_options: dict[str, str] | None = None,
) -> dd.core.DataFrame:
"""
Run the history command on the DeltaTable.
The operations are returned in reverse chronological order.

Parallely reads delta log json files using dask delayed and gathers the
list of commit_info (history)

Parameters
----------
path: str
path of Delta table directory
limit: int, default None
the commit info limit to return, defaults to return all history

Returns
-------
list of the commit infos registered in the transaction log
"""

dtw = DeltaTableWrapper(
path=path,
version=None,
columns=None,
storage_options=storage_options,
delta_storage_options=delta_storage_options,
)
return dtw.history(limit=limit)


def vacuum(
path: str,
retention_hours: int = 168,
dry_run: bool = True,
storage_options: dict[str, str] | None = None,
delta_storage_options: dict[str, str] | None = None,
) -> list[str]:
"""
Run the Vacuum command on the Delta Table: list and delete
files no longer referenced by the Delta table and are
older than the retention threshold.

retention_hours: int, default 168
the retention threshold in hours, if none then the value
from `configuration.deletedFileRetentionDuration` is used
or default of 1 week otherwise.
dry_run: bool, default True
when activated, list only the files, delete otherwise

Returns
-------
List of tombstones
i.e the list of files no longer referenced by the Delta Table
and are older than the retention threshold.
"""

dtw = DeltaTableWrapper(
path=path,
version=None,
columns=None,
storage_options=storage_options,
delta_storage_options=delta_storage_options,
)
return dtw.vacuum(retention_hours=retention_hours, dry_run=dry_run)
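
Since the `read_delta_history` and `vacuum` wrappers are removed without a top-level replacement, users who still need them can call the underlying `deltalake` package directly (the removed code already delegated vacuuming to `DeltaTable.vacuum`). A hedged sketch; exact signatures depend on the installed `deltalake` version:

```python
from deltalake import DeltaTable

dt = DeltaTable("delta_path")

# Commit history, most recent first (roughly what ddt.read_delta_history returned).
history = dt.history(limit=5)

# List files no longer referenced by the table and older than the retention
# window; dry_run=True only lists them, as in the removed ddt.vacuum wrapper.
tombstones = dt.vacuum(retention_hours=168, dry_run=True)
```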
8 changes: 7 additions & 1 deletion dask_deltatable/write.py
@@ -53,6 +53,7 @@ def to_deltalake(
overwrite_schema: bool = False,
storage_options: dict[str, str] | None = None,
partition_filters: list[tuple[str, str, Any]] | None = None,
compute: bool = True,
):
"""Write a given dask.DataFrame to a delta table. The returned value is a Dask Scalar,
and the writing operation is only triggered when calling ``.compute()``
@@ -115,6 +116,8 @@
Options passed to the native delta filesystem. Unused if 'filesystem' is defined
partition_filters : list[tuple[str, str, Any]] | None. Default None
The partition filters that will be used for partition overwrite.
compute : bool. Default True
Whether to trigger the writing operation immediately

Returns
-------
@@ -217,7 +220,10 @@
)
}
graph = HighLevelGraph.from_collections(final_name, dsk, dependencies=(written,))
return Scalar(graph, final_name, "")
result = Scalar(graph, final_name, "")
if compute:
result = result.compute()
return result


def _commit(
Binary file removed tests/data/vacuum.zip
Binary file not shown.
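
The new `compute` flag in `to_deltalake` makes the write eager by default instead of returning a lazy Dask `Scalar`. A small sketch of both modes, assuming a hypothetical local path and the argument order from the README example:

```python
import dask.dataframe as dd
import pandas as pd

import dask_deltatable as ddt

df = dd.from_pandas(pd.DataFrame({"x": range(4)}), npartitions=2)

# Default after this PR: compute=True, so the write is triggered immediately.
ddt.to_deltalake(df, "/tmp/eager_delta_table")

# compute=False keeps the previous behaviour: a lazy Dask Scalar is returned
# and nothing is written until .compute() is called.
lazy_write = ddt.to_deltalake(df, "/tmp/lazy_delta_table", compute=False)
lazy_write.compute()
```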