Skip to content

Commit

Permalink
use a Queue for failed read 'report'
Browse files Browse the repository at this point in the history
  • Loading branch information
douglasdavis committed Nov 9, 2023
1 parent a1b8259 commit 1e23c84
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 31 deletions.
40 changes: 31 additions & 9 deletions src/dask_awkward/lib/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,18 @@
import awkward as ak
import numpy as np
from awkward.types.numpytype import primitive_to_dtype
from awkward.typetracer import length_zero_if_typetracer
from dask.base import flatten, tokenize
from dask.highlevelgraph import HighLevelGraph
from dask.utils import funcname, is_integer, parse_bytes
from fsspec.utils import infer_compression

try:
from distributed.queues import Queue
except ImportError:
Queue = None # type: ignore


from dask_awkward.layers.layers import (
AwkwardBlockwiseLayer,
AwkwardInputLayer,
Expand Down Expand Up @@ -417,14 +424,15 @@ def to_dataframe(
label="to-dataframe",
**kwargs,
)
meta = ak.to_dataframe(
ak.typetracer.length_zero_if_typetracer(array._meta), **kwargs
)
return new_dd_object(
intermediate.dask,
intermediate.name,
meta,
intermediate.divisions,
meta = ak.to_dataframe(length_zero_if_typetracer(array._meta), **kwargs)
return cast(
DaskDataFrame,
new_dd_object(
intermediate.dask,
intermediate.name,
meta,
intermediate.divisions,
),
)


Expand Down Expand Up @@ -466,12 +474,26 @@ def return_empty_on_raise(
def wrapped(*args, **kwargs):
    """Call ``fn`` and fall back to an empty array on an allowed exception.

    When ``fn`` raises one of ``allowed_exceptions`` and ``distributed`` is
    importable, a ``(args, kwargs, str(err))`` record is put on the
    ``"dak_returned_empty"`` queue before the fallback is returned.

    Returns
    -------
    The result of ``fn(*args, **kwargs)``, or ``fn.mock_empty(backend)`` if
    an allowed exception was raised.

    """
    try:
        return fn(*args, **kwargs)
    except allowed_exceptions as err:
        if Queue is not None:
            # Reporting is best-effort.  Creating or writing the queue may
            # itself fail (presumably when no distributed client/worker is
            # active in this process, or when the call arguments cannot be
            # serialized for the queue); that must not turn the intended
            # "return empty" fallback into a hard failure.
            try:
                Queue("dak_returned_empty").put((args, kwargs, str(err)))
            except Exception:
                import logging

                logging.getLogger(__name__).debug(
                    "could not record failed call on the report queue",
                    exc_info=True,
                )

        return fn.mock_empty(backend)

return wrapped


def returned_empty_report() -> list[Any]:
    """Drain and return the records stored on the failed-read report queue.

    The records are read from the distributed queue named
    ``"dak_returned_empty"`` and are removed from it as they are read.

    Returns
    -------
    list[Any]
        Every record currently waiting in the queue, or an empty list when
        ``distributed`` is not installed (``Queue`` is ``None``).

    """
    if Queue is None:
        return []
    queue = Queue("dak_returned_empty")
    # ``batch=True`` returns everything currently queued in one request and
    # gives back an empty list when nothing is waiting.  Polling ``qsize()``
    # and then calling a blocking ``get()`` can hang indefinitely if another
    # consumer empties the queue between the two calls.
    return list(queue.get(batch=True))


def from_map(
func: Callable,
*iterables: Iterable,
Expand Down
23 changes: 23 additions & 0 deletions src/dask_awkward/lib/testutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import awkward as ak
import numpy as np
from awkward.typetracer import typetracer_from_form
from dask.base import is_dask_collection
from packaging.version import Version

Expand Down Expand Up @@ -223,3 +224,25 @@ def unnamed_root_ds() -> Array:
],
]
return ak.Array(ds * 3)


class RandomFailFromListsFn:
    """Test helper that builds an array from a list but fails at random.

    Each call draws an integer from 0 to 9; draws below 5 raise
    ``OSError``, so on average half of the calls fail.
    """

    def __init__(self, form):
        # Form used by ``mock`` and ``mock_empty`` to build placeholder arrays.
        self.form = form

    def __call__(self, x: list) -> ak.Array:
        """Return ``ak.Array(x)``, or raise ``OSError("BAD!")`` on a low draw."""
        roll = random.randint(0, 9)
        if roll >= 5:
            return ak.Array(x)
        raise OSError("BAD!")

    def mock(self):
        """Return a typetracer built from the stored form."""
        return typetracer_from_form(self.form)

    def mock_empty(self, backend="cpu"):
        """Return a high-level, length-zero array of the stored form on ``backend``."""
        empty_layout = self.form.length_zero_array(highlevel=False)
        return ak.to_backend(empty_layout, backend=backend, highlevel=True)
24 changes: 2 additions & 22 deletions tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,29 +339,9 @@ def test_bytes_with_sample(
assert len(sample_bytes) == 127


class RandomFailFromListsFn:
    """Callable used in tests: list -> array, with a random ``OSError``.

    A draw from 0 to 9 is made on every call; values below 5 trigger the
    failure, the rest produce the array.
    """

    def __init__(self, form):
        # Kept so the mock methods can derive placeholder arrays from it.
        self.form = form

    def __call__(self, x: list) -> ak.Array:
        """Build ``ak.Array(x)`` unless the random draw selects a failure."""
        draw = random.randint(0, 9)
        should_fail = draw < 5
        if should_fail:
            raise OSError("BAD!")
        return ak.Array(x)

    def mock(self):
        """Return a typetracer derived from the stored form."""
        form = self.form
        return ak.typetracer.typetracer_from_form(form)

    def mock_empty(self, backend="cpu"):
        """Return a high-level, length-zero array of the stored form on ``backend``."""
        layout = self.form.length_zero_array(highlevel=False)
        return ak.to_backend(layout, backend=backend, highlevel=True)


def test_random_fail_from_lists():
from dask_awkward.lib.testutils import RandomFailFromListsFn

single = [[1, 2, 3], [4, 5], [6], [], [1, 2, 3]]
many = [single] * 30
divs = (0, *np.cumsum(list(map(len, many))))
Expand Down

0 comments on commit 1e23c84

Please sign in to comment.