diff --git a/src/dask_awkward/lib/io/io.py b/src/dask_awkward/lib/io/io.py index 9a55ddc1..2b2ac8c2 100644 --- a/src/dask_awkward/lib/io/io.py +++ b/src/dask_awkward/lib/io/io.py @@ -9,11 +9,18 @@ import awkward as ak import numpy as np from awkward.types.numpytype import primitive_to_dtype +from awkward.typetracer import length_zero_if_typetracer from dask.base import flatten, tokenize from dask.highlevelgraph import HighLevelGraph from dask.utils import funcname, is_integer, parse_bytes from fsspec.utils import infer_compression +try: + from distributed.queues import Queue +except ImportError: + Queue = None # type: ignore + + from dask_awkward.layers.layers import ( AwkwardBlockwiseLayer, AwkwardInputLayer, @@ -417,14 +424,15 @@ def to_dataframe( label="to-dataframe", **kwargs, ) - meta = ak.to_dataframe( - ak.typetracer.length_zero_if_typetracer(array._meta), **kwargs - ) - return new_dd_object( - intermediate.dask, - intermediate.name, - meta, - intermediate.divisions, + meta = ak.to_dataframe(length_zero_if_typetracer(array._meta), **kwargs) + return cast( + DaskDataFrame, + new_dd_object( + intermediate.dask, + intermediate.name, + meta, + intermediate.divisions, + ), ) @@ -466,12 +474,26 @@ def return_empty_on_raise( def wrapped(*args, **kwargs): try: return fn(*args, **kwargs) - except allowed_exceptions: + except allowed_exceptions as err: + if Queue is not None: + queue = Queue("dak_returned_empty") + queue.put((args, kwargs, str(err))) + return fn.mock_empty(backend) return wrapped +def returned_empty_report() -> list[Any]: + if Queue is None: + return [] + queue = Queue("dak_returned_empty") + things = [] + while queue.qsize(): + things.append(queue.get()) + return things + + def from_map( func: Callable, *iterables: Iterable, diff --git a/src/dask_awkward/lib/testutils.py b/src/dask_awkward/lib/testutils.py index 6d08e928..730e3d97 100644 --- a/src/dask_awkward/lib/testutils.py +++ b/src/dask_awkward/lib/testutils.py @@ -5,6 +5,7 @@ import awkward as ak import numpy as np +from awkward.typetracer import typetracer_from_form from dask.base import is_dask_collection from packaging.version import Version @@ -223,3 +224,25 @@ def unnamed_root_ds() -> Array: ], ] return ak.Array(ds * 3) + + +class RandomFailFromListsFn: + def __init__(self, form): + self.form = form + + def __call__(self, x: list) -> ak.Array: + n = random.randint(0, 9) + if n < 5: + raise OSError("BAD!") + + return ak.Array(x) + + def mock(self): + return typetracer_from_form(self.form) + + def mock_empty(self, backend="cpu"): + return ak.to_backend( + self.form.length_zero_array(highlevel=False), + backend=backend, + highlevel=True, + ) diff --git a/tests/test_io.py b/tests/test_io.py index a1d553f1..1e622f5e 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -339,29 +339,9 @@ def test_bytes_with_sample( assert len(sample_bytes) == 127 -class RandomFailFromListsFn: - def __init__(self, form): - self.form = form - - def __call__(self, x: list) -> ak.Array: - n = random.randint(0, 9) - if n < 5: - raise OSError("BAD!") - - return ak.Array(x) - - def mock(self): - return ak.typetracer.typetracer_from_form(self.form) - - def mock_empty(self, backend="cpu"): - return ak.to_backend( - self.form.length_zero_array(highlevel=False), - backend=backend, - highlevel=True, - ) - - def test_random_fail_from_lists(): + from dask_awkward.lib.testutils import RandomFailFromListsFn + single = [[1, 2, 3], [4, 5], [6], [], [1, 2, 3]] many = [single] * 30 divs = (0, *np.cumsum(list(map(len, many))))