From f447df4ede584dbeb6efda1fe97cadcd419901b1 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Fri, 10 Nov 2023 10:55:37 -0600 Subject: [PATCH] feat: support returning empty arrays in `from_map` IO function calls based on user declared exceptions (#400) --- pyproject.toml | 2 +- src/dask_awkward/layers/layers.py | 24 ++++++- src/dask_awkward/lib/io/columnar.py | 13 +++- src/dask_awkward/lib/io/io.py | 101 +++++++++++++++++++++------- src/dask_awkward/lib/testutils.py | 23 +++++++ tests/test_io.py | 65 +++++++++++++++++- 6 files changed, 197 insertions(+), 31 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index f5e4d29b..3d50be57 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,7 +48,7 @@ Homepage = "https://github.com/dask-contrib/dask-awkward" [project.optional-dependencies] io = [ - "pyarrow;python_version<\"3.12\"", + "pyarrow", ] complete = [ "dask-awkward[io]", diff --git a/src/dask_awkward/layers/layers.py b/src/dask_awkward/layers/layers.py index 909dff55..eef1a81a 100644 --- a/src/dask_awkward/layers/layers.py +++ b/src/dask_awkward/layers/layers.py @@ -2,11 +2,12 @@ import copy from collections.abc import Callable, Mapping -from typing import TYPE_CHECKING, Any, Protocol, TypeVar, cast +from typing import TYPE_CHECKING, Any, Literal, Protocol, TypeVar, Union, cast from dask.blockwise import Blockwise, BlockwiseDepDict, blockwise_token from dask.highlevelgraph import MaterializedLayer from dask.layers import DataFrameTreeReduction +from typing_extensions import TypeAlias from dask_awkward.utils import LazyInputsDict @@ -15,6 +16,9 @@ from awkward._nplikes.typetracer import TypeTracerReport +BackendT: TypeAlias = Union[Literal["cpu"], Literal["jax"], Literal["cuda"]] + + class AwkwardBlockwiseLayer(Blockwise): """Just like upstream Blockwise, except we override pickling""" @@ -55,6 +59,9 @@ class ImplementsMocking(Protocol): def mock(self) -> AwkwardArray: ... + def mock_empty(self, backend: BackendT) -> AwkwardArray: + ... + class ImplementsProjection(ImplementsMocking, Protocol[T]): def prepare_for_projection(self) -> tuple[AwkwardArray, TypeTracerReport, T]: @@ -86,13 +93,26 @@ def mock(self) -> AwkwardArray: assert self._meta is not None return self._meta + def mock_empty(self, backend: BackendT = "cpu") -> AwkwardArray: + import awkward as ak + + if backend not in ("cpu", "jax", "cuda"): + raise ValueError( + f"backend must be one of 'cpu', 'jax', or 'cuda', received {backend}" + ) + return ak.to_backend( + self.mock().layout.form.length_zero_array(highlevel=False), + backend=backend, + highlevel=True, + ) + def io_func_implements_projection(func: ImplementsIOFunction) -> bool: return hasattr(func, "prepare_for_projection") def io_func_implements_mocking(func: ImplementsIOFunction) -> bool: - return hasattr(func, "mock") + return hasattr(func, "mock") and hasattr(func, "mock_empty") def io_func_implements_columnar(func: ImplementsIOFunction) -> bool: diff --git a/src/dask_awkward/lib/io/columnar.py b/src/dask_awkward/lib/io/columnar.py index fed93f1f..2fee74f9 100644 --- a/src/dask_awkward/lib/io/columnar.py +++ b/src/dask_awkward/lib/io/columnar.py @@ -7,7 +7,11 @@ from awkward import Array as AwkwardArray from awkward.forms import Form -from dask_awkward.layers.layers import ImplementsIOFunction, ImplementsNecessaryColumns +from dask_awkward.layers.layers import ( + BackendT, + ImplementsIOFunction, + ImplementsNecessaryColumns, +) from dask_awkward.lib.utils import ( METADATA_ATTRIBUTES, FormStructure, @@ -58,6 +62,13 @@ class ColumnProjectionMixin(ImplementsNecessaryColumns[FormStructure]): def mock(self: S) -> AwkwardArray: return ak.typetracer.typetracer_from_form(self.form, behavior=self.behavior) + def mock_empty(self: S, backend: BackendT = "cpu") -> AwkwardArray: + return ak.to_backend( + self.form.length_zero_array(highlevel=False), + backend, + highlevel=True, + ) + def prepare_for_projection( self: S, ) -> tuple[AwkwardArray, TypeTracerReport, FormStructure]: diff --git a/src/dask_awkward/lib/io/io.py b/src/dask_awkward/lib/io/io.py index 6153021b..861cff4a 100644 --- a/src/dask_awkward/lib/io/io.py +++ b/src/dask_awkward/lib/io/io.py @@ -1,26 +1,34 @@ from __future__ import annotations +import functools +import logging import math -import warnings -from collections.abc import Iterable +from collections.abc import Callable, Iterable from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Callable, cast +from typing import TYPE_CHECKING, Any, cast import awkward as ak import numpy as np from awkward.types.numpytype import primitive_to_dtype +from awkward.typetracer import length_zero_if_typetracer from dask.base import flatten, tokenize from dask.highlevelgraph import HighLevelGraph from dask.utils import funcname, is_integer, parse_bytes from fsspec.utils import infer_compression -from dask_awkward.layers import ( +try: + from distributed.queues import Queue + from distributed.worker import get_worker +except ImportError: + Queue = None + get_worker = None + + +from dask_awkward.layers.layers import ( AwkwardBlockwiseLayer, AwkwardInputLayer, - ImplementsIOFunction, -) -from dask_awkward.layers.layers import ( AwkwardMaterializedLayer, + BackendT, ImplementsMocking, IOFunctionWithMocking, io_func_implements_mocking, @@ -42,12 +50,15 @@ from dask_awkward.lib.core import Array +logger = logging.getLogger(__name__) + + class _FromAwkwardFn: def __init__(self, arr: ak.Array) -> None: self.arr = arr def __call__(self, start: int, stop: int, **kwargs: Any) -> ak.Array: - return self.arr[start:stop] + return cast(ak.Array, self.arr[start:stop]) def from_awkward( @@ -111,7 +122,7 @@ def __call__(self, x: list) -> ak.Array: return ak.Array(x, behavior=self.behavior) -def from_lists(source: list, behavior: dict | None = None) -> Array: +def from_lists(source: list) -> Array: """Create an Array collection from a list of lists. Parameters @@ -140,11 +151,10 @@ def from_lists(source: list, behavior: dict | None = None) -> Array: lists = list(source) divs = (0, *np.cumsum(list(map(len, lists)))) return from_map( - _FromListsFn(behavior=behavior), + _FromListsFn(), lists, meta=typetracer_array(ak.Array(lists[0])), divisions=divs, - behavior=behavior, label="from-lists", ) @@ -409,6 +419,7 @@ def to_dataframe( """ import dask + from dask.dataframe.core import DataFrame as DaskDataFrame from dask.dataframe.core import new_dd_object if optimize_graph: @@ -420,14 +431,15 @@ def to_dataframe( label="to-dataframe", **kwargs, ) - meta = ak.to_dataframe( - ak.typetracer.length_zero_if_typetracer(array._meta), **kwargs - ) - return new_dd_object( - intermediate.dask, - intermediate.name, - meta, - intermediate.divisions, + meta = ak.to_dataframe(length_zero_if_typetracer(array._meta), **kwargs) + return cast( + DaskDataFrame, + new_dd_object( + intermediate.dask, + intermediate.name, + meta, + intermediate.divisions, + ), ) @@ -460,15 +472,41 @@ def __call__(self, packed_arg): ) +def return_empty_on_raise( + fn: Callable, + allowed_exceptions: tuple[type[BaseException], ...], + backend: BackendT, +) -> Callable: + @functools.wraps(fn) + def wrapped(*args, **kwargs): + try: + return fn(*args, **kwargs) + except allowed_exceptions as err: + logmsg = ( + "%s call failed with args %s and kwargs %s; empty array returned. %s" + % ( + str(fn), + str(args), + str(kwargs), + str(err), + ) + ) + logger.info(logmsg) + return fn.mock_empty(backend) + + return wrapped + + def from_map( - func: ImplementsIOFunction, + func: Callable, *iterables: Iterable, args: tuple[Any, ...] | None = None, label: str | None = None, token: str | None = None, divisions: tuple[int, ...] | tuple[None, ...] | None = None, meta: ak.Array | None = None, - behavior: dict | None = None, + empty_on_raise: tuple[type[BaseException], ...] | None = None, + empty_backend: BackendT | None = None, **kwargs: Any, ) -> Array: """Create an Array collection from a custom mapping. @@ -488,11 +526,17 @@ def from_map( collection-key names. token : str, optional String to use as the "token" in the output collection-key names. - divisions : tuple[int | None, ...], optional + divisions : tuple[int, ...] | tuple[None, ...], optional Partition boundaries (if known). meta : Array, optional Collection metadata array, if known (the awkward-array type tracer) + empty_on_raise : tuple[type[BaseException], ...], optional + Set of exceptions that can be caught to return an empty array + at compute time if file IO raises. + empty_backend : str, + The backend for the empty array resulting from a failed read + when `empty_on_raise` is defined. **kwargs : Any Keyword arguments passed to `func`. @@ -574,13 +618,18 @@ def from_map( io_func = func array_meta = None - dsk = AwkwardInputLayer(name=name, inputs=inputs, io_func=io_func) + if (empty_on_raise and not empty_backend) or (empty_backend and not empty_on_raise): + raise ValueError("empty_on_raise and empty_backend must be used together.") - if behavior is not None: - warnings.warn( - "The `behavior` argument is deprecated for `from_map`, and consequently ignored." + if empty_on_raise and empty_backend: + io_func = return_empty_on_raise( + io_func, + allowed_exceptions=empty_on_raise, + backend=empty_backend, ) + dsk = AwkwardInputLayer(name=name, inputs=inputs, io_func=io_func) + hlg = HighLevelGraph.from_collections(name, dsk) if divisions is not None: result = new_array_object(hlg, name, meta=array_meta, divisions=divisions) diff --git a/src/dask_awkward/lib/testutils.py b/src/dask_awkward/lib/testutils.py index 6d08e928..730e3d97 100644 --- a/src/dask_awkward/lib/testutils.py +++ b/src/dask_awkward/lib/testutils.py @@ -5,6 +5,7 @@ import awkward as ak import numpy as np +from awkward.typetracer import typetracer_from_form from dask.base import is_dask_collection from packaging.version import Version @@ -223,3 +224,25 @@ def unnamed_root_ds() -> Array: ], ] return ak.Array(ds * 3) + + +class RandomFailFromListsFn: + def __init__(self, form): + self.form = form + + def __call__(self, x: list) -> ak.Array: + n = random.randint(0, 9) + if n < 5: + raise OSError("BAD!") + + return ak.Array(x) + + def mock(self): + return typetracer_from_form(self.form) + + def mock_empty(self, backend="cpu"): + return ak.to_backend( + self.form.length_zero_array(highlevel=False), + backend=backend, + highlevel=True, + ) diff --git a/tests/test_io.py b/tests/test_io.py index dd149482..3ee3cf2b 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -11,7 +11,8 @@ from numpy.typing import DTypeLike import dask_awkward as dak -from dask_awkward.lib.io.io import _bytes_with_sample +from dask_awkward.lib.core import typetracer_array +from dask_awkward.lib.io.io import _bytes_with_sample, from_map from dask_awkward.lib.testutils import assert_eq @@ -335,3 +336,65 @@ def test_bytes_with_sample( assert sample_bytes == b"" else: assert len(sample_bytes) == 127 + + +def test_random_fail_from_lists(): + from dask_awkward.lib.testutils import RandomFailFromListsFn + + single = [[1, 2, 3], [4, 5], [6], [], [1, 2, 3]] + many = [single] * 30 + divs = (0, *np.cumsum(list(map(len, many)))) + form = ak.Array(many[0]).layout.form + + array = from_map( + RandomFailFromListsFn(form), + many, + meta=typetracer_array(ak.Array(many[0])), + divisions=divs, + label="from-lists", + empty_on_raise=(OSError,), + empty_backend="cpu", + ) + assert len(array.compute()) < (len(single) * len(many)) + + with pytest.raises(OSError, match="BAD"): + array = from_map( + RandomFailFromListsFn(form), + many, + meta=typetracer_array(ak.Array(many[0])), + divisions=divs, + label="from-lists", + empty_on_raise=(RuntimeError,), + empty_backend="cpu", + ) + array.compute() + + with pytest.raises(OSError, match="BAD"): + array = from_map( + RandomFailFromListsFn(form), + many, + meta=typetracer_array(ak.Array(many[0])), + divisions=divs, + label="from-lists", + ) + array.compute() + + with pytest.raises(ValueError, match="must be used together"): + array = from_map( + RandomFailFromListsFn(form), + many, + meta=typetracer_array(ak.Array(many[0])), + divisions=divs, + label="from-lists", + empty_on_raise=(OSError,), + ) + + with pytest.raises(ValueError, match="must be used together"): + array = from_map( + RandomFailFromListsFn(form), + many, + meta=typetracer_array(ak.Array(many[0])), + divisions=divs, + label="from-lists", + empty_backend="cpu", + )