Skip to content

Commit

Permalink
Merge branch 'main' into scalar-dtype-and-meta-handling
Browse files Browse the repository at this point in the history
  • Loading branch information
douglasdavis authored Nov 17, 2023
2 parents 554f040 + e094ac1 commit 4597a3f
Show file tree
Hide file tree
Showing 11 changed files with 562 additions and 242 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ classifiers = [
"Topic :: Software Development",
]
dependencies = [
"awkward >=2.4.5",
"awkward >=2.5.0",
"dask >=2023.04.0",
"typing_extensions >=4.8.0",
]
Expand Down
48 changes: 32 additions & 16 deletions src/dask_awkward/lib/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import operator
import sys
import warnings
from collections.abc import Callable, Hashable, Sequence
from collections.abc import Callable, Hashable, Mapping, Sequence
from enum import IntEnum
from functools import cached_property, partial, wraps
from numbers import Number
Expand All @@ -17,14 +17,14 @@
import dask.config
import numpy as np
from awkward._do import remove_structure as ak_do_remove_structure
from awkward._nplikes.typetracer import (
from awkward.highlevel import NDArrayOperatorsMixin, _dir_pattern
from awkward.typetracer import (
MaybeNone,
OneOf,
TypeTracerArray,
create_unknown_scalar,
is_unknown_scalar,
)
from awkward.highlevel import NDArrayOperatorsMixin, _dir_pattern
from awkward.typetracer import create_unknown_scalar
from dask.base import (
DaskMethodsMixin,
dont_optimize,
Expand Down Expand Up @@ -201,15 +201,11 @@ def __str__(self) -> str:
return f"dask.awkward<{key_split(self.name)}, type=Scalar, dtype={self.dtype}>"

def __getitem__(self, where: Any) -> Any:
token = tokenize(self, operator.getitem, where)
label = "getitem"
name = f"{label}-{token}"
task = AwkwardMaterializedLayer(
{(name, 0): (operator.getitem, self.key, where)},
previous_layer_names=[self.name],
msg = (
"__getitem__ access on Scalars should be done after converting "
"the Scalar collection to delayed with the to_delayed method."
)
hlg = HighLevelGraph.from_collections(name, task, dependencies=[self])
return new_scalar_object(hlg, name, meta=None)
raise NotImplementedError(msg)

@property
def known_value(self) -> Any | None:
Expand Down Expand Up @@ -377,7 +373,7 @@ def new_scalar_object(

if isinstance(meta, MaybeNone):
meta = ak.Array(meta.content)
else:
elif meta is not None:
try:
if ak.backend(meta) != "typetracer":
raise TypeError(
Expand Down Expand Up @@ -848,7 +844,14 @@ def layout(self) -> Content:
raise ValueError("This collection's meta is None; unknown layout.")

@property
def behavior(self) -> dict:
def attrs(self) -> dict:
"""awkward Array attrs dictionary."""
if self._meta is not None:
return self._meta.attrs
raise ValueError("This collection's meta is None; no attrs property available.")

@property
def behavior(self) -> Mapping:
"""awkward Array behavior dictionary."""
if self._meta is not None:
return self._meta.behavior
Expand Down Expand Up @@ -1515,7 +1518,8 @@ def new_array_object(
name: str,
*,
meta: ak.Array | None = None,
behavior: dict | None = None,
behavior: Mapping | None = None,
attrs: Mapping[str, Any] | None = None,
npartitions: int | None = None,
divisions: tuple[int, ...] | tuple[None, ...] | None = None,
) -> Array:
Expand All @@ -1534,6 +1538,10 @@ def new_array_object(
typetracer for the new Array. If the configuration option
``awkward.compute-unknown-meta`` is set to ``False``,
undefined `meta` will be assigned an empty typetracer.
behavior : dict, optional
Custom ak.behavior for the output array.
attrs : dict, optional
Custom attributes for the output array.
npartitions : int, optional
Total number of partitions; if used `divisions` will be a
tuple of length `npartitions` + 1 with all elements``None``.
Expand Down Expand Up @@ -1577,6 +1585,8 @@ def new_array_object(

if behavior is not None:
actual_meta.behavior = behavior
if attrs is not None:
actual_meta.attrs = attrs

out = Array(dsk, name, actual_meta, divs)
if actual_meta.__doc__ != actual_meta.__class__.__doc__:
Expand Down Expand Up @@ -1906,6 +1916,8 @@ def non_trivial_reduction(
keepdims: bool,
mask_identity: bool,
reducer: Callable,
behavior: Mapping | None = None,
attrs: Mapping[str, Any] | None = None,
combiner: Callable | None = None,
token: str | None = None,
dtype: Any | None = None,
Expand Down Expand Up @@ -2254,7 +2266,11 @@ def typetracer_array(a: ak.Array | Array) -> ak.Array:
if isinstance(a, Array):
return a._meta
elif isinstance(a, ak.Array):
return ak.Array(a.layout.to_typetracer(forget_length=True))
return ak.Array(
a.layout.to_typetracer(forget_length=True),
behavior=a._behavior,
attrs=a._attrs,
)
else:
msg = (
"`a` should be an awkward array or a Dask awkward collection.\n"
Expand Down
65 changes: 49 additions & 16 deletions src/dask_awkward/lib/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import functools
import logging
import math
from collections.abc import Callable, Iterable
from collections.abc import Callable, Iterable, Mapping
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, cast

Expand Down Expand Up @@ -65,8 +65,9 @@ def __call__(self, start: int, stop: int, **kwargs: Any) -> ak.Array:
def from_awkward(
source: ak.Array,
npartitions: int,
behavior: dict | None = None,
behavior: Mapping | None = None,
label: str | None = None,
attrs: Mapping[str, Any] | None = None,
) -> Array:
"""Create an Array collection from a concrete :class:`awkward.Array` object.
Expand All @@ -76,8 +77,12 @@ def from_awkward(
The concrete awkward array.
npartitions : int
The total number of partitions for the collection.
behavior : dict, optional
Custom ak.behavior for the output array.
label : str, optional
Label for the task.
attrs : mapping, optional
Custom attributes for the output array.
Returns
-------
Expand Down Expand Up @@ -112,25 +117,35 @@ def from_awkward(
divisions=locs,
meta=meta,
behavior=behavior,
attrs=attrs,
)


class _FromListsFn:
def __init__(self, behavior: dict | None = None):
def __init__(self, behavior: Mapping | None, attrs: Mapping[str, Any] | None):
self.behavior = behavior
self.attrs = attrs

def __call__(self, x: list) -> ak.Array:
return ak.Array(x, behavior=self.behavior)
return ak.Array(x, behavior=self.behavior, attrs=self.attrs)


def from_lists(source: list) -> Array:
def from_lists(
source: list,
behavior: Mapping | None = None,
attrs: Mapping[str, Any] | None = None,
) -> Array:
"""Create an Array collection from a list of lists.
Parameters
----------
source : list[list[Any]]
List of lists, each outer list will become a partition in the
collection.
behavior : dict, optional
Custom ak.behavior for the output array.
attrs : mapping, optional
Custom attributes for the output array.
Returns
-------
Expand All @@ -152,9 +167,9 @@ def from_lists(source: list) -> Array:
lists = list(source)
divs = (0, *np.cumsum(list(map(len, lists))))
return from_map(
_FromListsFn(),
_FromListsFn(behavior=behavior, attrs=attrs),
lists,
meta=typetracer_array(ak.Array(lists[0])),
meta=typetracer_array(ak.Array(lists[0], attrs=attrs, behavior=behavior)),
divisions=divs,
label="from-lists",
)
Expand All @@ -163,9 +178,10 @@ def from_lists(source: list) -> Array:
def from_delayed(
source: list[Delayed] | Delayed,
meta: ak.Array | None = None,
behavior: dict | None = None,
behavior: Mapping | None = None,
divisions: tuple[int, ...] | tuple[None, ...] | None = None,
prefix: str = "from-delayed",
attrs: Mapping[str, Any] | None = None,
) -> Array:
"""Create an Array collection from a set of :class:`~dask.delayed.Delayed` objects.
Expand All @@ -179,10 +195,14 @@ def from_delayed(
Metadata (typetracer array) if known, if ``None`` the first
partition (first element of the list of ``Delayed`` objects)
will be computed to determine the metadata.
behavior : dict, optional
Custom ak.behavior for the output array.
divisions : tuple[int | None, ...], optional
Partition boundaries (if known).
prefix : str
Prefix for the keys in the task graph.
attrs : mapping, optional
Custom attributes for the output array.
Returns
-------
Expand All @@ -206,11 +226,7 @@ def from_delayed(
raise ValueError("divisions must be a tuple of length len(source) + 1")
hlg = HighLevelGraph.from_collections(name, dsk, dependencies=parts)
return new_array_object(
hlg,
name=name,
meta=meta,
behavior=behavior,
divisions=divs,
hlg, name=name, meta=meta, behavior=behavior, divisions=divs, attrs=attrs
)


Expand Down Expand Up @@ -346,13 +362,21 @@ def to_dask_array(
return new_da_object(graph, name, meta=None, chunks=chunks, dtype=dtype)


def from_dask_array(array: DaskArray, behavior: dict | None = None) -> Array:
def from_dask_array(
array: DaskArray,
behavior: Mapping | None = None,
attrs: Mapping[str, Any] | None = None,
) -> Array:
"""Convert a Dask Array collection to a Dask Awkard Array collection.
Parameters
----------
array : dask.array.Array
Array to convert.
behavior : dict, optional
Custom ak.behavior for the output array.
attrs : mapping, optional
Custom attributes for the output array.
Returns
-------
Expand Down Expand Up @@ -389,11 +413,18 @@ def from_dask_array(array: DaskArray, behavior: dict | None = None) -> Array:
hlg = HighLevelGraph.from_collections(name, layer, dependencies=[array])
if np.any(np.isnan(array.chunks)):
return new_array_object(
hlg, name, npartitions=array.npartitions, meta=meta, behavior=behavior
hlg,
name,
npartitions=array.npartitions,
meta=meta,
behavior=behavior,
attrs=attrs,
)
else:
divs = (0, *np.cumsum(array.chunks))
return new_array_object(hlg, name, divisions=divs, meta=meta, behavior=behavior)
return new_array_object(
hlg, name, divisions=divs, meta=meta, behavior=behavior, attrs=attrs
)


def to_dataframe(
Expand Down Expand Up @@ -522,6 +553,8 @@ def from_map(
number of partitions in the output collection (only one
element of each iterable will be passed to `func` for each
partition).
args : tuple
Tuple of positional arguments to append after mapped arguments.
label : str, optional
String to use as the function-name label in the output
collection-key names.
Expand Down
13 changes: 7 additions & 6 deletions src/dask_awkward/lib/io/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import abc
import logging
import math
from collections.abc import Callable
from collections.abc import Callable, Mapping
from typing import TYPE_CHECKING, Any, Literal, overload

import awkward as ak
Expand Down Expand Up @@ -43,7 +43,7 @@ def __init__(
form: Form,
compression: str | None = None,
schema: str | dict | list | None = None,
behavior: dict | None = None,
behavior: Mapping | None = None,
**kwargs: Any,
) -> None:
self.compression = compression
Expand Down Expand Up @@ -91,7 +91,7 @@ def __init__(
form: Form,
compression: str | None = None,
schema: str | dict | list | None = None,
behavior: dict | None = None,
behavior: Mapping | None = None,
**kwargs: Any,
) -> None:
super().__init__(
Expand Down Expand Up @@ -125,7 +125,7 @@ def __init__(
form: Form,
compression: str | None = None,
schema: str | dict | list | None = None,
behavior: dict | None = None,
behavior: Mapping | None = None,
**kwargs: Any,
) -> None:
super().__init__(
Expand Down Expand Up @@ -163,7 +163,7 @@ def __init__(
form: Form,
compression: str | None = None,
schema: str | dict | list | None = None,
behavior: dict | None = None,
behavior: Mapping | None = None,
**kwargs: Any,
) -> None:
super().__init__(
Expand Down Expand Up @@ -432,7 +432,8 @@ def from_json(
initial: int = 1024,
resize: float = 8,
highlevel: bool = True,
behavior: dict | None = None,
behavior: Mapping | None = None,
attrs: Mapping[str, Any] | None = None,
blocksize: int | str | None = None,
delimiter: bytes | None = None,
compression: str | None = "infer",
Expand Down
Loading

0 comments on commit 4597a3f

Please sign in to comment.