Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update to latest awkward==2.5.0 #407

Merged
merged 15 commits into from
Nov 16, 2023
9 changes: 9 additions & 0 deletions src/dask_awkward/lib/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,7 @@ def new_array_object(
*,
meta: ak.Array | None = None,
behavior: dict | None = None,
attrs: dict | None = None,
npartitions: int | None = None,
divisions: tuple[int, ...] | tuple[None, ...] | None = None,
) -> Array:
Expand All @@ -1500,6 +1501,10 @@ def new_array_object(
typetracer for the new Array. If the configuration option
``awkward.compute-unknown-meta`` is set to ``False``,
undefined `meta` will be assigned an empty typetracer.
behavior : dict, optional
Custom #ak.behavior for the output array.
attrs : dict, optional
Custom attributes for the output array.
npartitions : int, optional
Total number of partitions; if used `divisions` will be a
tuple of length `npartitions` + 1 with all elements``None``.
Expand Down Expand Up @@ -1543,6 +1548,8 @@ def new_array_object(

if behavior is not None:
actual_meta.behavior = behavior
if attrs is not None:
actual_meta.attrs = attrs

out = Array(dsk, name, actual_meta, divs)
if actual_meta.__doc__ != actual_meta.__class__.__doc__:
Expand Down Expand Up @@ -1872,6 +1879,8 @@ def non_trivial_reduction(
keepdims: bool,
mask_identity: bool,
reducer: Callable,
behavior: dict | None = None,
attrs: dict | None = None,
combiner: Callable | None = None,
token: str | None = None,
dtype: Any | None = None,
Expand Down
14 changes: 8 additions & 6 deletions src/dask_awkward/lib/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,17 @@ def from_awkward(


class _FromListsFn:
def __init__(self, behavior: dict | None = None):
def __init__(self, behavior: dict | None = None, attrs: dict | None = None):
self.behavior = behavior
self.attrs = attrs

def __call__(self, x: list) -> ak.Array:
return ak.Array(x, behavior=self.behavior)
return ak.Array(x, behavior=self.behavior, attrs=self.attrs)


def from_lists(source: list, behavior: dict | None = None) -> Array:
def from_lists(
source: list, behavior: dict | None = None, attrs: dict | None = None
) -> Array:
"""Create an Array collection from a list of lists.

Parameters
Expand Down Expand Up @@ -140,11 +143,10 @@ def from_lists(source: list, behavior: dict | None = None) -> Array:
lists = list(source)
divs = (0, *np.cumsum(list(map(len, lists))))
return from_map(
_FromListsFn(behavior=behavior),
_FromListsFn(behavior=behavior, attrs=attrs),
lists,
meta=typetracer_array(ak.Array(lists[0])),
meta=typetracer_array(ak.Array(lists[0], attrs=attrs, behavior=behavior)),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@douglasdavis Hmm, we might need to add a mechanism for passing a type object and/or enforcing this meta: the meta is built from lists[0] only, but lists[1] and any later entries could have a different type.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I see what you're getting at here. from_lists is meant to be a pretty simple entry point for instantiating a dask-awkward array from non-disk data. I originally wrote it as a simple testing ground when adding the from_map interface, and I felt like it wouldn't be a bad addition to the IO API. My thought here is to keep it simple and perhaps add a note to the docstring stating that the function expects every list to have the same type.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we can make it a runtime error if the subsequent partitions don't have the anticipated meta?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems reasonable to me

divisions=divs,
behavior=behavior,
label="from-lists",
)

Expand Down
1 change: 1 addition & 0 deletions src/dask_awkward/lib/io/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ def from_json(
resize: float = 8,
highlevel: bool = True,
behavior: dict | None = None,
attrs: dict | None = None,
blocksize: int | str | None = None,
delimiter: bytes | None = None,
compression: str | None = "infer",
Expand Down
37 changes: 29 additions & 8 deletions src/dask_awkward/lib/io/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def __init__(
unnamed_root: bool = False,
original_form: Form | None = None,
behavior: dict | None = None,
attrs: dict | None = None,
**kwargs: Any,
) -> None:
self.fs = fs
Expand All @@ -53,6 +54,7 @@ def __init__(
self.columns = [f".{c}" for c in self.columns]
self.original_form = original_form
self.behavior = behavior
self.attrs = attrs
self.kwargs = kwargs

@abc.abstractmethod
Expand Down Expand Up @@ -111,17 +113,22 @@ def __init__(
)

def __call__(self, source: Any) -> Any:
array = ak_from_parquet._load(
layout = ak_from_parquet._load(
[source],
parquet_columns=self.columns,
subrg=[None],
subform=self.form,
highlevel=True,
highlevel=False,
attrs=None,
behavior=None,
fs=self.fs,
behavior=self.behavior,
**self.kwargs,
)
return ak.Array(unproject_layout(self.original_form, array.layout))
return ak.Array(
unproject_layout(self.original_form, layout),
attrs=self.attrs,
behavior=self.behavior,
)

def project_columns(self, columns):
return _FromParquetFileWiseFn(
Expand All @@ -130,6 +137,7 @@ def project_columns(self, columns):
listsep=self.listsep,
unnamed_root=self.unnamed_root,
original_form=self.form,
attrs=self.attrs,
behavior=self.behavior,
**self.kwargs,
)
Expand All @@ -145,6 +153,7 @@ def __init__(
unnamed_root: bool = False,
original_form: Form | None = None,
behavior: dict | None = None,
attrs: dict | None = None,
**kwargs: Any,
) -> None:
super().__init__(
Expand All @@ -154,24 +163,30 @@ def __init__(
unnamed_root=unnamed_root,
original_form=original_form,
behavior=behavior,
attrs=attrs,
**kwargs,
)

def __call__(self, pair: Any) -> ak.Array:
subrg, source = pair
if isinstance(subrg, int):
subrg = [[subrg]]
array = ak_from_parquet._load(
layout = ak_from_parquet._load(
[source],
parquet_columns=self.columns,
subrg=subrg,
subform=self.form,
highlevel=True,
highlevel=False,
attrs=None,
behavior=None,
fs=self.fs,
behavior=self.behavior,
**self.kwargs,
)
return ak.Array(unproject_layout(self.original_form, array.layout))
return ak.Array(
unproject_layout(self.original_form, layout),
behavior=self.behavior,
attrs=self.attrs,
)

def project_columns(self, columns):
return _FromParquetFragmentWiseFn(
Expand All @@ -180,6 +195,7 @@ def project_columns(self, columns):
unnamed_root=self.unnamed_root,
original_form=self.form,
behavior=self.behavior,
attrs=self.attrs,
**self.kwargs,
)

Expand All @@ -194,6 +210,7 @@ def from_parquet(
generate_bitmasks: bool = False,
highlevel: bool = True,
behavior: dict | None = None,
attrs: dict | None = None,
ignore_metadata: bool = True,
scan_files: bool = False,
split_row_groups: bool | None = False,
Expand Down Expand Up @@ -263,6 +280,8 @@ def from_parquet(
ignore_metadata,
scan_files,
split_row_groups,
behavior,
attrs,
)

(
Expand Down Expand Up @@ -307,6 +326,7 @@ def from_parquet(
footer_sample_size=footer_sample_size,
generate_bitmasks=generate_bitmasks,
behavior=behavior,
attrs=attrs,
),
actual_paths,
label=label,
Expand Down Expand Up @@ -345,6 +365,7 @@ def from_parquet(
footer_sample_size=footer_sample_size,
generate_bitmasks=generate_bitmasks,
behavior=behavior,
attrs=attrs,
),
pairs,
label=label,
Expand Down
5 changes: 3 additions & 2 deletions src/dask_awkward/lib/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def concatenate(
mergebool: bool = True,
highlevel: bool = True,
behavior: dict | None = None,
attrs: dict | None = None,
) -> Array:
label = "concatenate"
token = tokenize(arrays, axis, mergebool, highlevel, behavior)
Expand All @@ -49,7 +50,7 @@ def concatenate(
g[(name, i)] = k
i += 1

meta = ak.concatenate(metas)
meta = ak.concatenate(metas, behavior=behavior, attrs=attrs)
assert isinstance(meta, ak.Array)

prev_names = [iarr.name for iarr in arrays]
Expand All @@ -65,7 +66,7 @@ def concatenate(
if partition_compatibility(*arrays) == PartitionCompatibility.NO:
raise IncompatiblePartitions("concatenate", *arrays)

fn = _ConcatenateFnAxisGT0(axis=axis)
fn = _ConcatenateFnAxisGT0(axis=axis, behavior=behavior, attrs=attrs)
return map_partitions(fn, *arrays)

else:
Expand Down
Loading
Loading