Skip to content

Commit

Permalink
feat: support returning empty arrays in from_map IO function calls …
Browse files Browse the repository at this point in the history
…based on user declared exceptions (#400)
  • Loading branch information
douglasdavis authored Nov 10, 2023
1 parent 5ddc7b7 commit f447df4
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 31 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Homepage = "https://github.com/dask-contrib/dask-awkward"

[project.optional-dependencies]
io = [
"pyarrow;python_version<\"3.12\"",
"pyarrow",
]
complete = [
"dask-awkward[io]",
Expand Down
24 changes: 22 additions & 2 deletions src/dask_awkward/layers/layers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

import copy
from collections.abc import Callable, Mapping
from typing import TYPE_CHECKING, Any, Protocol, TypeVar, cast
from typing import TYPE_CHECKING, Any, Literal, Protocol, TypeVar, Union, cast

from dask.blockwise import Blockwise, BlockwiseDepDict, blockwise_token
from dask.highlevelgraph import MaterializedLayer
from dask.layers import DataFrameTreeReduction
from typing_extensions import TypeAlias

from dask_awkward.utils import LazyInputsDict

Expand All @@ -15,6 +16,9 @@
from awkward._nplikes.typetracer import TypeTracerReport


BackendT: TypeAlias = Union[Literal["cpu"], Literal["jax"], Literal["cuda"]]


class AwkwardBlockwiseLayer(Blockwise):
"""Just like upstream Blockwise, except we override pickling"""

Expand Down Expand Up @@ -55,6 +59,9 @@ class ImplementsMocking(Protocol):
def mock(self) -> AwkwardArray:
...

def mock_empty(self, backend: BackendT) -> AwkwardArray:
...


class ImplementsProjection(ImplementsMocking, Protocol[T]):
def prepare_for_projection(self) -> tuple[AwkwardArray, TypeTracerReport, T]:
Expand Down Expand Up @@ -86,13 +93,26 @@ def mock(self) -> AwkwardArray:
assert self._meta is not None
return self._meta

def mock_empty(self, backend: BackendT = "cpu") -> AwkwardArray:
import awkward as ak

if backend not in ("cpu", "jax", "cuda"):
raise ValueError(
f"backend must be one of 'cpu', 'jax', or 'cuda', received {backend}"
)
return ak.to_backend(
self.mock().layout.form.length_zero_array(highlevel=False),
backend=backend,
highlevel=True,
)


def io_func_implements_projection(func: ImplementsIOFunction) -> bool:
return hasattr(func, "prepare_for_projection")


def io_func_implements_mocking(func: ImplementsIOFunction) -> bool:
return hasattr(func, "mock")
return hasattr(func, "mock") and hasattr(func, "mock_empty")


def io_func_implements_columnar(func: ImplementsIOFunction) -> bool:
Expand Down
13 changes: 12 additions & 1 deletion src/dask_awkward/lib/io/columnar.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from awkward import Array as AwkwardArray
from awkward.forms import Form

from dask_awkward.layers.layers import ImplementsIOFunction, ImplementsNecessaryColumns
from dask_awkward.layers.layers import (
BackendT,
ImplementsIOFunction,
ImplementsNecessaryColumns,
)
from dask_awkward.lib.utils import (
METADATA_ATTRIBUTES,
FormStructure,
Expand Down Expand Up @@ -58,6 +62,13 @@ class ColumnProjectionMixin(ImplementsNecessaryColumns[FormStructure]):
def mock(self: S) -> AwkwardArray:
return ak.typetracer.typetracer_from_form(self.form, behavior=self.behavior)

def mock_empty(self: S, backend: BackendT = "cpu") -> AwkwardArray:
return ak.to_backend(
self.form.length_zero_array(highlevel=False),
backend,
highlevel=True,
)

def prepare_for_projection(
self: S,
) -> tuple[AwkwardArray, TypeTracerReport, FormStructure]:
Expand Down
101 changes: 75 additions & 26 deletions src/dask_awkward/lib/io/io.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,34 @@
from __future__ import annotations

import functools
import logging
import math
import warnings
from collections.abc import Iterable
from collections.abc import Callable, Iterable
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Callable, cast
from typing import TYPE_CHECKING, Any, cast

import awkward as ak
import numpy as np
from awkward.types.numpytype import primitive_to_dtype
from awkward.typetracer import length_zero_if_typetracer
from dask.base import flatten, tokenize
from dask.highlevelgraph import HighLevelGraph
from dask.utils import funcname, is_integer, parse_bytes
from fsspec.utils import infer_compression

from dask_awkward.layers import (
try:
from distributed.queues import Queue
from distributed.worker import get_worker
except ImportError:
Queue = None
get_worker = None


from dask_awkward.layers.layers import (
AwkwardBlockwiseLayer,
AwkwardInputLayer,
ImplementsIOFunction,
)
from dask_awkward.layers.layers import (
AwkwardMaterializedLayer,
BackendT,
ImplementsMocking,
IOFunctionWithMocking,
io_func_implements_mocking,
Expand All @@ -42,12 +50,15 @@
from dask_awkward.lib.core import Array


logger = logging.getLogger(__name__)


class _FromAwkwardFn:
def __init__(self, arr: ak.Array) -> None:
self.arr = arr

def __call__(self, start: int, stop: int, **kwargs: Any) -> ak.Array:
return self.arr[start:stop]
return cast(ak.Array, self.arr[start:stop])


def from_awkward(
Expand Down Expand Up @@ -111,7 +122,7 @@ def __call__(self, x: list) -> ak.Array:
return ak.Array(x, behavior=self.behavior)


def from_lists(source: list, behavior: dict | None = None) -> Array:
def from_lists(source: list) -> Array:
"""Create an Array collection from a list of lists.
Parameters
Expand Down Expand Up @@ -140,11 +151,10 @@ def from_lists(source: list, behavior: dict | None = None) -> Array:
lists = list(source)
divs = (0, *np.cumsum(list(map(len, lists))))
return from_map(
_FromListsFn(behavior=behavior),
_FromListsFn(),
lists,
meta=typetracer_array(ak.Array(lists[0])),
divisions=divs,
behavior=behavior,
label="from-lists",
)

Expand Down Expand Up @@ -409,6 +419,7 @@ def to_dataframe(
"""
import dask
from dask.dataframe.core import DataFrame as DaskDataFrame
from dask.dataframe.core import new_dd_object

if optimize_graph:
Expand All @@ -420,14 +431,15 @@ def to_dataframe(
label="to-dataframe",
**kwargs,
)
meta = ak.to_dataframe(
ak.typetracer.length_zero_if_typetracer(array._meta), **kwargs
)
return new_dd_object(
intermediate.dask,
intermediate.name,
meta,
intermediate.divisions,
meta = ak.to_dataframe(length_zero_if_typetracer(array._meta), **kwargs)
return cast(
DaskDataFrame,
new_dd_object(
intermediate.dask,
intermediate.name,
meta,
intermediate.divisions,
),
)


Expand Down Expand Up @@ -460,15 +472,41 @@ def __call__(self, packed_arg):
)


def return_empty_on_raise(
fn: Callable,
allowed_exceptions: tuple[type[BaseException], ...],
backend: BackendT,
) -> Callable:
@functools.wraps(fn)
def wrapped(*args, **kwargs):
try:
return fn(*args, **kwargs)
except allowed_exceptions as err:
logmsg = (
"%s call failed with args %s and kwargs %s; empty array returned. %s"
% (
str(fn),
str(args),
str(kwargs),
str(err),
)
)
logger.info(logmsg)
return fn.mock_empty(backend)

return wrapped


def from_map(
func: ImplementsIOFunction,
func: Callable,
*iterables: Iterable,
args: tuple[Any, ...] | None = None,
label: str | None = None,
token: str | None = None,
divisions: tuple[int, ...] | tuple[None, ...] | None = None,
meta: ak.Array | None = None,
behavior: dict | None = None,
empty_on_raise: tuple[type[BaseException], ...] | None = None,
empty_backend: BackendT | None = None,
**kwargs: Any,
) -> Array:
"""Create an Array collection from a custom mapping.
Expand All @@ -488,11 +526,17 @@ def from_map(
collection-key names.
token : str, optional
String to use as the "token" in the output collection-key names.
divisions : tuple[int | None, ...], optional
divisions : tuple[int, ...] | tuple[None, ...], optional
Partition boundaries (if known).
meta : Array, optional
Collection metadata array, if known (the awkward-array type
tracer)
empty_on_raise : tuple[type[BaseException], ...], optional
Set of exceptions that can be caught to return an empty array
at compute time if file IO raises.
empty_backend : str,
The backend for the empty array resulting from a failed read
when `empty_on_raise` is defined.
**kwargs : Any
Keyword arguments passed to `func`.
Expand Down Expand Up @@ -574,13 +618,18 @@ def from_map(
io_func = func
array_meta = None

dsk = AwkwardInputLayer(name=name, inputs=inputs, io_func=io_func)
if (empty_on_raise and not empty_backend) or (empty_backend and not empty_on_raise):
raise ValueError("empty_on_raise and empty_backend must be used together.")

if behavior is not None:
warnings.warn(
"The `behavior` argument is deprecated for `from_map`, and consequently ignored."
if empty_on_raise and empty_backend:
io_func = return_empty_on_raise(
io_func,
allowed_exceptions=empty_on_raise,
backend=empty_backend,
)

dsk = AwkwardInputLayer(name=name, inputs=inputs, io_func=io_func)

hlg = HighLevelGraph.from_collections(name, dsk)
if divisions is not None:
result = new_array_object(hlg, name, meta=array_meta, divisions=divisions)
Expand Down
23 changes: 23 additions & 0 deletions src/dask_awkward/lib/testutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import awkward as ak
import numpy as np
from awkward.typetracer import typetracer_from_form
from dask.base import is_dask_collection
from packaging.version import Version

Expand Down Expand Up @@ -223,3 +224,25 @@ def unnamed_root_ds() -> Array:
],
]
return ak.Array(ds * 3)


class RandomFailFromListsFn:
def __init__(self, form):
self.form = form

def __call__(self, x: list) -> ak.Array:
n = random.randint(0, 9)
if n < 5:
raise OSError("BAD!")

return ak.Array(x)

def mock(self):
return typetracer_from_form(self.form)

def mock_empty(self, backend="cpu"):
return ak.to_backend(
self.form.length_zero_array(highlevel=False),
backend=backend,
highlevel=True,
)
Loading

0 comments on commit f447df4

Please sign in to comment.