From ecd859ea10f40fb1aac508de383fca4663d360ea Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 7 Dec 2023 16:00:26 -0600 Subject: [PATCH] OOP based report implementation --- src/dask_awkward/layers/layers.py | 44 ++++++------ src/dask_awkward/lib/io/columnar.py | 22 +++--- src/dask_awkward/lib/io/io.py | 100 +++++++++----------------- src/dask_awkward/lib/io/json.py | 56 +++++++++------ src/dask_awkward/lib/io/parquet.py | 106 +++++++++++++++++++++------- src/dask_awkward/lib/io/text.py | 21 +++--- src/dask_awkward/lib/testutils.py | 64 +++++++++++++++-- src/dask_awkward/utils.py | 6 ++ tests/conftest.py | 3 +- tests/test_core.py | 2 +- tests/test_io.py | 80 +++++++++------------ tests/test_parquet.py | 13 ++++ 12 files changed, 314 insertions(+), 203 deletions(-) diff --git a/src/dask_awkward/layers/layers.py b/src/dask_awkward/layers/layers.py index c33a54a62..d33814829 100644 --- a/src/dask_awkward/layers/layers.py +++ b/src/dask_awkward/layers/layers.py @@ -48,21 +48,29 @@ def __repr__(self) -> str: class ImplementsIOFunction(Protocol): - def __call__(self, *args: Any, **kwargs: Any) -> AwkwardArray: + def __call__(self, *args, **kwargs): ... T = TypeVar("T") -class ImplementsMocking(Protocol): +class ImplementsMocking(ImplementsIOFunction, Protocol): def mock(self) -> AwkwardArray: ... + +class ImplementsMockEmpty(ImplementsIOFunction, Protocol): def mock_empty(self, backend: BackendT) -> AwkwardArray: ... +class ImplementsReport(ImplementsIOFunction, Protocol): + @property + def return_report(self) -> bool: + ... + + class ImplementsProjection(ImplementsMocking, Protocol[T]): def prepare_for_projection(self) -> tuple[AwkwardArray, TypeTracerReport, T]: ... @@ -93,19 +101,6 @@ def mock(self) -> AwkwardArray: assert self._meta is not None return self._meta - def mock_empty(self, backend: BackendT = "cpu") -> AwkwardArray: - import awkward as ak - - if backend not in ("cpu", "jax", "cuda"): - raise ValueError( - f"backend must be one of 'cpu', 'jax', or 'cuda', received {backend}" - ) - return ak.to_backend( - self.mock().layout.form.length_zero_array(highlevel=False), - backend=backend, - highlevel=True, - ) - def io_func_implements_projection(func: ImplementsIOFunction) -> bool: return hasattr(func, "prepare_for_projection") @@ -123,6 +118,10 @@ def io_func_implements_columnar(func: ImplementsIOFunction) -> bool: return hasattr(func, "necessary_columns") +def io_func_implements_report(func: ImplementsIOFunction) -> bool: + return hasattr(func, "return_report") + + class AwkwardInputLayer(AwkwardBlockwiseLayer): """A layer known to perform IO and produce Awkward arrays @@ -183,7 +182,6 @@ def is_columnar(self) -> bool: def mock(self) -> AwkwardInputLayer: assert self.is_mockable - return AwkwardInputLayer( name=self.name, inputs=[None][: int(list(self.numblocks.values())[0][0])], @@ -229,10 +227,15 @@ def prepare_for_projection(self) -> tuple[AwkwardInputLayer, TypeTracerReport, T ImplementsProjection, self.io_func ).prepare_for_projection() + new_return = new_meta_array + if io_func_implements_report(self.io_func): + if cast(ImplementsReport, self.io_func).return_report: + new_return = (new_meta_array, type(new_meta_array)([])) + new_input_layer = AwkwardInputLayer( name=self.name, inputs=[None][: int(list(self.numblocks.values())[0][0])], - io_func=lambda *_, **__: new_meta_array, + io_func=lambda *_, **__: new_return, label=self.label, produces_tasks=self.produces_tasks, creation_info=self.creation_info, @@ -246,12 +249,13 @@ def project( state: T, ) -> AwkwardInputLayer: assert self.is_projectable + io_func = cast(ImplementsProjection, self.io_func).project( + report=report, state=state + ) return AwkwardInputLayer( name=self.name, inputs=self.inputs, - io_func=cast(ImplementsProjection, self.io_func).project( - report=report, state=state - ), + io_func=io_func, label=self.label, produces_tasks=self.produces_tasks, creation_info=self.creation_info, diff --git a/src/dask_awkward/lib/io/columnar.py b/src/dask_awkward/lib/io/columnar.py index 6644ea3f7..0fd955925 100644 --- a/src/dask_awkward/lib/io/columnar.py +++ b/src/dask_awkward/lib/io/columnar.py @@ -1,11 +1,12 @@ from __future__ import annotations import logging -from typing import TYPE_CHECKING, Any, Protocol, TypeVar, cast +from typing import TYPE_CHECKING, Protocol, TypeVar, cast import awkward as ak from awkward import Array as AwkwardArray from awkward.forms import Form +from awkward.typetracer import typetracer_from_form, typetracer_with_report from dask_awkward.layers.layers import ( BackendT, @@ -43,7 +44,7 @@ def behavior(self) -> dict | None: def project_columns(self: T, columns: frozenset[str]) -> T: ... - def __call__(self, *args: Any, **kwargs: Any) -> AwkwardArray: + def __call__(self, *args, **kwargs): ... @@ -60,13 +61,18 @@ class ColumnProjectionMixin(ImplementsNecessaryColumns[FormStructure]): """ def mock(self: S) -> AwkwardArray: - return ak.typetracer.typetracer_from_form(self.form, behavior=self.behavior) + return cast( + AwkwardArray, typetracer_from_form(self.form, behavior=self.behavior) + ) def mock_empty(self: S, backend: BackendT = "cpu") -> AwkwardArray: - return ak.to_backend( - self.form.length_zero_array(highlevel=False, behavior=self.behavior), - backend, - highlevel=True, + return cast( + AwkwardArray, + ak.to_backend( + self.form.length_zero_array(highlevel=False, behavior=self.behavior), + backend, + highlevel=True, + ), ) def prepare_for_projection( @@ -75,7 +81,7 @@ def prepare_for_projection( form = form_with_unique_keys(self.form, "@") # Build typetracer and associated report object - (meta, report) = ak.typetracer.typetracer_with_report( + (meta, report) = typetracer_with_report( form, highlevel=True, behavior=self.behavior, diff --git a/src/dask_awkward/lib/io/io.py b/src/dask_awkward/lib/io/io.py index 246de43eb..715c36231 100644 --- a/src/dask_awkward/lib/io/io.py +++ b/src/dask_awkward/lib/io/io.py @@ -1,6 +1,5 @@ from __future__ import annotations -import functools import logging import math from collections.abc import Callable, Iterable, Mapping @@ -20,18 +19,20 @@ AwkwardBlockwiseLayer, AwkwardInputLayer, AwkwardMaterializedLayer, - BackendT, ImplementsMocking, + ImplementsReport, IOFunctionWithMocking, - io_func_implements_mock_empty, io_func_implements_mocking, + io_func_implements_report, ) from dask_awkward.lib.core import ( + Array, empty_typetracer, map_partitions, new_array_object, typetracer_array, ) +from dask_awkward.utils import first, second if TYPE_CHECKING: from dask.array.core import Array as DaskArray @@ -40,8 +41,6 @@ from dask.delayed import Delayed from fsspec.spec import AbstractFileSystem - from dask_awkward.lib.core import Array - logger = logging.getLogger(__name__) @@ -100,16 +99,19 @@ def from_awkward( starts = locs[:-1] stops = locs[1:] meta = typetracer_array(source) - return from_map( - _FromAwkwardFn(source), - starts, - stops, - label=label or "from-awkward", - token=tokenize(source, npartitions), - divisions=locs, - meta=meta, - behavior=behavior, - attrs=attrs, + return cast( + Array, + from_map( + _FromAwkwardFn(source), + starts, + stops, + label=label or "from-awkward", + token=tokenize(source, npartitions), + divisions=locs, + meta=meta, + behavior=behavior, + attrs=attrs, + ), ) @@ -158,12 +160,15 @@ def from_lists( """ lists = list(source) divs = (0, *np.cumsum(list(map(len, lists)))) - return from_map( - _FromListsFn(behavior=behavior, attrs=attrs), - lists, - meta=typetracer_array(ak.Array(lists[0], attrs=attrs, behavior=behavior)), - divisions=divs, - label="from-lists", + return cast( + Array, + from_map( + _FromListsFn(behavior=behavior, attrs=attrs), + lists, + meta=typetracer_array(ak.Array(lists[0], attrs=attrs, behavior=behavior)), + divisions=divs, + label="from-lists", + ), ) @@ -496,31 +501,6 @@ def __call__(self, packed_arg): ) -def return_empty_on_raise( - fn: Callable, - allowed_exceptions: tuple[type[BaseException], ...], - backend: BackendT, -) -> Callable: - @functools.wraps(fn) - def wrapped(*args, **kwargs): - try: - return fn(*args, **kwargs) - except allowed_exceptions as err: - logmsg = ( - "%s call failed with args %s and kwargs %s; empty array returned. %s" - % ( - str(fn), - str(args), - str(kwargs), - str(err), - ) - ) - logger.info(logmsg) - return fn.mock_empty(backend) - - return wrapped - - def from_map( func: Callable, *iterables: Iterable, @@ -529,10 +509,8 @@ def from_map( token: str | None = None, divisions: tuple[int, ...] | tuple[None, ...] | None = None, meta: ak.Array | None = None, - empty_on_raise: tuple[type[BaseException], ...] | None = None, - empty_backend: BackendT | None = None, **kwargs: Any, -) -> Array: +) -> Array | tuple[Array, Array]: """Create an Array collection from a custom mapping. Parameters @@ -557,12 +535,6 @@ def from_map( meta : Array, optional Collection metadata array, if known (the awkward-array type tracer) - empty_on_raise : tuple[type[BaseException], ...], optional - Set of exceptions that can be caught to return an empty array - at compute time if file IO raises. - empty_backend : str, - The backend for the empty array resulting from a failed read - when `empty_on_raise` is defined. **kwargs : Any Keyword arguments passed to `func`. @@ -644,18 +616,6 @@ def from_map( io_func = func array_meta = None - if (empty_on_raise and not empty_backend) or (empty_backend and not empty_on_raise): - raise ValueError("empty_on_raise and empty_backend must be used together.") - - if empty_on_raise and empty_backend: - if not io_func_implements_mock_empty(io_func): - raise ValueError("io_func must implement mock_empty method.") - io_func = return_empty_on_raise( - io_func, - allowed_exceptions=empty_on_raise, - backend=empty_backend, - ) - dsk = AwkwardInputLayer(name=name, inputs=inputs, io_func=io_func) hlg = HighLevelGraph.from_collections(name, dsk) @@ -664,6 +624,12 @@ def from_map( else: result = new_array_object(hlg, name, meta=array_meta, npartitions=len(inputs)) + if io_func_implements_report(io_func): + if cast(ImplementsReport, io_func).return_report: + res = result.map_partitions(first, meta=array_meta, output_divisions=1) + rep = result.map_partitions(second, meta=empty_typetracer()) + return res, rep + return result diff --git a/src/dask_awkward/lib/io/json.py b/src/dask_awkward/lib/io/json.py index c0fbf54f5..701ad2e0f 100644 --- a/src/dask_awkward/lib/io/json.py +++ b/src/dask_awkward/lib/io/json.py @@ -4,7 +4,7 @@ import logging import math from collections.abc import Callable, Mapping -from typing import TYPE_CHECKING, Any, Literal, overload +from typing import TYPE_CHECKING, Any, Literal, cast, overload import awkward as ak import dask @@ -18,7 +18,13 @@ from fsspec.utils import infer_compression, read_block from dask_awkward.layers.layers import AwkwardMaterializedLayer -from dask_awkward.lib.core import map_partitions, new_scalar_object, typetracer_array +from dask_awkward.lib.core import ( + Array, + Scalar, + map_partitions, + new_scalar_object, + typetracer_array, +) from dask_awkward.lib.io.columnar import ColumnProjectionMixin from dask_awkward.lib.io.io import ( _bytes_with_sample, @@ -30,7 +36,6 @@ from awkward.contents.content import Content from fsspec.spec import AbstractFileSystem - from dask_awkward.lib.core import Array, Scalar log = logging.getLogger(__name__) @@ -297,12 +302,15 @@ def _from_json_files( **kwargs, ) - return from_map( - f, - paths, - label="from-json-files", - token=token, - meta=meta, + return cast( + Array, + from_map( + f, + paths, + label="from-json-files", + token=token, + meta=meta, + ), ) @@ -334,12 +342,15 @@ def _from_json_sopf( **kwargs, ) - return from_map( - f, - paths, - label="from-json-sopf", - token=token, - meta=meta, + return cast( + Array, + from_map( + f, + paths, + label="from-json-sopf", + token=token, + meta=meta, + ), ) @@ -394,12 +405,15 @@ def _from_json_bytes( **kwargs, ) - return from_map( - fn, - list(flatten(bytes_ingredients)), - label="from-json-bytes", - token=token, - meta=meta, + return cast( + Array, + from_map( + fn, + list(flatten(bytes_ingredients)), + label="from-json-bytes", + token=token, + meta=meta, + ), ) diff --git a/src/dask_awkward/lib/io/parquet.py b/src/dask_awkward/lib/io/parquet.py index 26e83a527..977d82fbc 100644 --- a/src/dask_awkward/lib/io/parquet.py +++ b/src/dask_awkward/lib/io/parquet.py @@ -6,7 +6,7 @@ import math import operator from collections.abc import Mapping -from typing import TYPE_CHECKING, Any, Literal, TypeVar +from typing import TYPE_CHECKING, Any, Literal, TypeVar, cast import awkward as ak import awkward.operations.ak_from_parquet as ak_from_parquet @@ -33,7 +33,35 @@ T = TypeVar("T") -class _FromParquetFn(ColumnProjectionMixin): +def report_failure(exception, *args, **kwargs): + return ak.Array( + [ + { + "columns": [], + "args": [repr(a) for a in args], + "kwargs": [[k, repr(v)] for k, v in kwargs.items()], + "exception": type(exception).__name__, + "message": str(exception), + } + ] + ) + + +def report_success(columns, *args, **kwargs): + return ak.Array( + [ + { + "columns": columns, + "args": [repr(a) for a in args], + "kwargs": [[k, repr(v)] for k, v in kwargs.items()], + "exception": None, + "message": None, + } + ] + ) + + +class FromParquetFn(ColumnProjectionMixin): def __init__( self, *, @@ -42,6 +70,8 @@ def __init__( listsep: str = "list.item", unnamed_root: bool = False, original_form: Form | None = None, + report: bool = False, + allowed_exceptions: tuple[type[BaseException], ...] = (OSError,), behavior: Mapping | None = None, attrs: Mapping[str, Any] | None = None, **kwargs: Any, @@ -54,18 +84,24 @@ def __init__( if self.unnamed_root: self.columns = [f".{c}" for c in self.columns] self.original_form = original_form + self.report = report + self.allowed_exceptions = allowed_exceptions self.behavior = behavior self.attrs = attrs self.kwargs = kwargs @abc.abstractmethod - def __call__(self, source: Any) -> ak.Array: + def __call__(self, *args, **kwargs): ... @abc.abstractmethod def project_columns(self, columns): ... + @property + def return_report(self) -> bool: + return self.report + @property def use_optimization(self) -> bool: return "parquet" in dask.config.get( @@ -91,7 +127,7 @@ def __str__(self) -> str: return self.__repr__() -class _FromParquetFileWiseFn(_FromParquetFn): +class FromParquetFileWiseFn(FromParquetFn): def __init__( self, *, @@ -113,7 +149,7 @@ def __init__( **kwargs, ) - def __call__(self, source: Any) -> Any: + def read_fn(self, source: Any) -> Any: layout = ak_from_parquet._load( [source], parquet_columns=self.columns, @@ -131,20 +167,32 @@ def __call__(self, source: Any) -> Any: behavior=self.behavior, ) + def __call__(self, *args, **kwargs): + source = args[0] + if self.return_report: + try: + result = self.read_fn(source) + return result, report_success(self.columns, source) + except self.allowed_exceptions as err: + return self.mock_empty(), report_failure(err, source) + + return self.read_fn(source) + def project_columns(self, columns): - return _FromParquetFileWiseFn( + return FromParquetFileWiseFn( fs=self.fs, form=self.form.select_columns(columns), listsep=self.listsep, unnamed_root=self.unnamed_root, original_form=self.form, + report=self.report, attrs=self.attrs, behavior=self.behavior, **self.kwargs, ) -class _FromParquetFragmentWiseFn(_FromParquetFn): +class FromParquetFragmentWiseFn(FromParquetFn): def __init__( self, *, @@ -190,11 +238,12 @@ def __call__(self, pair: Any) -> ak.Array: ) def project_columns(self, columns): - return _FromParquetFragmentWiseFn( + return FromParquetFragmentWiseFn( fs=self.fs, form=self.form.select_columns(columns), unnamed_root=self.unnamed_root, original_form=self.form, + report=self.report, behavior=self.behavior, attrs=self.attrs, **self.kwargs, @@ -216,7 +265,8 @@ def from_parquet( scan_files: bool = False, split_row_groups: bool | None = False, storage_options: dict[str, Any] | None = None, -) -> Array: + report: bool = False, +) -> Array | tuple[Array, Array]: """Create an Array collection from a Parquet dataset. See :func:`ak.from_parquet` for more information. @@ -317,7 +367,7 @@ def from_parquet( if split_row_groups is False or subrg is None: # file-wise return from_map( - _FromParquetFileWiseFn( + FromParquetFileWiseFn( fs=fs, form=subform, listsep=listsep, @@ -328,6 +378,7 @@ def from_parquet( generate_bitmasks=generate_bitmasks, behavior=behavior, attrs=attrs, + report=report, ), actual_paths, label=label, @@ -355,23 +406,26 @@ def from_parquet( for isubrg, path in zip(subrg, actual_paths): pairs.extend([(irg, path) for irg in isubrg]) - return from_map( - _FromParquetFragmentWiseFn( - fs=fs, - form=subform, - listsep=listsep, - unnamed_root=unnamed_root, - max_gap=max_gap, - max_block=max_block, - footer_sample_size=footer_sample_size, - generate_bitmasks=generate_bitmasks, - behavior=behavior, - attrs=attrs, + return cast( + Array, + from_map( + FromParquetFragmentWiseFn( + fs=fs, + form=subform, + listsep=listsep, + unnamed_root=unnamed_root, + max_gap=max_gap, + max_block=max_block, + footer_sample_size=footer_sample_size, + generate_bitmasks=generate_bitmasks, + behavior=behavior, + attrs=attrs, + ), + pairs, + label=label, + token=token, + divisions=tuple(divisions), ), - pairs, - label=label, - token=token, - divisions=tuple(divisions), ) diff --git a/src/dask_awkward/lib/io/text.py b/src/dask_awkward/lib/io/text.py index a45996baa..f38c9f352 100644 --- a/src/dask_awkward/lib/io/text.py +++ b/src/dask_awkward/lib/io/text.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from typing import cast import awkward as ak import numpy as np @@ -12,15 +12,13 @@ from fsspec.core import get_fs_token_paths from fsspec.utils import infer_compression, read_block +from dask_awkward.lib.core import Array from dask_awkward.lib.io.io import ( _bytes_with_sample, _BytesReadingInstructions, from_map, ) -if TYPE_CHECKING: - from dask_awkward.lib.core import Array - def _string_array_from_bytestring(bytestring: bytes, delimiter: bytes) -> ak.Array: buffer = np.frombuffer(bytestring, dtype=np.uint8) @@ -117,10 +115,13 @@ def from_text( ) ) - return from_map( - _from_text_on_block, - list(flatten(bytes_ingredients)), - label="from-text", - token=token, - meta=meta, + return cast( + Array, + from_map( + _from_text_on_block, + list(flatten(bytes_ingredients)), + label="from-text", + token=token, + meta=meta, + ), ) diff --git a/src/dask_awkward/lib/testutils.py b/src/dask_awkward/lib/testutils.py index 4af3cc4a3..cc55d8580 100644 --- a/src/dask_awkward/lib/testutils.py +++ b/src/dask_awkward/lib/testutils.py @@ -1,6 +1,8 @@ from __future__ import annotations +import functools import random +import time from typing import Any import awkward as ak @@ -182,7 +184,7 @@ def lists() -> Array: return from_lists([list1(), list2(), list3()]) # pragma: no cover -def unnamed_root_ds() -> Array: +def unnamed_root_ds() -> ak.Array: ds = [ [ { @@ -227,17 +229,71 @@ def unnamed_root_ds() -> Array: return ak.Array(ds * 3) +def time_it(f): + @functools.wraps(f) + def wrapper(*args, **kwargs): + start = time.time() + result = f(*args, **kwargs) + end = time.time() + return result, (end - start) + + return wrapper + + class RandomFailFromListsFn: - def __init__(self, form): + def __init__(self, form, report=False, allowed_exceptions=(OSError,)): self.form = form + self.allowed_exceptions = allowed_exceptions + self.report = report + + @property + def return_report(self) -> bool: + return self.report + + @staticmethod + def make_fail_report(exception, *args, **kwargs): + return ak.Array( + [ + { + "duration": None, + "args": [repr(a) for a in args], + "kwargs": [[k, repr(v)] for k, v in kwargs.items()], + "exception": type(exception).__name__, + "message": str(exception), + } + ] + ) - def __call__(self, x: list) -> ak.Array: + @staticmethod + def make_success_report(duration, *args, **kwargs): + return ak.Array( + [ + { + "duration": duration, + "args": [repr(a) for a in args], + "kwargs": [[k, repr(v)] for k, v in kwargs.items()], + "exception": None, + "message": None, + } + ] + ) + + def read_fn(self, x: list) -> ak.Array: n = random.randint(0, 9) if n < 5: raise OSError("BAD!") - return ak.Array(x) + def __call__(self, *args, **kwargs): + if self.return_report: + try: + result, time = time_it(self.read_fn)(*args, **kwargs) + return result, self.make_success_report(time, *args, **kwargs) + except self.allowed_exceptions as err: + return self.mock_empty(), self.make_fail_report(err, *args, **kwargs) + + return self.read_fn(*args, **kwargs) + def mock(self): return typetracer_from_form(self.form) diff --git a/src/dask_awkward/utils.py b/src/dask_awkward/utils.py index 19bb69ce4..750ada95d 100644 --- a/src/dask_awkward/utils.py +++ b/src/dask_awkward/utils.py @@ -143,3 +143,9 @@ def first(seq: Iterable[T]) -> T: """ return next(iter(seq)) + + +def second(seq: Iterable[T]) -> T: + the_iter = iter(seq) + next(the_iter) + return next(the_iter) diff --git a/tests/conftest.py b/tests/conftest.py index b2ba4d0f5..142980de9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,7 @@ from __future__ import annotations import json +from typing import cast import awkward as ak import fsspec @@ -73,7 +74,7 @@ def pq_points_dir(daa: dak.Array, tmp_path_factory: pytest.TempPathFactory) -> s @pytest.fixture(scope="session") def daa_parquet(pq_points_dir: str) -> dak.Array: - return dak.from_parquet(pq_points_dir) + return cast(dak.Array, dak.from_parquet(pq_points_dir)) @pytest.fixture(scope="session") diff --git a/tests/test_core.py b/tests/test_core.py index 04d96bd3d..bcb98efc7 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -883,6 +883,6 @@ def test_shape_only_ops(fn: Callable, tmp_path_factory: pytest.TempPathFactory) p = tmp_path_factory.mktemp("zeros-like-flat") ak.to_parquet(a, str(p / "file.parquet")) lazy = dak.from_parquet(str(p)) - result = fn(lazy.b) + result = fn(lazy.b) # type: ignore with dask.config.set({"awkward.optimization.enabled": True}): result.compute() diff --git a/tests/test_io.py b/tests/test_io.py index 580ad5c15..c1af1a58d 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -1,8 +1,10 @@ from __future__ import annotations from pathlib import Path +from typing import cast import awkward as ak +import dask import numpy as np import pytest from dask.array.utils import assert_eq as da_assert_eq @@ -344,7 +346,7 @@ def test_bytes_with_sample( assert len(sample_bytes) == 127 -def test_random_fail_from_lists(): +def test_from_map_random_fail_from_lists(): from dask_awkward.lib.testutils import RandomFailFromListsFn single = [[1, 2, 3], [4, 5], [6], [], [1, 2, 3]] @@ -352,26 +354,34 @@ def test_random_fail_from_lists(): divs = (0, *np.cumsum(list(map(len, many)))) form = ak.Array(many[0]).layout.form - array = from_map( - RandomFailFromListsFn(form), + array, report = from_map( + RandomFailFromListsFn(form, report=True, allowed_exceptions=(OSError,)), many, meta=typetracer_array(ak.Array(many[0])), divisions=divs, label="from-lists", - empty_on_raise=(OSError,), - empty_backend="cpu", ) assert len(array.compute()) < (len(single) * len(many)) + computed_report = report.compute() + + # we expect the 'args' field in the report to be empty if the + # from_map node succeded; so we use ak.num(..., axis=1) to filter + # those out. + succ = ak.num(computed_report["args"], axis=1) == 0 + fail = np.invert(succ) + assert len(computed_report[succ]) < len(computed_report) + assert ak.all(computed_report[fail].exception == "OSError") + with pytest.raises(OSError, match="BAD"): - array = from_map( - RandomFailFromListsFn(form), + array, report = from_map( + RandomFailFromListsFn( + form, report=True, allowed_exceptions=(RuntimeError,) + ), many, meta=typetracer_array(ak.Array(many[0])), divisions=divs, label="from-lists", - empty_on_raise=(RuntimeError,), - empty_backend="cpu", ) array.compute() @@ -383,45 +393,25 @@ def test_random_fail_from_lists(): divisions=divs, label="from-lists", ) - array.compute() + cast(dak.Array, array).compute() - with pytest.raises(ValueError, match="must be used together"): - array = from_map( - RandomFailFromListsFn(form), - many, - meta=typetracer_array(ak.Array(many[0])), - divisions=divs, - label="from-lists", - empty_on_raise=(OSError,), - ) - with pytest.raises(ValueError, match="must be used together"): - array = from_map( - RandomFailFromListsFn(form), - many, - meta=typetracer_array(ak.Array(many[0])), - divisions=divs, - label="from-lists", - empty_backend="cpu", - ) +def test_from_map_fail_with_callbacks(): + from dask_awkward.lib.testutils import RandomFailFromListsFn - class NoMockEmpty: - def __init__(self, x): - self.x = x + single = [[1, 2, 3], [4, 5], [6], [], [1, 2, 3]] + many = [single] * 30 + divs = (0, *np.cumsum(list(map(len, many)))) + form = ak.Array(many[0]).layout.form - def mock(self): - return 5 + array, report = from_map( + RandomFailFromListsFn(form, report=True, allowed_exceptions=(OSError,)), + many, + meta=typetracer_array(ak.Array(many[0])), + divisions=divs, + label="from-lists", + ) - def __call__(self, *args): - return self.x * args[0] + _, rep = dask.compute(array, report) - with pytest.raises(ValueError, match="must implement"): - array = from_map( - NoMockEmpty(5), - many, - meta=typetracer_array(ak.Array(many[0])), - divisions=divs, - label="from-lists", - empty_on_raise=(RuntimeError,), - empty_backend="cpu", - ) + assert "OSError" in rep.exception.tolist() diff --git a/tests/test_parquet.py b/tests/test_parquet.py index f26cd2e9a..3206703ae 100644 --- a/tests/test_parquet.py +++ b/tests/test_parquet.py @@ -9,6 +9,7 @@ pytest.importorskip("aiohttp") import awkward as ak +import dask import fsspec import pyarrow as pa import pyarrow.dataset as pad @@ -201,3 +202,15 @@ def test_to_parquet_with_prefix( assert fname.startswith(f"{prefix}") else: assert fname.startswith("part") + + +def test_from_parquet_with_report( + daa: dak.Array, + tmp_path_factory: pytest.TempPathFactory, +) -> None: + p = str(tmp_path_factory.mktemp("from_pq_with_report")) + dak.to_parquet(daa, p) + ds, report = dak.from_parquet(p, report=True) + c_ds, c_report = dask.compute(dak.max(ds.points.x, axis=1), report) + assert len(c_ds) + assert c_report.columns.tolist()[0] == ["points.list.item.x"]