From 5a5bed1ff7563262f5aa9818e7be20838dee1ac3 Mon Sep 17 00:00:00 2001
From: Irina Truong
Date: Mon, 10 Jul 2023 15:27:59 -0700
Subject: [PATCH 1/4] Readme improvements.

---
 README.md | 107 ++++++++++++++++++++++++++++++++++++++++++++-----------------
 1 file changed, 79 insertions(+), 28 deletions(-)

diff --git a/README.md b/README.md
index 68aa5cb..ce62b9c 100644
--- a/README.md
+++ b/README.md
@@ -1,58 +1,109 @@
-## Dask Deltatable Reader
+## Dask-DeltaTable
 
-Reads a Delta Table from directory using Dask engine.
+Reading and writing Delta Tables using Dask engine.
 
-To Try out the package:
+### Installation
+
+To install the package:
 
 ```
 pip install dask-deltatable
 ```
 
 ### Features:
-1. Reads the parquet files based on delta logs parallely using dask engine
-2. Supports all three filesystem like s3, azurefs, gcsfs
-3. Supports some delta features like
+
+1. Read the parquet files in parallel using Dask engine
+2. Write Dask dataframes to Delta Lake (limited support)
+3. Supports multiple filesystems (s3, azurefs, gcsfs)
+4. Subset of Delta Lake features:
    - Time Travel
    - Schema evolution
-   - parquet filters
+   - Parquet filters
     - row filter
     - partition filter
-4. Query Delta commit info - History
-5. vacuum the old/ unused parquet files
-6. load different versions of data using datetime.
+5. Query Delta commit info and history
+6. API to ``vacuum`` the old / unused parquet files
+7. Load different versions of data by timestamp or version.
 
-### Usage:
+### Not supported
 
-```
+1. Writing to Delta Table is still in development.
+2. `optimize` API to run a bin-packing operation on a Delta Table.
+
+### Reading Delta Tables
+
+```python
 import dask_deltatable as ddt
 
 # read delta table
 ddt.read_delta_table("delta_path")
 
-# read delta table for specific version
-ddt.read_delta_table("delta_path",version=3)
+# with specific version
+ddt.read_delta_table("delta_path", version=3)
 
-# read delta table for specific datetime
-ddt.read_delta_table("delta_path",datetime="2018-12-19T16:39:57-08:00")
+# with specific datetime
+ddt.read_delta_table("delta_path", datetime="2018-12-19T16:39:57-08:00")
+```
+
+### Accessing remote file systems
+
+To be able to read from S3, Azure, GCS, and other remote filesystems,
+ensure the credentials are properly configured in environment variables
+or config files. For AWS, you may need `~/.aws/credentials`; for gcsfs,
+`GOOGLE_APPLICATION_CREDENTIALS`. Refer to your cloud provider's documentation
+to configure these.
+
+```python
+ddt.read_delta_table("s3://bucket_name/delta_path", version=3)
+```
+### Accessing AWS Glue catalog
+`dask-deltatable` can connect to AWS Glue catalog to read the delta table.
+The method will look for `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`
+environment variables, and if those are not available, fall back to
+`~/.aws/credentials`.
+
+Example:
+
+```python
+ddt.read_delta_table(catalog="glue", database_name="science", table_name="physics")
+```
+
+### Inspecting Delta Table history
+
+```python
 # read delta complete history
 ddt.read_delta_history("delta_path")
 
 # read delta history upto given limit
-ddt.read_delta_history("delta_path",limit=5)
-
-# read delta history to delete the files
-ddt.vacuum("delta_path",dry_run=False)
+ddt.read_delta_history("delta_path", limit=5)
+```
 
-# Can read from S3,azure,gcfs etc.
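+### Writing to Delta Lake (experimental)
+
+Write support is still in development and the API may change. The snippet below
+is only a minimal sketch: it assumes a `to_deltalake` helper that takes the
+target path and a Dask dataframe, mirroring `deltalake.write_deltalake`.
+
+```python
+import dask.dataframe as dd
+import dask_deltatable as ddt
+
+# any Dask dataframe will do; the CSV path here is only an illustration
+df = dd.read_csv("input_path/*.csv")
+
+# write the dataframe out as a Delta table (experimental API)
+ddt.to_deltalake("delta_path", df)
+```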
-ddt.read_delta_table("s3://bucket_name/delta_path",version=3) -# please ensure the credentials are properly configured as environment variable or -# configured as in ~/.aws/credential +### Managing Delta Tables -# can connect with AWS Glue catalog and read the complete delta table (currently only AWS catalog available) -# will take expilicit AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY from environment -# variables if available otherwise fallback to ~/.aws/credential -ddt.read_delta_table(catalog=glue,database_name="science",table_name="physics") +Vacuuming a table will delete any files that have been marked for deletion. This +may make some past versions of a table invalid, so this can break time travel. +However, it will save storage space. Vacuum will retain files in a certain +window, by default one week, so time travel will still work in shorter ranges. +```python +# read delta history to delete the files +ddt.vacuum("delta_path", dry_run=False) ``` From f822b157e04066a160d8ac94a3edf806ac8bc1ff Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Tue, 11 Jul 2023 09:48:53 -0700 Subject: [PATCH 2/4] More Delta Lake keywords. --- README.md | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index ce62b9c..71e2944 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ ## Dask-DeltaTable -Reading and writing Delta Tables using Dask engine. +Reading and writing to Delta Lake using Dask engine. ### Installation @@ -12,7 +12,7 @@ pip install dask-deltatable ### Features: -1. Read the parquet files in parallel using Dask engine +1. Read the parquet files from Delta Lake and parallelize with Dask 2. Write Dask dataframes to Delta Lake (limited support) 3. Supports multiple filesystems (s3, azurefs, gcsfs) 4. Subset of Delta Lake features: @@ -27,10 +27,10 @@ pip install dask-deltatable ### Not supported -1. Writing to Delta Table is still in development. +1. Writing to Delta Lake is still in development. 2. `optimize` API to run a bin-packing operation on a Delta Table. -### Reading Delta Tables +### Reading from Delta Lake ```python import dask_deltatable as ddt @@ -72,6 +72,11 @@ ddt.read_delta_table(catalog="glue", database_name="science", table_name="physic ### Inspecting Delta Table history +One of the features of Delta Lake is preserving the history of changes, which can be is useful +for auditing and debugging. `dask-deltatable` provides APIs to read the commit info and history. 
+ +```python + ```python # read delta complete history ddt.read_delta_history("delta_path") From 69555cdf933df569074cf80fe46da77880d006ca Mon Sep 17 00:00:00 2001 From: Florian Jetter Date: Tue, 11 Jul 2023 13:18:46 +0200 Subject: [PATCH 3/4] Drop python 3.8 (#35) --- .github/workflows/deploy.yaml | 4 ++-- .github/workflows/tests.yaml | 8 +++++--- .pre-commit-config.yaml | 2 +- continous_integeration/environment-3.8.yaml | 9 --------- dask_deltatable/_schema.py | 2 +- dask_deltatable/write.py | 3 ++- setup.py | 2 +- 7 files changed, 12 insertions(+), 18 deletions(-) delete mode 100644 continous_integeration/environment-3.8.yaml diff --git a/.github/workflows/deploy.yaml b/.github/workflows/deploy.yaml index 329936c..374a5cb 100644 --- a/.github/workflows/deploy.yaml +++ b/.github/workflows/deploy.yaml @@ -2,7 +2,7 @@ name: Upload Python Package on: release: - types: [created,updated] + types: [created, updated] jobs: deploy: @@ -15,7 +15,7 @@ jobs: with: miniforge-variant: Mambaforge use-mamba: true - python-version: 3.8 + python-version: 3.9 - name: Install dependencies shell: bash -l {0} run: | diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 7e9596b..8e482b1 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -5,7 +5,7 @@ on: [push, pull_request] # When this workflow is queued, automatically cancel any previous running # or pending jobs from the same branch concurrency: - group: ${{ github.ref }} + group: tests-${{ github.ref }} cancel-in-progress: true jobs: @@ -15,10 +15,9 @@ jobs: run: shell: bash -l {0} strategy: - fail-fast: false matrix: os: ["windows-latest", "ubuntu-latest", "macos-latest"] - python-version: ["3.8", "3.9", "3.10", "3.11"] + python-version: ["3.9", "3.10", "3.11"] steps: - name: Checkout source @@ -39,6 +38,9 @@ jobs: - name: Install dask-deltatable run: python -m pip install -e ".[dev]" + - name: conda list + run: conda list + - name: Run tests id: run_tests run: | diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 9f691b6..87b52e8 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -20,7 +20,7 @@ repos: hooks: - id: pyupgrade args: - - --py38-plus + - --py39-plus - repo: https://github.com/pre-commit/mirrors-mypy rev: v1.3.0 hooks: diff --git a/continous_integeration/environment-3.8.yaml b/continous_integeration/environment-3.8.yaml deleted file mode 100644 index c3ad542..0000000 --- a/continous_integeration/environment-3.8.yaml +++ /dev/null @@ -1,9 +0,0 @@ -name: test-environment -channels: - - conda-forge -dependencies: - - python=3.8 - - dask - - pyarrow - - pytest - - pytest-cov diff --git a/dask_deltatable/_schema.py b/dask_deltatable/_schema.py index c85d7d5..0e9ac7b 100644 --- a/dask_deltatable/_schema.py +++ b/dask_deltatable/_schema.py @@ -40,8 +40,8 @@ import json import logging import pprint +from collections.abc import Iterable from copy import deepcopy -from typing import Iterable import pandas as pd import pyarrow as pa diff --git a/dask_deltatable/write.py b/dask_deltatable/write.py index 80bdeeb..9cc4678 100644 --- a/dask_deltatable/write.py +++ b/dask_deltatable/write.py @@ -2,9 +2,10 @@ import json import uuid +from collections.abc import Mapping from datetime import datetime from pathlib import Path -from typing import Any, Literal, Mapping +from typing import Any, Literal import dask.dataframe as dd import pyarrow as pa diff --git a/setup.py b/setup.py index 5842ef7..3b72630 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ 
packages=["dask_deltatable"], long_description=long_description, long_description_content_type="text/markdown", - python_requires=">=3.8", + python_requires=">=3.9", install_requires=open("requirements.txt").read().strip().split("\n"), extras_require={ "dev": ["pytest", "requests", "pytest-cov>=2.10.1"], From 0a5a55b5aea62dcbae6fb7dcbc6ea6193662a824 Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Tue, 11 Jul 2023 04:59:07 -0700 Subject: [PATCH 4/4] Propagate `filter` to `file_uris` to skip partitions (#30) --- dask_deltatable/core.py | 48 ++++++++++++++++++++++-------- dask_deltatable/types.py | 6 ++++ dask_deltatable/utils.py | 64 ++++++++++++++++++++++++++++++++++++++++ pyproject.toml | 8 +++++ tests/test_core.py | 29 +++++++++++++----- tests/test_utils.py | 33 +++++++++++++++++++++ 6 files changed, 167 insertions(+), 21 deletions(-) create mode 100644 dask_deltatable/types.py create mode 100644 dask_deltatable/utils.py create mode 100644 tests/test_utils.py diff --git a/dask_deltatable/core.py b/dask_deltatable/core.py index 7f080bb..48c241a 100644 --- a/dask_deltatable/core.py +++ b/dask_deltatable/core.py @@ -8,14 +8,25 @@ import dask import dask.dataframe as dd +import pyarrow as pa import pyarrow.parquet as pq from dask.base import tokenize from dask.dataframe.utils import make_meta from dask.delayed import delayed from deltalake import DataCatalog, DeltaTable from fsspec.core import get_fs_token_paths +from packaging.version import Version from pyarrow import dataset as pa_ds +from .types import Filters +from .utils import get_partition_filters + +if Version(pa.__version__) >= Version("10.0.0"): + filters_to_expression = pq.filters_to_expression +else: + # fallback to older internal method + filters_to_expression = pq._filters_to_expression + class DeltaTableWrapper: path: str @@ -55,14 +66,7 @@ def __init__( def read_delta_dataset(self, f: str, **kwargs: dict[Any, Any]): schema = kwargs.pop("schema", None) or self.schema filter = kwargs.pop("filter", None) - if filter: - try: - filter_expression = pq.filters_to_expression(filter) - except AttributeError: - # fallback to older internal method - filter_expression = pq._filters_to_expression(filter) - else: - filter_expression = None + filter_expression = filters_to_expression(filter) if filter else None return ( pa_ds.dataset( source=f, @@ -144,22 +148,40 @@ def vacuum(self, retention_hours: int = 168, dry_run: bool = True) -> list[str]: ] return dask.compute(parts)[0] - def get_pq_files(self) -> list[str]: + def get_pq_files(self, filter: Filters = None) -> list[str]: """ - get the list of parquet files after loading the + Get the list of parquet files after loading the current datetime version + + Parameters + ---------- + filter : list[tuple[str, str, Any]] | list[list[tuple[str, str, Any]]] | None + Filters in DNF form. + + Returns + ------- + list[str] + List of files matching optional filter. 
""" __doc__ == self.dt.load_with_datetime.__doc__ - if self.datetime is not None: self.dt.load_with_datetime(self.datetime) - return self.dt.file_uris() + partition_filters = get_partition_filters( + self.dt.metadata().partition_columns, filter + ) + if not partition_filters: + # can't filter + return self.dt.file_uris() + file_uris = set() + for filter_set in partition_filters: + file_uris.update(self.dt.file_uris(partition_filters=filter_set)) + return sorted(list(file_uris)) def read_delta_table(self, **kwargs) -> dd.core.DataFrame: """ Reads the list of parquet files in parallel """ - pq_files = self.get_pq_files() + pq_files = self.get_pq_files(filter=kwargs.get("filter", None)) if len(pq_files) == 0: raise RuntimeError("No Parquet files are available") diff --git a/dask_deltatable/types.py b/dask_deltatable/types.py new file mode 100644 index 0000000..ae74c6e --- /dev/null +++ b/dask_deltatable/types.py @@ -0,0 +1,6 @@ +from __future__ import annotations + +from typing import Any, List, Tuple, Union + +Filter = Tuple[str, str, Any] +Filters = Union[List[Filter], List[List[Filter]], None] diff --git a/dask_deltatable/utils.py b/dask_deltatable/utils.py new file mode 100644 index 0000000..58bd0d1 --- /dev/null +++ b/dask_deltatable/utils.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +from typing import List, cast + +from .types import Filter, Filters + + +def get_partition_filters( + partition_columns: list[str], filters: Filters +) -> list[list[Filter]] | None: + """Retrieve only filters on partition columns. If there are any row filters in the outer + list (the OR list), return None, because we have to search through all partitions to apply + row filters + + Parameters + ---------- + partition_columns : List[str] + List of partitioned columns + + filters : List[Tuple[str, str, Any]] | List[List[Tuple[str, str, Any]]] + List of filters. 
Examples: + 1) (x == a) and (y == 3): + [("x", "==", "a"), ("y", "==", 3)] + 2) (x == a) or (y == 3) + [[("x", "==", "a")], [("y", "==", 3)]] + + Returns + ------- + List[List[Tuple[str, str, Any]]] | None + List of partition filters, None if we can't apply a filter on partitions because + row filters are present + """ + if filters is None or len(filters) == 0: + return None + + if isinstance(filters[0][0], str): + filters = cast(List[List[Filter]], [filters]) + + allowed_ops = { + "=": "=", + "==": "=", + "!=": "!=", + "!==": "!=", + "in": "in", + "not in": "not in", + ">": ">", + "<": "<", + ">=": ">=", + "<=": "<=", + } + + expressions = [] + for disjunction in filters: + inner_expressions = [] + for col, op, val in disjunction: + if col in partition_columns: + normalized_op = allowed_ops[op] + inner_expressions.append((col, normalized_op, val)) + if inner_expressions: + expressions.append(inner_expressions) + else: + return None + + return expressions if expressions else None diff --git a/pyproject.toml b/pyproject.toml index 2565cd9..0f0abab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,6 +7,14 @@ warn_return_any = false disallow_untyped_calls = false ignore_missing_imports = true +[[tool.mypy.overrides]] +module = "pyarrow.*" +ignore_missing_imports = true + [tool.isort] profile = "black" add_imports = ["from __future__ import annotations"] + +[tool.black] +target-version = ['py38'] +include = '\.pyi?$' diff --git a/tests/test_core.py b/tests/test_core.py index 47108e9..25a5269 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -5,7 +5,9 @@ import zipfile from unittest.mock import MagicMock, patch +import pyarrow.parquet as pq import pytest +from deltalake import DeltaTable import dask_deltatable as ddt @@ -107,15 +109,26 @@ def test_different_schema(simple_table): assert df.columns.tolist() == ["id", "count", "temperature", "newColumn"] -def test_partition_filter(partition_table): - # partition filter - df = ddt.read_delta_table(partition_table, version=0, filter=[("col1", "==", 1)]) - assert df.compute().shape == (21, 3) - - df = ddt.read_delta_table( - partition_table, filter=[[("col1", "==", 1)], [("col1", "==", 2)]] +@pytest.mark.parametrize( + "kwargs,shape", + [ + (dict(version=0, filter=[("col1", "==", 1)]), (21, 3)), + (dict(filter=[("col1", "==", 1), ("col2", "<", 0.5)]), (11, 4)), + (dict(filter=[[("col1", "==", 1)], [("col1", "==", 2)]]), (39, 4)), + (dict(filter=[("col1", "!=", 1), ("id", "<", 5)]), (6, 4)), + (dict(filter=[[("col1", "!=", 1)], [("id", "<", 5)]]), (99, 4)), + ], +) +def test_partition_filter(partition_table, kwargs, shape): + """partition filter""" + df = ddt.read_delta_table(partition_table, **kwargs) + filter_expr = pq.filters_to_expression(kwargs["filter"]) + dt = DeltaTable(partition_table, version=kwargs.get("version")) + expected_partitions = len( + list(dt.to_pyarrow_dataset().get_fragments(filter=filter_expr)) ) - assert df.compute().shape == (39, 4) + assert df.npartitions == expected_partitions + assert df.compute().shape == shape def test_empty(empty_table1, empty_table2): diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..d8b49dd --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +import pytest + +from dask_deltatable.utils import get_partition_filters + + +@pytest.mark.parametrize( + "cols,filters,expected", + [ + [[], None, None], + [[], [("part", ">", "a")], None], + [["part"], [("part", ">", "a"), ("x", "==", 1)], [[("part", ">", "a")]]], 
+ [["part"], [[("part", ">", "a")], [("x", "==", 1)]], None], + [ + ["m", "d"], + [("m", ">", 5), ("d", "=", 1), ("x", "==", "a")], + [[("m", ">", 5), ("d", "=", 1)]], + ], + [ + ["m", "d"], + [[("m", ">", 5)], [("d", "=", 1)], [("x", "==", "a")]], + None, + ], + ], +) +def test_partition_filters(cols, filters, expected): + res = get_partition_filters(cols, filters) + assert res == expected + if isinstance(filters, list): + # make sure it works with additional level of wrapping + res = get_partition_filters(cols, filters) + assert res == expected