From 75fee25835103aaa365e673f0cf519ba34c5e4db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20R=C3=BCbenach?= Date: Tue, 25 Jul 2023 19:41:07 +0200 Subject: [PATCH 1/8] Fix error when using from_awkward on an empty array (#330) --------- Co-authored-by: Doug Davis --- src/dask_awkward/lib/io/io.py | 7 +++++-- tests/test_io.py | 16 ++++++++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/src/dask_awkward/lib/io/io.py b/src/dask_awkward/lib/io/io.py index c47bec447..dde47983f 100644 --- a/src/dask_awkward/lib/io/io.py +++ b/src/dask_awkward/lib/io/io.py @@ -89,8 +89,11 @@ def from_awkward( """ nrows = len(source) - chunksize = int(math.ceil(nrows / npartitions)) - locs = list(range(0, nrows, chunksize)) + [nrows] + if nrows == 0: + locs = [None, None] + else: + chunksize = int(math.ceil(nrows / npartitions)) + locs = list(range(0, nrows, chunksize)) + [nrows] starts = locs[:-1] stops = locs[1:] meta = typetracer_array(source) diff --git a/tests/test_io.py b/tests/test_io.py index 1ce6a8735..f8a928ddd 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -394,3 +394,19 @@ def test_to_dataframe_str( df = ak.to_dataframe(caa) assert_eq(dd, df, check_index=False) + + +def test_from_awkward_empty_array(daa) -> None: + # no form + c1 = ak.Array([]) + assert len(c1) == 0 + a1 = dak.from_awkward(c1, npartitions=1) + assert_eq(a1, c1) + assert len(a1) == 0 + + # with a form + c2 = ak.Array(daa.layout.form.length_zero_array(highlevel=False)) + assert len(c2) == 0 + a2 = dak.from_awkward(c2, npartitions=1) + assert len(a2) == 0 + daa.layout.form == a2.layout.form From 0c3493dc5f683d48cc41fae16a8137c9351528e9 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Tue, 25 Jul 2023 12:52:57 -0500 Subject: [PATCH 2/8] add top level imports (distributed will import things now) (#331) --- src/dask_awkward/__init__.py | 102 +++++++++++++++++++++++++++++------ 1 file changed, 86 insertions(+), 16 deletions(-) diff --git a/src/dask_awkward/__init__.py b/src/dask_awkward/__init__.py index 3b811131c..eb7db140f 100644 --- a/src/dask_awkward/__init__.py +++ b/src/dask_awkward/__init__.py @@ -1,19 +1,89 @@ from dask_awkward import config # isort:skip; load awkward config - +from dask_awkward.lib.core import Array, PartitionCompatibility, Record, Scalar +from dask_awkward.lib.core import _type as type +from dask_awkward.lib.core import ( + compatible_partitions, + map_partitions, + partition_compatibility, + typetracer_from_form, +) +from dask_awkward.lib.describe import fields +from dask_awkward.lib.inspect import necessary_columns, sample +from dask_awkward.lib.io.io import ( + ImplementsFormTransformation, + from_awkward, + from_dask_array, + from_delayed, + from_lists, + from_map, + to_dask_array, + to_dask_bag, + to_dataframe, + to_delayed, +) +from dask_awkward.lib.io.json import from_json, to_json +from dask_awkward.lib.io.parquet import from_parquet, to_parquet +from dask_awkward.lib.operations import concatenate +from dask_awkward.lib.reducers import ( + all, + any, + argmax, + argmin, + corr, + count, + count_nonzero, + covar, + linear_fit, + max, + mean, + min, + moment, + prod, + ptp, + softmax, + std, + sum, + var, +) +from dask_awkward.lib.structure import ( + argcartesian, + argcombinations, + argsort, + broadcast_arrays, + cartesian, + combinations, + copy, + drop_none, + fill_none, + firsts, + flatten, + from_regular, + full_like, + is_none, + isclose, + local_index, + mask, + nan_to_num, + num, + ones_like, + pad_none, + ravel, + run_lengths, + singletons, + sort, + 
strings_astype, + to_packed, + to_regular, + unflatten, + unzip, + values_astype, + where, + with_field, + with_name, + with_parameter, + without_parameters, + zeros_like, + zip, +) from dask_awkward.version import __version__ - - -def __getattr__(value): - import dask_awkward.lib - - return getattr(dask_awkward.lib, value) - - -original = dir() - - -def __dir__(): - import dask_awkward.lib # pragma: no cover - - return original + dir(dask_awkward.lib) # pragma: no cover From 84e0546b6f5710d4fb74c716d7825adf44b9af4e Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 27 Jul 2023 11:27:58 -0500 Subject: [PATCH 3/8] fix dak.num(..., axis=0) (#333) --- src/dask_awkward/lib/structure.py | 38 +++++++++++++++++++++++++++---- tests/test_structure.py | 9 ++++---- 2 files changed, 37 insertions(+), 10 deletions(-) diff --git a/src/dask_awkward/lib/structure.py b/src/dask_awkward/lib/structure.py index 3d576cf72..780753ba3 100644 --- a/src/dask_awkward/lib/structure.py +++ b/src/dask_awkward/lib/structure.py @@ -7,12 +7,18 @@ from typing import TYPE_CHECKING, Any import awkward as ak -from dask.base import is_dask_collection +import numpy as np +from awkward._nplikes.typetracer import TypeTracerArray +from dask.base import is_dask_collection, tokenize +from dask.highlevelgraph import HighLevelGraph +from dask_awkward.layers import AwkwardMaterializedLayer from dask_awkward.lib.core import ( Array, PartitionCompatibility, map_partitions, + new_known_scalar, + new_scalar_object, partition_compatibility, ) from dask_awkward.utils import ( @@ -562,6 +568,10 @@ def nan_to_num( raise DaskAwkwardNotImplemented("TODO") +def _numaxis0(*integers): + return np.sum(np.array(integers)) + + @borrow_docstring(ak.num) def num( array: Any, @@ -571,7 +581,28 @@ def num( ) -> Any: if not highlevel: raise ValueError("Only highlevel=True is supported") - if axis and axis != 0: + if axis == 0 or axis == -1 * array.ndim: + if array.known_divisions: + return new_known_scalar(array.defined_divisions[-1], label="num") + + per_axis = map_partitions( + ak.num, + array, + axis=0, + meta=ak.Array(ak.Array([1, 1]).layout.to_typetracer(forget_length=True)), + ) + name = f"numaxis0-{tokenize(array, axis)}" + keys = per_axis.__dask_keys__() + matlayer = AwkwardMaterializedLayer( + {(name, 0): (_numaxis0, *keys)}, previous_layer_names=[per_axis.name] + ) + hlg = HighLevelGraph.from_collections(name, matlayer, dependencies=(per_axis,)) + return new_scalar_object( + hlg, + name, + meta=TypeTracerArray._new(dtype=np.int64, shape=()), + ) + else: return map_partitions( ak.num, array, @@ -580,9 +611,6 @@ def num( behavior=behavior, output_divisions=1, ) - if axis == 0: - return len(array) - raise DaskAwkwardNotImplemented("TODO") @borrow_docstring(ak.ones_like) diff --git a/tests/test_structure.py b/tests/test_structure.py index 296d84c55..afb359dc3 100644 --- a/tests/test_structure.py +++ b/tests/test_structure.py @@ -18,22 +18,21 @@ def test_flatten(caa: ak.Array, daa: dak.Array, axis: int | None) -> None: assert_eq(cr, dr) -@pytest.mark.parametrize("axis", [0, 1, -1]) -def test_num(caa: ak.Array, daa: dak.Array, axis: int | None) -> None: +@pytest.mark.parametrize("axis", [0, 1, -1, -2]) +def test_num(caa: ak.Array, daa: dak.Array, axis: int) -> None: da = daa["points"] ca = caa["points"] if axis == 0: assert_eq(dak.num(da.x, axis=axis), ak.num(ca.x, axis=axis)) da.eager_compute_divisions() - - assert_eq(dak.num(da.x, axis=axis), ak.num(ca.x, axis=axis)) - if axis == 1: c1 = dak.num(da.x, axis=axis) > 2 c2 = ak.num(ca.x, axis=axis) > 
2 assert_eq(da[c1], ca[c2]) + assert_eq(dak.num(da.x, axis=axis), ak.num(ca.x, axis=axis)) + def test_zip_dict_input(caa: ak.Array, daa: dak.Array) -> None: da1 = daa["points"]["x"] From a61df67c85ca0ee5d87790e5933bd19e6c290d84 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 27 Jul 2023 11:38:08 -0500 Subject: [PATCH 4/8] sync from_parquet function arguments with upstream awkward (#328) --- src/dask_awkward/lib/io/parquet.py | 211 ++++++++++++++++------------- 1 file changed, 117 insertions(+), 94 deletions(-) diff --git a/src/dask_awkward/lib/io/parquet.py b/src/dask_awkward/lib/io/parquet.py index aad3936c1..ad9163b22 100644 --- a/src/dask_awkward/lib/io/parquet.py +++ b/src/dask_awkward/lib/io/parquet.py @@ -8,15 +8,15 @@ from typing import Any, Sequence import awkward as ak -import fsspec +import awkward.operations.ak_from_parquet as ak_from_parquet +import awkward.operations.ak_to_arrow_table as ak_to_arrow_table from awkward.forms.form import Form -from awkward.operations import ak_from_parquet, to_arrow_table from awkward.operations.ak_from_parquet import _load from dask.base import tokenize from dask.blockwise import BlockIndex from dask.highlevelgraph import HighLevelGraph from fsspec import AbstractFileSystem -from fsspec.core import get_fs_token_paths +from fsspec.core import get_fs_token_paths, url_to_fs from dask_awkward.lib.core import ( Array, @@ -40,6 +40,8 @@ def __init__( listsep: str = "list.item", unnamed_root: bool = False, original_form: Form | None = None, + behavior: dict | None = None, + **kwargs: Any, ) -> None: self.fs = fs self.form = form @@ -49,6 +51,8 @@ def __init__( if self.unnamed_root: self.columns = [f".{c}" for c in self.columns] self.original_form = original_form + self.behavior = behavior + self.kwargs = kwargs @abc.abstractmethod def __call__(self, source: Any) -> ak.Array: @@ -69,7 +73,11 @@ def __repr__(self) -> str: f" listsep={self.listsep}\n" f" unnamed_root={self.unnamed_root}\n" f" columns={self.columns}\n" + f" behavior={self.behavior}\n" ) + for key, val in self.kwargs.items(): + s += f" {key}={val}\n" + s = f"{s})" return s def __str__(self) -> str: @@ -85,6 +93,8 @@ def __init__( listsep: str = "list.item", unnamed_root: bool = False, original_form: Form | None = None, + behavior: dict | None = None, + **kwargs: Any, ) -> None: super().__init__( fs=fs, @@ -92,14 +102,20 @@ def __init__( listsep=listsep, unnamed_root=unnamed_root, original_form=original_form, + behavior=behavior, + **kwargs, ) def __call__(self, source: Any) -> Any: - array = _file_to_partition( - source, - self.fs, - self.columns, - self.form, + array = _load( + [source], + parquet_columns=self.columns, + subrg=[None], + subform=self.form, + highlevel=True, + fs=self.fs, + behavior=self.behavior, + **self.kwargs, ) return ak.Array(unproject_layout(self.original_form, array.layout)) @@ -110,7 +126,6 @@ def project_columns( ) -> _FromParquetFileWiseFn: if columns is None: return self - new_form = self.form.select_columns(columns) new = _FromParquetFileWiseFn( fs=self.fs, @@ -118,13 +133,9 @@ def project_columns( listsep=self.listsep, unnamed_root=self.unnamed_root, original_form=original_form, + behavior=self.behavior, + **self.kwargs, ) - - log.debug(f"project_columns received: {columns}") - log.debug(f"new form is {repr(new_form)}") - log.debug(f"new form columns are: {new_form.columns(self.listsep)}") - log.debug(new) - return new @@ -137,6 +148,8 @@ def __init__( listsep: str = "list.item", unnamed_root: bool = False, original_form: Form | None = None, + behavior: 
dict | None = None, + **kwargs: Any, ) -> None: super().__init__( fs=fs, @@ -144,19 +157,31 @@ def __init__( listsep=listsep, unnamed_root=unnamed_root, original_form=original_form, + behavior=behavior, + **kwargs, ) def __call__(self, pair: Any) -> ak.Array: subrg, source = pair if isinstance(subrg, int): subrg = [[subrg]] - array = _file_to_partition( - source, - self.fs, - self.columns, - self.form, + array = _load( + [source], + parquet_columns=self.columns, subrg=subrg, + subform=self.form, + highlevel=True, + fs=self.fs, + behavior=self.behavior, + **self.kwargs, ) + # array = _file_to_partition( + # source, + # self.fs, + # self.columns, + # self.form, + # subrg=subrg, + # ) return ak.Array(unproject_layout(self.original_form, array.layout)) def project_columns( @@ -171,57 +196,58 @@ def project_columns( form=self.form.select_columns(columns), unnamed_root=self.unnamed_root, original_form=original_form, + behavior=self.behavior, + **self.kwargs, ) def from_parquet( - path: Any, - storage_options: dict | None = None, + path: str | list[str], + *, + columns: str | list[str] | None = None, + storage_options: dict[str, Any] | None = None, + max_gap: int = 64_000, + max_block: int = 256_000_000, + footer_sample_size: int = 1_000_000, + generate_bitmasks: bool = False, + highlevel: bool = True, + behavior: dict | None = None, ignore_metadata: bool = True, scan_files: bool = False, - columns: Sequence[str] | None = None, - filters: Any | None = None, - split_row_groups: Any | None = None, + split_row_groups: bool = False, ) -> Array: - """Create an Array collection from a Parquet dataset. + if not highlevel: + raise ValueError("dask-awkward only supports highlevel=True") - Parameters - ---------- - url : str - Location of data, including protocol (e.g. ``s3://``) - storage_options : dict - For creating filesystem (see ``fsspec`` documentation). - ignore_metadata : bool - Ignore parquet metadata associated with the input dataset (the - ``_metadata`` file). - scan_files : bool - TBD - columns : list[str], optional - Select columns to load - filters : list[list[tuple]], optional - Parquet-style filters for excluding row groups based on column statistics - split_row_groups: bool, optional - If True, each row group becomes a partition. If False, each - file becomes a partition. If None, the existence of a - ``_metadata`` file and ignore_metadata=False implies True, - else False. - - Returns - ------- - Array - Array collection from the parquet dataset. 
- - """ - fs, tok, paths = get_fs_token_paths( - path, mode="rb", storage_options=storage_options + fs, token, paths = get_fs_token_paths( + path, + mode="rb", + storage_options=storage_options, ) label = "from-parquet" token = tokenize( - tok, paths, ignore_metadata, columns, filters, scan_files, split_row_groups + token, + paths, + columns, + max_gap, + max_block, + footer_sample_size, + generate_bitmasks, + behavior, + ignore_metadata, + scan_files, + split_row_groups, ) - # same as ak_metadata_from_parquet - results = ak_from_parquet.metadata( + ( + parquet_columns, + subform, + actual_paths, + fs, + subrg, + row_counts, + metadata, + ) = ak_from_parquet.metadata( path, storage_options, row_groups=None, @@ -229,7 +255,6 @@ def from_parquet( ignore_metadata=ignore_metadata, scan_files=scan_files, ) - parquet_columns, subform, actual_paths, fs, subrg, row_counts, metadata = results listsep = "list.item" unnamed_root = False @@ -244,18 +269,27 @@ def from_parquet( split_row_groups = row_counts is not None and len(row_counts) > 1 meta = ak.Array( - subform.length_zero_array(highlevel=False).to_typetracer(forget_length=True) + subform.length_zero_array(highlevel=False).to_typetracer(forget_length=True), + behavior=behavior, ) if split_row_groups is False or subrg is None: # file-wise + + fn = _FromParquetFileWiseFn( + fs=fs, + form=subform, + listsep=listsep, + unnamed_root=unnamed_root, + max_gap=max_gap, + max_block=max_block, + footer_sample_size=footer_sample_size, + generate_bitmasks=generate_bitmasks, + behavior=behavior, + ) + return from_map( - _FromParquetFileWiseFn( - fs=fs, - form=subform, - listsep=listsep, - unnamed_root=unnamed_root, - ), + fn, actual_paths, label=label, token=token, @@ -263,7 +297,6 @@ def from_parquet( ) else: # row-group wise - if set(subrg) == {None}: rgs_paths = {path: 0 for path in actual_paths} for i in range(metadata.num_row_groups): @@ -283,13 +316,21 @@ def from_parquet( for isubrg, path in zip(subrg, actual_paths): pairs.extend([(irg, path) for irg in isubrg]) + + fn = _FromParquetFragmentWiseFn( + fs=fs, + form=subform, + listsep=listsep, + unnamed_root=unnamed_root, + max_gap=max_gap, + max_block=max_block, + footer_sample_size=footer_sample_size, + generate_bitmasks=generate_bitmasks, + behavior=behavior, + ) + return from_map( - _FromParquetFragmentWiseFn( - fs=fs, - form=subform, - listsep=listsep, - unnamed_root=unnamed_root, - ), + fn, pairs, label=label, token=token, @@ -298,24 +339,6 @@ def from_parquet( ) -def _file_to_partition(path, fs, columns, form, subrg=None): - """read a whole parquet file to awkward""" - return _load( - actual_paths=[path], - fs=fs, - parquet_columns=columns, - subrg=subrg or [None], - footer_sample_size=2**15, - max_gap=2**10, - max_block=2**22, - generate_bitmasks=False, - metadata=None, - highlevel=True, - subform=form, - behavior=None, - ) - - def _metadata_file_from_data_files(path_list, fs, out_path): """ Aggregate _metadata and _common_metadata from data files @@ -377,7 +400,7 @@ def _write_partition( ): import pyarrow.parquet as pq - t = to_arrow_table( + t = ak_to_arrow_table.to_arrow_table( data, list_to32=True, string_to32=True, @@ -414,7 +437,7 @@ def __init__( path: Any, return_metadata: bool = False, compression: Any | None = None, - head: Any | None = None, + head: bool = False, npartitions: int | None = None, prefix: str | None = None, ): @@ -484,7 +507,7 @@ def to_parquet( # - parquet 2 for full set of time and int types # - v2 data page (for possible later fastparquet implementation) # - dict 
encoding always off - fs, _ = fsspec.core.url_to_fs(path, **(storage_options or {})) + fs, _ = url_to_fs(path, **(storage_options or {})) name = f"write-parquet-{tokenize(fs, data, path)}" map_res = map_partitions( From 122ca4c56f6c8c910031bfa1863b7864af180baf Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 1 Aug 2023 10:47:00 -0500 Subject: [PATCH 5/8] [pre-commit.ci] pre-commit autoupdate (#335) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit updates: - [github.com/astral-sh/ruff-pre-commit: v0.0.280 → v0.0.281](https://github.com/astral-sh/ruff-pre-commit/compare/v0.0.280...v0.0.281) - [github.com/asottile/pyupgrade: v3.9.0 → v3.10.1](https://github.com/asottile/pyupgrade/compare/v3.9.0...v3.10.1) Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- .pre-commit-config.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a344ace4e..2e66c351c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -21,7 +21,7 @@ repos: - --target-version=py38 - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.0.280 + rev: v0.0.281 hooks: - id: ruff @@ -32,7 +32,7 @@ repos: language_version: python3 - repo: https://github.com/asottile/pyupgrade - rev: v3.9.0 + rev: v3.10.1 hooks: - id: pyupgrade args: From fcdc8da582d1aff839715c3b02584ee7cb6094b9 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Tue, 1 Aug 2023 21:33:10 -0500 Subject: [PATCH 6/8] fix: add `lib` modules to top level import; enable simple downstream test in CI (#336) --- .github/envs/environment-3.10.yml | 1 + .github/envs/environment-3.8.yml | 1 + .github/envs/environment-3.9.yml | 1 + pyproject.toml | 3 +++ src/dask_awkward/__init__.py | 7 +++++++ 5 files changed, 13 insertions(+) diff --git a/.github/envs/environment-3.10.yml b/.github/envs/environment-3.10.yml index 03d9154d7..cfdbec760 100644 --- a/.github/envs/environment-3.10.yml +++ b/.github/envs/environment-3.10.yml @@ -12,3 +12,4 @@ dependencies: - awkward - dask-histogram - uproot + - hist diff --git a/.github/envs/environment-3.8.yml b/.github/envs/environment-3.8.yml index 5af35db43..87cb7f700 100644 --- a/.github/envs/environment-3.8.yml +++ b/.github/envs/environment-3.8.yml @@ -12,3 +12,4 @@ dependencies: - awkward - dask-histogram - uproot + - hist diff --git a/.github/envs/environment-3.9.yml b/.github/envs/environment-3.9.yml index a0d60d843..f6ad15baf 100644 --- a/.github/envs/environment-3.9.yml +++ b/.github/envs/environment-3.9.yml @@ -12,3 +12,4 @@ dependencies: - awkward - dask-histogram - uproot + - hist diff --git a/pyproject.toml b/pyproject.toml index 7aa2577e4..b259033b6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,11 +53,14 @@ docs = [ ] test = [ "dask-awkward[complete]", + "dask-histogram", "distributed", + "hist", "pandas", "pytest >=6.0", "pytest-cov >=3.0.0", "requests >=2.27.1", + "uproot", ] [project.entry-points."dask.sizeof"] diff --git a/src/dask_awkward/__init__.py b/src/dask_awkward/__init__.py index eb7db140f..267fdf693 100644 --- a/src/dask_awkward/__init__.py +++ b/src/dask_awkward/__init__.py @@ -1,5 +1,12 @@ from dask_awkward import config # isort:skip; load awkward config +import dask_awkward.lib.core as core +import dask_awkward.lib.describe as describe +import dask_awkward.lib.inspect as inspect +import dask_awkward.lib.operations as operations +import dask_awkward.lib.optimize as 
optimize +import dask_awkward.lib.reducers as reducers +import dask_awkward.lib.structure as structure from dask_awkward.lib.core import Array, PartitionCompatibility, Record, Scalar from dask_awkward.lib.core import _type as type from dask_awkward.lib.core import ( From 483ed21c145ab0708b6760c2602dfc0469cbd9cb Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 8 Aug 2023 09:24:24 -0500 Subject: [PATCH 7/8] [pre-commit.ci] pre-commit autoupdate (#337) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit updates: - [github.com/astral-sh/ruff-pre-commit: v0.0.281 → v0.0.282](https://github.com/astral-sh/ruff-pre-commit/compare/v0.0.281...v0.0.282) Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 2e66c351c..e9ef2775e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -21,7 +21,7 @@ repos: - --target-version=py38 - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.0.281 + rev: v0.0.282 hooks: - id: ruff From 74bd201b60372620a4bfd1d0b648b8fb1c7ecfca Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Wed, 9 Aug 2023 12:49:03 -0500 Subject: [PATCH 8/8] fix: sync `to_parquet` arguments (#334) --- docs/conf.py | 2 +- src/dask_awkward/lib/io/parquet.py | 428 ++++++++++++++++++++--------- 2 files changed, 298 insertions(+), 132 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index a919ce4e5..a5541a8d7 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -79,7 +79,7 @@ "awkward": ("https://awkward-array.org/doc/stable/", None), } -autodoc_typehints = "none" +autodoc_typehints = "description" pygments_style = "default" diff --git a/src/dask_awkward/lib/io/parquet.py b/src/dask_awkward/lib/io/parquet.py index ad9163b22..3f32eb91e 100644 --- a/src/dask_awkward/lib/io/parquet.py +++ b/src/dask_awkward/lib/io/parquet.py @@ -5,13 +5,12 @@ import logging import math import operator -from typing import Any, Sequence +from collections.abc import Sequence +from typing import Any, Literal, overload import awkward as ak import awkward.operations.ak_from_parquet as ak_from_parquet -import awkward.operations.ak_to_arrow_table as ak_to_arrow_table from awkward.forms.form import Form -from awkward.operations.ak_from_parquet import _load from dask.base import tokenize from dask.blockwise import BlockIndex from dask.highlevelgraph import HighLevelGraph @@ -107,7 +106,7 @@ def __init__( ) def __call__(self, source: Any) -> Any: - array = _load( + array = ak_from_parquet._load( [source], parquet_columns=self.columns, subrg=[None], @@ -165,7 +164,7 @@ def __call__(self, pair: Any) -> ak.Array: subrg, source = pair if isinstance(subrg, int): subrg = [[subrg]] - array = _load( + array = ak_from_parquet._load( [source], parquet_columns=self.columns, subrg=subrg, @@ -175,13 +174,6 @@ def __call__(self, pair: Any) -> ak.Array: behavior=self.behavior, **self.kwargs, ) - # array = _file_to_partition( - # source, - # self.fs, - # self.columns, - # self.form, - # subrg=subrg, - # ) return ak.Array(unproject_layout(self.original_form, array.layout)) def project_columns( @@ -205,7 +197,6 @@ def from_parquet( path: str | list[str], *, columns: str | list[str] | None = None, - storage_options: dict[str, Any] | None = None, max_gap: int = 64_000, max_block: int = 256_000_000, footer_sample_size: int = 1_000_000, @@ 
-214,8 +205,51 @@ def from_parquet( behavior: dict | None = None, ignore_metadata: bool = True, scan_files: bool = False, - split_row_groups: bool = False, + split_row_groups: bool | None = False, + storage_options: dict[str, Any] | None = None, ) -> Array: + """Create an Array collection from a Parquet dataset. + + See :func:`ak.from_parquet` for more information. + + Parameters + ---------- + path + Local directory containing parquet files, remote URL directory + containing Parquet files, or explicit list of Parquet files, + passed to fsspec for resolution. May contain glob patterns. + columns + See :func:`ak.from_parquet` + max_gap + See :func:`ak.from_parquet` + max_block + See :func:`ak.from_parquet` + footer_sample_size + See :func:`ak.from_parquet` + generate_bitmasks + See :func:`ak.from_parquet` + highlevel + See :func:`ak.from_parquet` + behavior + See :func:`ak.from_parquet` + ignore_metadata + If ``True``, ignore Parquet metadata file (if it exists). + scan_files + Scan files when parsing metadata. + split_row_groups + If True, each row group becomes a partition. If False, each + file becomes a partition. If None, the existence of a + ``_metadata`` file and ignore_metadata=False implies True, + else ``False``. + storage_options + Storage options passed to fsspec. + + Returns + ------- + Array + Collection represented by the Parquet data on disk. + + """ if not highlevel: raise ValueError("dask-awkward only supports highlevel=True") @@ -275,21 +309,18 @@ def from_parquet( if split_row_groups is False or subrg is None: # file-wise - - fn = _FromParquetFileWiseFn( - fs=fs, - form=subform, - listsep=listsep, - unnamed_root=unnamed_root, - max_gap=max_gap, - max_block=max_block, - footer_sample_size=footer_sample_size, - generate_bitmasks=generate_bitmasks, - behavior=behavior, - ) - return from_map( - fn, + _FromParquetFileWiseFn( + fs=fs, + form=subform, + listsep=listsep, + unnamed_root=unnamed_root, + max_gap=max_gap, + max_block=max_block, + footer_sample_size=footer_sample_size, + generate_bitmasks=generate_bitmasks, + behavior=behavior, + ), actual_paths, label=label, token=token, @@ -317,20 +348,18 @@ def from_parquet( for isubrg, path in zip(subrg, actual_paths): pairs.extend([(irg, path) for irg in isubrg]) - fn = _FromParquetFragmentWiseFn( - fs=fs, - form=subform, - listsep=listsep, - unnamed_root=unnamed_root, - max_gap=max_gap, - max_block=max_block, - footer_sample_size=footer_sample_size, - generate_bitmasks=generate_bitmasks, - behavior=behavior, - ) - return from_map( - fn, + _FromParquetFragmentWiseFn( + fs=fs, + form=subform, + listsep=listsep, + unnamed_root=unnamed_root, + max_gap=max_gap, + max_block=max_block, + footer_sample_size=footer_sample_size, + generate_bitmasks=generate_bitmasks, + behavior=behavior, + ), pairs, label=label, token=token, @@ -387,90 +416,132 @@ def _write_metadata(fs, out_path, meta): meta.write_metadata_file(fil) -def _write_partition( - data, - path, # dataset root - fs, - filename, # relative path within the dataset - # partition_on=Fa, # must be top-level leaf (i.e., a simple column) - return_metadata=False, # whether making global _metadata - compression=None, # TBD - head=False, # is this the first piece - # custom_metadata=None, -): - import pyarrow.parquet as pq - - t = ak_to_arrow_table.to_arrow_table( - data, - list_to32=True, - string_to32=True, - bytestring_to32=True, - categorical_as_dictionary=True, - extensionarray=False, - ) - md_list = [] - with fs.open(fs.sep.join([path, filename]), "wb") as fil: - pq.write_table( - 
t, - fil, - compression=compression, - metadata_collector=md_list, - ) - - # Return the schema needed to write global _metadata - if return_metadata: - _meta = md_list[0] - _meta.set_file_path(filename) - d = {"meta": _meta} - if head: - # Only return schema if this is the "head" partition - d["schema"] = t.schema - return [d] - else: - return [] - - class _ToParquetFn: def __init__( self, fs: AbstractFileSystem, - path: Any, - return_metadata: bool = False, - compression: Any | None = None, - head: bool = False, - npartitions: int | None = None, + path: str, + npartitions: int, prefix: str | None = None, + storage_options: dict | None = None, + **kwargs: Any, ): self.fs = fs self.path = path - self.return_metadata = return_metadata - self.compression = compression - self.head = head self.prefix = prefix - self.zfill = ( - math.ceil(math.log(npartitions, 10)) if npartitions is not None else 1 - ) - + self.zfill = math.ceil(math.log(npartitions, 10)) + self.storage_options = storage_options self.fs.mkdirs(self.path, exist_ok=True) + self.protocol = ( + self.fs.protocol + if isinstance(self.fs.protocol, str) + else self.fs.protocol[0] + ) + self.kwargs = kwargs def __call__(self, data, block_index): filename = f"part{str(block_index[0]).zfill(self.zfill)}.parquet" if self.prefix is not None: filename = f"{self.prefix}-{filename}" - return _write_partition( - data, - self.path, - self.fs, - filename, - return_metadata=self.return_metadata, - compression=self.compression, - head=self.head, + filename = f"{self.protocol}://{self.path}/{filename}" + return ak.to_parquet( + data, filename, **self.kwargs, storage_options=self.storage_options ) +@overload def to_parquet( - data: Array, - path: Any, + array: Array, + destination: str, + *, + list_to32: bool, + string_to32: bool, + bytestring_to32: bool, + emptyarray_to: Any | None, + categorical_as_dictionary: bool, + extensionarray: bool, + count_nulls: bool, + compression: str | dict | None, + compression_level: int | dict | None, + row_group_size: int | None, + data_page_size: int | None, + parquet_flavor: Literal["spark"] | None, + parquet_version: Literal["1.0"] | Literal["2.4"] | Literal["2.6"], + parquet_page_version: Literal["1.0"] | Literal["2.0"], + parquet_metadata_statistics: bool | dict, + parquet_dictionary_encoding: bool | dict, + parquet_byte_stream_split: bool | dict, + parquet_coerce_timestamps: Literal["ms"] | Literal["us"] | None, + parquet_old_int96_timestamps: bool | None, + parquet_compliant_nested: bool, + parquet_extra_options: dict | None, + storage_options: dict[str, Any] | None, + write_metadata: bool, + compute: Literal[True], + prefix: str | None, +) -> None: + ... 
+ + +@overload +def to_parquet( + array: Array, + destination: str, + *, + list_to32: bool, + string_to32: bool, + bytestring_to32: bool, + emptyarray_to: Any | None, + categorical_as_dictionary: bool, + extensionarray: bool, + count_nulls: bool, + compression: str | dict | None, + compression_level: int | dict | None, + row_group_size: int | None, + data_page_size: int | None, + parquet_flavor: Literal["spark"] | None, + parquet_version: Literal["1.0"] | Literal["2.4"] | Literal["2.6"], + parquet_page_version: Literal["1.0"] | Literal["2.0"], + parquet_metadata_statistics: bool | dict, + parquet_dictionary_encoding: bool | dict, + parquet_byte_stream_split: bool | dict, + parquet_coerce_timestamps: Literal["ms"] | Literal["us"] | None, + parquet_old_int96_timestamps: bool | None, + parquet_compliant_nested: bool, + parquet_extra_options: dict | None, + storage_options: dict[str, Any] | None, + write_metadata: bool, + compute: Literal[False], + prefix: str | None, +) -> Scalar: + ... + + +def to_parquet( + array: Array, + destination: str, + *, + list_to32: bool = False, + string_to32: bool = True, + bytestring_to32: bool = True, + emptyarray_to: Any | None = None, + categorical_as_dictionary: bool = False, + extensionarray: bool = False, + count_nulls: bool = True, + compression: str | dict | None = "zstd", + compression_level: int | dict | None = None, + row_group_size: int | None = 64 * 1024 * 1024, + data_page_size: int | None = None, + parquet_flavor: Literal["spark"] | None = None, + parquet_version: Literal["1.0"] | Literal["2.4"] | Literal["2.6"] = "2.4", + parquet_page_version: Literal["1.0"] | Literal["2.0"] = "1.0", + parquet_metadata_statistics: bool | dict = True, + parquet_dictionary_encoding: bool | dict = False, + parquet_byte_stream_split: bool | dict = False, + parquet_coerce_timestamps: Literal["ms"] | Literal["us"] | None = None, + parquet_old_int96_timestamps: bool | None = None, + parquet_compliant_nested: bool = False, + parquet_extra_options: dict | None = None, storage_options: dict[str, Any] | None = None, write_metadata: bool = False, compute: bool = True, @@ -478,44 +549,139 @@ def to_parquet( ) -> Scalar | None: """Write data to Parquet format. + This will create one output file per partition. + + See the documentation for :func:`ak.to_parquet` for more + information; there are many optional function arguments that are + described in that documentation. + Parameters ---------- - data : dask_awkward.Array - Array to write to parquet. - path : str - Root directory of location to write to - storage_options : dict - Arguments to pass to fsspec for creating the filesystem (see - ``fsspec`` documentation). - write_metadata : bool - Whether to create _metadata and _common_metadata files - compute : bool - Whether to immediately start writing or to return the dask - collection which can be computed at the user's discression. + array + The :obj:`dask_awkward.Array` collection to write to disk. + destination + Where to store the output; this can be a local filesystem path + or a remote filesystem path. 
+ list_to32 + See :func:`ak.to_parquet` + string_to32 + See :func:`ak.to_parquet` + bytestring_to32 + See :func:`ak.to_parquet` + emptyarray_to + See :func:`ak.to_parquet` + categorical_as_dictionary + See :func:`ak.to_parquet` + extensionarray + See :func:`ak.to_parquet` + count_nulls + See :func:`ak.to_parquet` + compression + See :func:`ak.to_parquet` + compression_level + See :func:`ak.to_parquet` + row_group_size + See :func:`ak.to_parquet` + data_page_size + See :func:`ak.to_parquet` + parquet_flavor + See :func:`ak.to_parquet` + parquet_version + See :func:`ak.to_parquet` + parquet_page_version + See :func:`ak.to_parquet` + parquet_metadata_statistics + See :func:`ak.to_parquet` + parquet_dictionary_encoding + See :func:`ak.to_parquet` + parquet_byte_stream_split + See :func:`ak.to_parquet` + parquet_coerce_timestamps + See :func:`ak.to_parquet` + parquet_old_int96_timestamps + See :func:`ak.to_parquet` + parquet_compliant_nested + See :func:`ak.to_parquet` + parquet_extra_options + See :func:`ak.to_parquet` + storage_options + Storage options passed to ``fsspec``. + write_metadata + Write Parquet metadata. + compute + If ``True``, immediately compute the result (write data to + disk). If ``False`` a Scalar collection will be returned such + that ``compute`` can be explicitly called. + prefix + An addition prefix for output files. If ``None`` all parts + inside the destination directory will be named + ``"partN.parquet"``; if defined, the names will be + ``f"{prefix}-partN.parquet"``. Returns ------- - None or dask_awkward.Scalar - If `compute` is ``False``, a :py:class:`dask_awkward.Scalar` - representing the process will be returned, if `compute` is - ``True`` then the return is ``None``. + Scalar | None + If ``compute`` is ``False`` a :obj:`dask_awkward.Scalar` + object is returned such that it can be computed later. If + ``compute`` is ``True``, the collection is immediately + computed (and data will be written to disk) and ``None`` is + returned. 
+ + Examples + -------- + + >>> import awkward as ak + >>> import dask_awkward as dak + >>> a = ak.Array([{"a": [1, 2, 3]}, {"a": [4, 5]}]) + >>> d = dak.from_awkward(a, npartitions=2) + >>> d.npartitions + 2 + >>> dak.to_parquet(d, "/tmp/my-output", prefix="data") + >>> import os + >>> os.listdir("/tmp/my-output") + ['data-part0.parquet', 'data-part1.parquet'] + + """ # TODO options we need: - # - compression per data type or per leaf column ("path.to.leaf": "zstd" format) # - byte stream split for floats if compression is not None or lzma # - partitioning - # - parquet 2 for full set of time and int types - # - v2 data page (for possible later fastparquet implementation) # - dict encoding always off - fs, _ = url_to_fs(path, **(storage_options or {})) - name = f"write-parquet-{tokenize(fs, data, path)}" + fs, path = url_to_fs(destination, **(storage_options or {})) + name = f"write-parquet-{tokenize(fs, array, destination)}" map_res = map_partitions( - _ToParquetFn(fs, path=path, npartitions=data.npartitions, prefix=prefix), - data, - BlockIndex((data.npartitions,)), + _ToParquetFn( + fs=fs, + path=path, + npartitions=array.npartitions, + prefix=prefix, + list_to32=list_to32, + string_to32=string_to32, + bytestring_to32=bytestring_to32, + emptyarray_to=emptyarray_to, + categorical_as_dictionary=categorical_as_dictionary, + extensionarray=extensionarray, + count_nulls=count_nulls, + compression=compression, + compression_level=compression_level, + row_group_size=row_group_size, + data_page_size=data_page_size, + parquet_flavor=parquet_flavor, + parquet_version=parquet_version, + parquet_page_version=parquet_page_version, + parquet_metadata_statistics=parquet_metadata_statistics, + parquet_dictionary_encoding=parquet_dictionary_encoding, + parquet_byte_stream_split=parquet_byte_stream_split, + parquet_coerce_timestamps=parquet_coerce_timestamps, + parquet_old_int96_timestamps=parquet_old_int96_timestamps, + parquet_compliant_nested=parquet_compliant_nested, + parquet_extra_options=parquet_extra_options, + ), + array, + BlockIndex((array.npartitions,)), label="to-parquet", - meta=data._meta, + meta=array._meta, ) map_res.dask.layers[map_res.name].annotations = {"ak_output": True}
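
A minimal usage sketch of the API after this patch series, assuming all eight patches are applied. The dataset location, column names, and output directory below are hypothetical placeholders rather than values taken from any commit; only the function names and keyword arguments visible in the diffs above are relied on.

import awkward as ak
import dask_awkward as dak

# Patch 1: from_awkward now handles a length-zero source array.
empty = dak.from_awkward(ak.Array([]), npartitions=1)

# Patch 4: from_parquet keyword arguments are synced with ak.from_parquet;
# split_row_groups=False gives one partition per file.
events = dak.from_parquet(
    "/data/events",       # hypothetical dataset directory
    columns=["x", "y"],   # hypothetical column names
    split_row_groups=False,
)

# Patch 3: dak.num(..., axis=0) now returns a lazy Scalar instead of
# eagerly calling len(); compute it to get the total length.
n = dak.num(events.x, axis=0)
total = n.compute()

# Patch 8: to_parquet keyword arguments are synced with ak.to_parquet;
# prefix names the part files and compute=False defers the write.
write = dak.to_parquet(events, "/tmp/out", prefix="data", compute=False)
write.compute()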