diff --git a/pyproject.toml b/pyproject.toml
index 3d50be57..0d92654b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -36,7 +36,7 @@ classifiers = [
     "Topic :: Software Development",
 ]
 dependencies = [
-    "awkward >=2.4.5",
+    "awkward >=2.5.0",
     "dask >=2023.04.0",
     "typing_extensions >=4.8.0",
 ]
diff --git a/src/dask_awkward/lib/core.py b/src/dask_awkward/lib/core.py
index 38507a1e..448f0986 100644
--- a/src/dask_awkward/lib/core.py
+++ b/src/dask_awkward/lib/core.py
@@ -7,7 +7,7 @@ import operator
 import sys
 import warnings
-from collections.abc import Callable, Hashable, Sequence
+from collections.abc import Callable, Hashable, Mapping, Sequence
 from enum import IntEnum
 from functools import cached_property, partial, wraps
 from numbers import Number
@@ -17,14 +17,14 @@ import dask.config
 import numpy as np
 from awkward._do import remove_structure as ak_do_remove_structure
-from awkward._nplikes.typetracer import (
+from awkward.highlevel import NDArrayOperatorsMixin, _dir_pattern
+from awkward.typetracer import (
     MaybeNone,
     OneOf,
     TypeTracerArray,
+    create_unknown_scalar,
     is_unknown_scalar,
 )
-from awkward.highlevel import NDArrayOperatorsMixin, _dir_pattern
-from awkward.typetracer import create_unknown_scalar
 from dask.base import (
     DaskMethodsMixin,
     dont_optimize,
@@ -201,15 +201,11 @@ def __str__(self) -> str:
         return f"dask.awkward<{key_split(self.name)}, type=Scalar, dtype={self.dtype}>"

     def __getitem__(self, where: Any) -> Any:
-        token = tokenize(self, operator.getitem, where)
-        label = "getitem"
-        name = f"{label}-{token}"
-        task = AwkwardMaterializedLayer(
-            {(name, 0): (operator.getitem, self.key, where)},
-            previous_layer_names=[self.name],
+        msg = (
+            "__getitem__ access on Scalars should be done after converting "
+            "the Scalar collection to delayed with the to_delayed method."
         )
-        hlg = HighLevelGraph.from_collections(name, task, dependencies=[self])
-        return new_scalar_object(hlg, name, meta=None)
+        raise NotImplementedError(msg)

     @property
     def known_value(self) -> Any | None:
@@ -377,7 +373,7 @@ def new_scalar_object(
     if isinstance(meta, MaybeNone):
         meta = ak.Array(meta.content)
-    else:
+    elif meta is not None:
         try:
             if ak.backend(meta) != "typetracer":
                 raise TypeError(
@@ -848,7 +844,14 @@ def layout(self) -> Content:
         raise ValueError("This collection's meta is None; unknown layout.")

     @property
-    def behavior(self) -> dict:
+    def attrs(self) -> dict:
+        """awkward Array attrs dictionary."""
+        if self._meta is not None:
+            return self._meta.attrs
+        raise ValueError("This collection's meta is None; no attrs property available.")
+
+    @property
+    def behavior(self) -> Mapping:
         """awkward Array behavior dictionary."""
         if self._meta is not None:
             return self._meta.behavior
@@ -1515,7 +1518,8 @@ def new_array_object(
     name: str,
     *,
     meta: ak.Array | None = None,
-    behavior: dict | None = None,
+    behavior: Mapping | None = None,
+    attrs: Mapping[str, Any] | None = None,
     npartitions: int | None = None,
     divisions: tuple[int, ...] | tuple[None, ...] | None = None,
 ) -> Array:
@@ -1534,6 +1538,10 @@ def new_array_object(
         typetracer for the new Array. If the configuration option
         ``awkward.compute-unknown-meta`` is set to ``False``, undefined
         `meta` will be assigned an empty typetracer.
+    behavior : dict, optional
+        Custom ak.behavior for the output array.
+    attrs : dict, optional
+        Custom attributes for the output array.
     npartitions : int, optional
         Total number of partitions; if used `divisions` will be a
        tuple of length `npartitions` + 1 with all elements ``None``.
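The hunks above make `Scalar.__getitem__` raise `NotImplementedError` (users are directed to `to_delayed`) and add an `attrs` property next to the now `Mapping`-typed `behavior` property on `Array` collections. A minimal sketch of the intended effect, assuming awkward >= 2.5.0 (the reason for the dependency bump) and this branch; the `attrs` content is illustrative only:

```python
import awkward as ak
import dask_awkward as dak

# attrs on ak.Array requires awkward >= 2.5.0, matching the bump above.
arr = ak.Array([[1, 2, 3], [], [4, 5]], attrs={"source": "toy-data"})
darr = dak.from_awkward(arr, npartitions=2)

print(darr.attrs)     # expected: {'source': 'toy-data'}
print(darr.behavior)  # None unless a custom behavior mapping is attached
```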
@@ -1577,6 +1585,8 @@ def new_array_object( if behavior is not None: actual_meta.behavior = behavior + if attrs is not None: + actual_meta.attrs = attrs out = Array(dsk, name, actual_meta, divs) if actual_meta.__doc__ != actual_meta.__class__.__doc__: @@ -1906,6 +1916,8 @@ def non_trivial_reduction( keepdims: bool, mask_identity: bool, reducer: Callable, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, combiner: Callable | None = None, token: str | None = None, dtype: Any | None = None, @@ -2254,7 +2266,11 @@ def typetracer_array(a: ak.Array | Array) -> ak.Array: if isinstance(a, Array): return a._meta elif isinstance(a, ak.Array): - return ak.Array(a.layout.to_typetracer(forget_length=True)) + return ak.Array( + a.layout.to_typetracer(forget_length=True), + behavior=a._behavior, + attrs=a._attrs, + ) else: msg = ( "`a` should be an awkward array or a Dask awkward collection.\n" diff --git a/src/dask_awkward/lib/io/io.py b/src/dask_awkward/lib/io/io.py index eaa26729..56956f27 100644 --- a/src/dask_awkward/lib/io/io.py +++ b/src/dask_awkward/lib/io/io.py @@ -3,7 +3,7 @@ import functools import logging import math -from collections.abc import Callable, Iterable +from collections.abc import Callable, Iterable, Mapping from dataclasses import dataclass from typing import TYPE_CHECKING, Any, cast @@ -65,8 +65,9 @@ def __call__(self, start: int, stop: int, **kwargs: Any) -> ak.Array: def from_awkward( source: ak.Array, npartitions: int, - behavior: dict | None = None, + behavior: Mapping | None = None, label: str | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: """Create an Array collection from a concrete :class:`awkward.Array` object. @@ -76,8 +77,12 @@ def from_awkward( The concrete awkward array. npartitions : int The total number of partitions for the collection. + behavior : dict, optional + Custom ak.behavior for the output array. label : str, optional Label for the task. + attrs : mapping, optional + Custom attributes for the output array. Returns ------- @@ -112,18 +117,24 @@ def from_awkward( divisions=locs, meta=meta, behavior=behavior, + attrs=attrs, ) class _FromListsFn: - def __init__(self, behavior: dict | None = None): + def __init__(self, behavior: Mapping | None, attrs: Mapping[str, Any] | None): self.behavior = behavior + self.attrs = attrs def __call__(self, x: list) -> ak.Array: - return ak.Array(x, behavior=self.behavior) + return ak.Array(x, behavior=self.behavior, attrs=self.attrs) -def from_lists(source: list) -> Array: +def from_lists( + source: list, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, +) -> Array: """Create an Array collection from a list of lists. Parameters @@ -131,6 +142,10 @@ def from_lists(source: list) -> Array: source : list[list[Any]] List of lists, each outer list will become a partition in the collection. + behavior : dict, optional + Custom ak.behavior for the output array. + attrs : mapping, optional + Custom attributes for the output array. 
Returns ------- @@ -152,9 +167,9 @@ def from_lists(source: list) -> Array: lists = list(source) divs = (0, *np.cumsum(list(map(len, lists)))) return from_map( - _FromListsFn(), + _FromListsFn(behavior=behavior, attrs=attrs), lists, - meta=typetracer_array(ak.Array(lists[0])), + meta=typetracer_array(ak.Array(lists[0], attrs=attrs, behavior=behavior)), divisions=divs, label="from-lists", ) @@ -163,9 +178,10 @@ def from_lists(source: list) -> Array: def from_delayed( source: list[Delayed] | Delayed, meta: ak.Array | None = None, - behavior: dict | None = None, + behavior: Mapping | None = None, divisions: tuple[int, ...] | tuple[None, ...] | None = None, prefix: str = "from-delayed", + attrs: Mapping[str, Any] | None = None, ) -> Array: """Create an Array collection from a set of :class:`~dask.delayed.Delayed` objects. @@ -179,10 +195,14 @@ def from_delayed( Metadata (typetracer array) if known, if ``None`` the first partition (first element of the list of ``Delayed`` objects) will be computed to determine the metadata. + behavior : dict, optional + Custom ak.behavior for the output array. divisions : tuple[int | None, ...], optional Partition boundaries (if known). prefix : str Prefix for the keys in the task graph. + attrs : mapping, optional + Custom attributes for the output array. Returns ------- @@ -206,11 +226,7 @@ def from_delayed( raise ValueError("divisions must be a tuple of length len(source) + 1") hlg = HighLevelGraph.from_collections(name, dsk, dependencies=parts) return new_array_object( - hlg, - name=name, - meta=meta, - behavior=behavior, - divisions=divs, + hlg, name=name, meta=meta, behavior=behavior, divisions=divs, attrs=attrs ) @@ -346,13 +362,21 @@ def to_dask_array( return new_da_object(graph, name, meta=None, chunks=chunks, dtype=dtype) -def from_dask_array(array: DaskArray, behavior: dict | None = None) -> Array: +def from_dask_array( + array: DaskArray, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, +) -> Array: """Convert a Dask Array collection to a Dask Awkard Array collection. Parameters ---------- array : dask.array.Array Array to convert. + behavior : dict, optional + Custom ak.behavior for the output array. + attrs : mapping, optional + Custom attributes for the output array. Returns ------- @@ -389,11 +413,18 @@ def from_dask_array(array: DaskArray, behavior: dict | None = None) -> Array: hlg = HighLevelGraph.from_collections(name, layer, dependencies=[array]) if np.any(np.isnan(array.chunks)): return new_array_object( - hlg, name, npartitions=array.npartitions, meta=meta, behavior=behavior + hlg, + name, + npartitions=array.npartitions, + meta=meta, + behavior=behavior, + attrs=attrs, ) else: divs = (0, *np.cumsum(array.chunks)) - return new_array_object(hlg, name, divisions=divs, meta=meta, behavior=behavior) + return new_array_object( + hlg, name, divisions=divs, meta=meta, behavior=behavior, attrs=attrs + ) def to_dataframe( @@ -522,6 +553,8 @@ def from_map( number of partitions in the output collection (only one element of each iterable will be passed to `func` for each partition). + args : tuple + Tuple of positional arguments to append after mapped arguments. label : str, optional String to use as the function-name label in the output collection-key names. 
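The io.py changes thread `behavior` and `attrs` through `from_awkward`, `from_lists`, `from_delayed`, and `from_dask_array`, attaching them both to the typetracer meta and to each concrete partition. A minimal sketch using `from_lists`, assuming this branch; the `attrs` value is illustrative:

```python
import awkward as ak
import dask_awkward as dak

lists = [[1, 2, 3], [4, 5]]  # one partition per inner list

darr = dak.from_lists(
    lists,
    behavior=None,                       # any Mapping (or None) is accepted
    attrs={"provenance": "from_lists"},  # illustrative value
)

print(darr.attrs)                # expected: {'provenance': 'from_lists'}
print(darr.compute().to_list())  # [1, 2, 3, 4, 5]
```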
diff --git a/src/dask_awkward/lib/io/json.py b/src/dask_awkward/lib/io/json.py index b20c6f63..c0fbf54f 100644 --- a/src/dask_awkward/lib/io/json.py +++ b/src/dask_awkward/lib/io/json.py @@ -3,7 +3,7 @@ import abc import logging import math -from collections.abc import Callable +from collections.abc import Callable, Mapping from typing import TYPE_CHECKING, Any, Literal, overload import awkward as ak @@ -43,7 +43,7 @@ def __init__( form: Form, compression: str | None = None, schema: str | dict | list | None = None, - behavior: dict | None = None, + behavior: Mapping | None = None, **kwargs: Any, ) -> None: self.compression = compression @@ -91,7 +91,7 @@ def __init__( form: Form, compression: str | None = None, schema: str | dict | list | None = None, - behavior: dict | None = None, + behavior: Mapping | None = None, **kwargs: Any, ) -> None: super().__init__( @@ -125,7 +125,7 @@ def __init__( form: Form, compression: str | None = None, schema: str | dict | list | None = None, - behavior: dict | None = None, + behavior: Mapping | None = None, **kwargs: Any, ) -> None: super().__init__( @@ -163,7 +163,7 @@ def __init__( form: Form, compression: str | None = None, schema: str | dict | list | None = None, - behavior: dict | None = None, + behavior: Mapping | None = None, **kwargs: Any, ) -> None: super().__init__( @@ -432,7 +432,8 @@ def from_json( initial: int = 1024, resize: float = 8, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, blocksize: int | str | None = None, delimiter: bytes | None = None, compression: str | None = "infer", diff --git a/src/dask_awkward/lib/io/parquet.py b/src/dask_awkward/lib/io/parquet.py index e19e7b6f..26e83a52 100644 --- a/src/dask_awkward/lib/io/parquet.py +++ b/src/dask_awkward/lib/io/parquet.py @@ -5,6 +5,7 @@ import logging import math import operator +from collections.abc import Mapping from typing import TYPE_CHECKING, Any, Literal, TypeVar import awkward as ak @@ -41,8 +42,8 @@ def __init__( listsep: str = "list.item", unnamed_root: bool = False, original_form: Form | None = None, - behavior: dict | None = None, - attrs: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, **kwargs: Any, ) -> None: self.fs = fs @@ -99,7 +100,7 @@ def __init__( listsep: str = "list.item", unnamed_root: bool = False, original_form: Form | None = None, - behavior: dict | None = None, + behavior: Mapping | None = None, **kwargs: Any, ) -> None: super().__init__( @@ -113,18 +114,22 @@ def __init__( ) def __call__(self, source: Any) -> Any: - array = ak_from_parquet._load( + layout = ak_from_parquet._load( [source], parquet_columns=self.columns, subrg=[None], subform=self.form, - highlevel=True, + highlevel=False, fs=self.fs, behavior=self.behavior, attrs=self.attrs, **self.kwargs, ) - return ak.Array(unproject_layout(self.original_form, array.layout)) + return ak.Array( + unproject_layout(self.original_form, layout), + attrs=self.attrs, + behavior=self.behavior, + ) def project_columns(self, columns): return _FromParquetFileWiseFn( @@ -133,6 +138,7 @@ def project_columns(self, columns): listsep=self.listsep, unnamed_root=self.unnamed_root, original_form=self.form, + attrs=self.attrs, behavior=self.behavior, **self.kwargs, ) @@ -147,7 +153,8 @@ def __init__( listsep: str = "list.item", unnamed_root: bool = False, original_form: Form | None = None, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = 
None, **kwargs: Any, ) -> None: super().__init__( @@ -157,6 +164,7 @@ def __init__( unnamed_root=unnamed_root, original_form=original_form, behavior=behavior, + attrs=attrs, **kwargs, ) @@ -164,18 +172,22 @@ def __call__(self, pair: Any) -> ak.Array: subrg, source = pair if isinstance(subrg, int): subrg = [[subrg]] - array = ak_from_parquet._load( + layout = ak_from_parquet._load( [source], parquet_columns=self.columns, subrg=subrg, subform=self.form, - highlevel=True, + highlevel=False, fs=self.fs, behavior=self.behavior, attrs=self.attrs, **self.kwargs, ) - return ak.Array(unproject_layout(self.original_form, array.layout)) + return ak.Array( + unproject_layout(self.original_form, layout), + behavior=self.behavior, + attrs=self.attrs, + ) def project_columns(self, columns): return _FromParquetFragmentWiseFn( @@ -184,6 +196,7 @@ def project_columns(self, columns): unnamed_root=self.unnamed_root, original_form=self.form, behavior=self.behavior, + attrs=self.attrs, **self.kwargs, ) @@ -197,7 +210,8 @@ def from_parquet( footer_sample_size: int = 1_000_000, generate_bitmasks: bool = False, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ignore_metadata: bool = True, scan_files: bool = False, split_row_groups: bool | None = False, @@ -267,6 +281,8 @@ def from_parquet( ignore_metadata, scan_files, split_row_groups, + behavior, + attrs, ) ( @@ -311,6 +327,7 @@ def from_parquet( footer_sample_size=footer_sample_size, generate_bitmasks=generate_bitmasks, behavior=behavior, + attrs=attrs, ), actual_paths, label=label, @@ -349,6 +366,7 @@ def from_parquet( footer_sample_size=footer_sample_size, generate_bitmasks=generate_bitmasks, behavior=behavior, + attrs=attrs, ), pairs, label=label, diff --git a/src/dask_awkward/lib/operations.py b/src/dask_awkward/lib/operations.py index 32e94479..2074ee2a 100644 --- a/src/dask_awkward/lib/operations.py +++ b/src/dask_awkward/lib/operations.py @@ -1,5 +1,8 @@ from __future__ import annotations +from collections.abc import Mapping +from typing import Any + import awkward as ak from dask.base import tokenize from dask.highlevelgraph import HighLevelGraph @@ -32,7 +35,8 @@ def concatenate( axis: int = 0, mergebool: bool = True, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: label = "concatenate" token = tokenize(arrays, axis, mergebool, highlevel, behavior) @@ -49,7 +53,7 @@ def concatenate( g[(name, i)] = k i += 1 - meta = ak.concatenate(metas) + meta = ak.concatenate(metas, behavior=behavior, attrs=attrs) assert isinstance(meta, ak.Array) prev_names = [iarr.name for iarr in arrays] @@ -65,7 +69,7 @@ def concatenate( if partition_compatibility(*arrays) == PartitionCompatibility.NO: raise IncompatiblePartitions("concatenate", *arrays) - fn = _ConcatenateFnAxisGT0(axis=axis) + fn = _ConcatenateFnAxisGT0(axis=axis, behavior=behavior, attrs=attrs) return map_partitions(fn, *arrays) else: diff --git a/src/dask_awkward/lib/reducers.py b/src/dask_awkward/lib/reducers.py index 716327cc..c3019778 100644 --- a/src/dask_awkward/lib/reducers.py +++ b/src/dask_awkward/lib/reducers.py @@ -1,5 +1,6 @@ from __future__ import annotations +from collections.abc import Mapping from typing import TYPE_CHECKING, Any import awkward as ak @@ -39,6 +40,8 @@ def all( axis: int | None = None, keepdims: bool = False, mask_identity: bool = False, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | 
None = None, ) -> Any: if axis is None or axis == 0 or axis == -1 * array.ndim: return non_trivial_reduction( @@ -49,6 +52,8 @@ def all( is_positional=False, keepdims=keepdims, mask_identity=mask_identity, + behavior=behavior, + attrs=attrs, ) else: return map_partitions( @@ -58,6 +63,8 @@ def all( axis=axis, keepdims=keepdims, mask_identity=mask_identity, + behavior=behavior, + attrs=attrs, ) @@ -67,6 +74,8 @@ def any( axis: int | None = None, keepdims: bool = False, mask_identity: bool = False, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Any: if axis is None or axis == 0 or axis == -1 * array.ndim: return non_trivial_reduction( @@ -77,6 +86,8 @@ def any( is_positional=False, keepdims=keepdims, mask_identity=mask_identity, + behavior=behavior, + attrs=attrs, ) else: return map_partitions( @@ -86,6 +97,8 @@ def any( axis=axis, keepdims=keepdims, mask_identity=mask_identity, + behavior=behavior, + attrs=attrs, ) @@ -95,6 +108,8 @@ def argmax( axis: int | None = None, keepdims: bool = False, mask_identity: bool = True, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Any: if axis is None or axis == 0 or axis == -1 * array.ndim: return non_trivial_reduction( @@ -105,6 +120,8 @@ def argmax( is_positional=True, keepdims=keepdims, mask_identity=mask_identity, + behavior=behavior, + attrs=attrs, ) else: return map_partitions( @@ -114,6 +131,8 @@ def argmax( axis=axis, keepdims=keepdims, mask_identity=mask_identity, + behavior=behavior, + attrs=attrs, ) @@ -123,6 +142,8 @@ def argmin( axis: int | None = None, keepdims: bool = False, mask_identity: bool = True, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Any: if axis is None or axis == 0 or axis == -1 * array.ndim: return non_trivial_reduction( @@ -133,6 +154,8 @@ def argmin( is_positional=True, keepdims=keepdims, mask_identity=mask_identity, + behavior=behavior, + attrs=attrs, ) else: return map_partitions( @@ -142,18 +165,22 @@ def argmin( axis=axis, keepdims=keepdims, mask_identity=mask_identity, + behavior=behavior, + attrs=attrs, ) @borrow_docstring(ak.corr) def corr( - x, - y, - weight=None, - axis=None, - keepdims=False, - mask_identity=False, -): + x: Array, + y: Array, + weight: Array | int | float | complex | None = None, + axis: int | None = None, + keepdims: bool = False, + mask_identity: bool = False, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, +) -> Any: raise DaskAwkwardNotImplemented("TODO") @@ -163,6 +190,8 @@ def count( axis: int | None = None, keepdims: bool = False, mask_identity: bool = False, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Any: if axis is None or axis == 0 or axis == -1 * array.ndim: return non_trivial_reduction( @@ -174,6 +203,8 @@ def count( is_positional=False, keepdims=keepdims, mask_identity=mask_identity, + behavior=behavior, + attrs=attrs, ) else: return map_partitions( @@ -183,6 +214,8 @@ def count( axis=axis, keepdims=keepdims, mask_identity=mask_identity, + behavior=behavior, + attrs=attrs, ) @@ -192,6 +225,8 @@ def count_nonzero( axis: int | None = None, keepdims: bool = False, mask_identity: bool = False, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Any: if axis is None or axis == 0 or axis == -1 * array.ndim: return non_trivial_reduction( @@ -203,6 +238,8 @@ def count_nonzero( is_positional=False, keepdims=keepdims, mask_identity=mask_identity, + behavior=behavior, + attrs=attrs, ) else: 
return map_partitions( @@ -212,33 +249,47 @@ def count_nonzero( axis=axis, keepdims=keepdims, mask_identity=mask_identity, + behavior=behavior, + attrs=attrs, ) @borrow_docstring(ak.covar) def covar( - x, - y, - weight=None, - axis=None, - keepdims=False, - mask_identity=False, -): + x: Array, + y: Array, + weight: Array | int | float | complex | None = None, + axis: int | None = None, + keepdims: bool = False, + mask_identity: bool = False, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, +) -> Any: raise DaskAwkwardNotImplemented("TODO") @borrow_docstring(ak.linear_fit) def linear_fit( - x, - y, - weight=None, - axis=None, - keepdims=False, - mask_identity=False, -): + x: Array, + y: Array, + weight: Array | int | float | complex | None = None, + axis: int | None = None, + keepdims: bool = False, + mask_identity: bool = False, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, +) -> Any: raise DaskAwkwardNotImplemented("TODO") +class _MaxFn: + def __init__(self, **kwargs): + self.kwargs = kwargs + + def __call__(self, array, **kwargs): + return ak.max(array, **self.kwargs, **kwargs) + + @borrow_docstring(ak.max) def max( array: Array, @@ -246,36 +297,49 @@ def max( keepdims: bool = False, initial: float | None = None, mask_identity: bool = True, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Any: if axis is None or axis == 0 or axis == -1 * array.ndim: return non_trivial_reduction( axis=axis, label="max", array=array, - reducer=ak.max, + reducer=_MaxFn(initial=initial), is_positional=False, keepdims=keepdims, mask_identity=mask_identity, + behavior=behavior, + attrs=attrs, ) else: return map_partitions( - ak.max, + _MaxFn(initial=initial), array, output_divisions=1, axis=axis, keepdims=keepdims, mask_identity=mask_identity, + behavior=behavior, + attrs=attrs, ) @borrow_docstring(ak.mean) def mean( - array, - weight=None, - axis=None, - keepdims=False, - mask_identity=False, -): + array: Array, + weight: Array | int | float | complex | None = None, + axis: int | None = None, + keepdims: bool = False, + mask_identity: bool = False, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, +) -> Array: + if weight is not None: + raise DaskAwkwardNotImplemented( + f"weight={weight} is not supported for this array yet." + ) + if axis == 0 or axis == -1 * array.ndim: raise DaskAwkwardNotImplemented( f"axis={axis} is not supported for this array yet." 
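The reducer changes give every reduction a `behavior`/`attrs` keyword pair and bind `initial` into the small `_MaxFn`/`_MinFn` wrappers so it reaches `ak.max`/`ak.min` on each partition. A minimal sketch of the two code paths (per-partition for `axis >= 1`, tree reduction for `axis=None`), assuming this branch; the `attrs` values are illustrative:

```python
import awkward as ak
import dask_awkward as dak

daa = dak.from_awkward(ak.Array([[1, 2, 3], [4, 5]]), npartitions=2)

# axis >= 1 reductions run per partition via map_partitions; axis=None goes
# through the tree reduction.  Both paths now forward behavior/attrs.
per_list_max = dak.max(daa, axis=1, attrs={"stage": "per-list"})
grand_sum = dak.sum(daa, attrs={"stage": "grand-total"})

print(per_list_max.compute().to_list())  # [3, 5]
print(grand_sum.compute())               # 15
```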
@@ -288,10 +352,20 @@ def mean( axis=axis, keepdims=keepdims, mask_identity=mask_identity, + behavior=behavior, + attrs=attrs, ) raise DaskAwkwardNotImplemented("TODO") +class _MinFn: + def __init__(self, **kwargs): + self.kwargs = kwargs + + def __call__(self, array, **kwargs): + return ak.min(array, **self.kwargs, **kwargs) + + @borrow_docstring(ak.min) def min( array: Array, @@ -299,42 +373,57 @@ def min( keepdims: bool = False, initial: float | None = None, mask_identity: bool = True, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Any: if axis is None or axis == 0 or axis == -1 * array.ndim: return non_trivial_reduction( axis=axis, label="min", array=array, - reducer=ak.min, + reducer=_MinFn(initial=initial), is_positional=False, keepdims=keepdims, mask_identity=mask_identity, + behavior=behavior, + attrs=attrs, ) else: return map_partitions( - ak.min, + _MinFn(initial=initial), array, output_divisions=1, axis=axis, keepdims=keepdims, mask_identity=mask_identity, + behavior=behavior, + attrs=attrs, ) @borrow_docstring(ak.moment) def moment( - x, - n, - weight=None, - axis=None, - keepdims=False, - mask_identity=False, -): + x: Array, + n: int, + weight: Array | int | float | complex | None = None, + axis: int | None = None, + keepdims: bool = False, + mask_identity: bool = False, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, +) -> Any: raise DaskAwkwardNotImplemented("TODO") @borrow_docstring(ak.prod) -def prod(array, axis=None, keepdims=False, mask_identity=False): +def prod( + array: Array, + axis: int | None = None, + keepdims: bool = False, + mask_identity: bool = False, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, +) -> Any: if axis is None or axis == 0 or axis == -1 * array.ndim: return non_trivial_reduction( axis=axis, @@ -344,6 +433,8 @@ def prod(array, axis=None, keepdims=False, mask_identity=False): is_positional=False, keepdims=keepdims, mask_identity=mask_identity, + behavior=behavior, + attrs=attrs, ) else: return map_partitions( @@ -353,16 +444,32 @@ def prod(array, axis=None, keepdims=False, mask_identity=False): axis=axis, keepdims=keepdims, mask_identity=mask_identity, + behavior=behavior, + attrs=attrs, ) @borrow_docstring(ak.ptp) -def ptp(arr, axis=None, keepdims=False, mask_identity=True): +def ptp( + arr: Array, + axis: int | None = None, + keepdims: bool = False, + mask_identity: bool = True, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, +) -> Any: raise DaskAwkwardNotImplemented("TODO") @borrow_docstring(ak.softmax) -def softmax(x, axis=None, keepdims=False, mask_identity=False): +def softmax( + x: Array, + axis: int | None = None, + keepdims: bool = False, + mask_identity: bool = False, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, +) -> Any: raise DaskAwkwardNotImplemented("TODO") @@ -376,13 +483,15 @@ def __call__(self, array): @borrow_docstring(ak.std) def std( - x, - weight=None, - ddof=0, - axis=None, - keepdims=False, - mask_identity=False, -): + x: Array, + weight: Array | int | float | complex | None = None, + ddof: int = 0, + axis: int | None = None, + keepdims: bool = False, + mask_identity: bool = False, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, +) -> Any: if weight is not None: raise DaskAwkwardNotImplemented("weight argument is not supported.") if axis is None or axis == 0 or axis == -1 * x.ndim: @@ -396,6 +505,8 @@ def std( axis=axis, keepdims=keepdims, 
mask_identity=mask_identity, + behavior=behavior, + attrs=attrs, ), x, output_divisions=1, @@ -409,6 +520,8 @@ def sum( axis: int | None = None, keepdims: bool = False, mask_identity: bool = False, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Any: if axis is None or axis == 0 or axis == -1 * array.ndim: return non_trivial_reduction( @@ -419,6 +532,8 @@ def sum( is_positional=False, keepdims=keepdims, mask_identity=mask_identity, + behavior=behavior, + attrs=attrs, ) else: return map_partitions( @@ -428,6 +543,8 @@ def sum( axis=axis, keepdims=keepdims, mask_identity=mask_identity, + behavior=behavior, + attrs=attrs, ) @@ -441,13 +558,15 @@ def __call__(self, array): @borrow_docstring(ak.var) def var( - x, - weight=None, - ddof=0, - axis=None, - keepdims=False, - mask_identity=False, -): + x: Array, + weight: Array | int | float | complex | None = None, + ddof: int = 0, + axis: int | None = None, + keepdims: bool = False, + mask_identity: bool = False, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, +) -> Any: if weight is not None: raise DaskAwkwardNotImplemented("weight argument is not supported.") if axis is None or axis == 0 or axis == -1 * x.ndim: @@ -461,6 +580,8 @@ def var( axis=axis, keepdims=keepdims, mask_identity=mask_identity, + behavior=behavior, + attrs=attrs, ), x, output_divisions=1, diff --git a/src/dask_awkward/lib/str.py b/src/dask_awkward/lib/str.py index 85324ae9..719557a7 100644 --- a/src/dask_awkward/lib/str.py +++ b/src/dask_awkward/lib/str.py @@ -1,7 +1,7 @@ from __future__ import annotations import functools -from collections.abc import Callable +from collections.abc import Callable, Mapping from typing import Any, TypeVar import awkward.operations.str as akstr @@ -28,7 +28,8 @@ def capitalize( array: Array, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.capitalize, @@ -45,7 +46,8 @@ def center( padding: str | bytes = " ", *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.center, @@ -64,7 +66,8 @@ def count_substring( *, ignore_case: bool = False, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.count_substring, @@ -83,7 +86,8 @@ def count_substring_regex( *, ignore_case: bool = False, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.count_substring_regex, @@ -102,7 +106,8 @@ def ends_with( *, ignore_case: bool = False, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.ends_with, @@ -120,7 +125,8 @@ def extract_regex( pattern: bytes | str, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.extract_regex, @@ -138,7 +144,8 @@ def find_substring( *, ignore_case: bool = False, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( 
akstr.find_substring, @@ -157,7 +164,8 @@ def find_substring_regex( *, ignore_case: bool = False, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.find_substring_regex, @@ -176,7 +184,8 @@ def index_in( *, skip_nones: bool = False, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.index_in, @@ -193,7 +202,8 @@ def is_alnum( array: Array, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.is_alnum, @@ -208,7 +218,8 @@ def is_alpha( array: Array, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.is_alpha, @@ -223,7 +234,8 @@ def is_ascii( array: Array, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.is_ascii, @@ -238,7 +250,8 @@ def is_decimal( array: Array, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.is_decimal, @@ -253,7 +266,8 @@ def is_digit( array: Array, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.is_digit, @@ -270,7 +284,8 @@ def is_in( *, skip_nones: bool = False, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.is_in, @@ -287,7 +302,8 @@ def is_lower( array: Array, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.is_lower, @@ -302,7 +318,8 @@ def is_numeric( array: Array, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.is_numeric, @@ -317,7 +334,8 @@ def is_printable( array: Array, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.is_printable, @@ -332,7 +350,8 @@ def is_space( array: Array, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.is_space, @@ -347,7 +366,8 @@ def is_title( array: Array, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.is_title, @@ -362,7 +382,8 @@ def is_upper( array: Array, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.is_upper, @@ -378,7 +399,8 @@ def join( separator: Any, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | 
None = None, ) -> Array: return map_partitions( akstr.join, @@ -393,7 +415,8 @@ def join( def join_element_wise( *arrays: Array, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.join_element_wise, @@ -408,7 +431,8 @@ def length( array: Array, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.length, @@ -423,7 +447,8 @@ def lower( array: Array, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.lower, @@ -440,7 +465,8 @@ def lpad( padding: str | bytes = " ", *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.lpad, @@ -458,7 +484,8 @@ def ltrim( characters: str | bytes, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.ltrim, @@ -474,7 +501,8 @@ def ltrim_whitespace( array: Array, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.ltrim_whitespace, @@ -491,7 +519,8 @@ def match_like( *, ignore_case: bool = False, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.match_like, @@ -510,7 +539,8 @@ def match_substring( *, ignore_case: bool = False, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.match_substring, @@ -529,7 +559,8 @@ def match_substring_regex( *, ignore_case: bool = False, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.match_substring_regex, @@ -547,7 +578,8 @@ def repeat( num_repeats: Any, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.repeat, @@ -566,7 +598,8 @@ def replace_slice( replacement: str | bytes, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.replace_slice, @@ -587,7 +620,8 @@ def replace_substring( *, max_replacements: int | None = None, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.replace_substring, @@ -608,7 +642,8 @@ def replace_substring_regex( *, max_replacements: int | None = None, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.replace_substring_regex, @@ -626,7 +661,8 @@ def reverse( array: Array, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: 
return map_partitions( akstr.reverse, @@ -643,7 +679,8 @@ def rpad( padding: str | bytes = " ", *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.rpad, @@ -661,7 +698,8 @@ def rtrim( characters: str | bytes, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.rtrim, @@ -677,7 +715,8 @@ def rtrim_whitespace( array: Array, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.rtrim_whitespace, @@ -695,7 +734,8 @@ def slice( step: int = 1, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.slice, @@ -716,7 +756,8 @@ def split_pattern( max_splits: int | None = None, reverse: bool = False, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.split_pattern, @@ -737,7 +778,8 @@ def split_pattern_regex( max_splits: int | None = None, reverse: bool = False, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.split_pattern_regex, @@ -757,7 +799,8 @@ def split_whitespace( max_splits: int | None = None, reverse: bool = False, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.split_whitespace, @@ -775,7 +818,8 @@ def starts_with( *, ignore_case: bool = False, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.starts_with, @@ -792,7 +836,8 @@ def swapcase( array: Array, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.swapcase, @@ -807,7 +852,8 @@ def title( array: Array, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.title, @@ -822,7 +868,8 @@ def to_categorical( array: Array, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.to_categorical, @@ -838,7 +885,8 @@ def trim( characters: str | bytes, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.trim, @@ -854,7 +902,8 @@ def trim_whitespace( array: Array, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.trim_whitespace, @@ -869,7 +918,8 @@ def upper( array: Array, *, highlevel: bool = True, - behavior: dict | None = None, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: return map_partitions( akstr.upper, diff 
--git a/src/dask_awkward/lib/structure.py b/src/dask_awkward/lib/structure.py index 76a739e5..35a3b1ca 100644 --- a/src/dask_awkward/lib/structure.py +++ b/src/dask_awkward/lib/structure.py @@ -9,8 +9,7 @@ import awkward as ak import numpy as np -from awkward._nplikes.typetracer import TypeTracerArray -from awkward.typetracer import create_unknown_scalar +from awkward.typetracer import create_unknown_scalar, is_unknown_scalar from dask.base import is_dask_collection, tokenize from dask.highlevelgraph import HighLevelGraph @@ -97,6 +96,7 @@ def argcartesian( with_name: str | None = None, highlevel: bool = True, behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") @@ -110,6 +110,7 @@ def argcartesian( with_name=with_name, highlevel=highlevel, behavior=behavior, + attrs=attrs, ) return map_partitions(fn, *arrays, label="argcartesian", output_divisions=1) raise DaskAwkwardNotImplemented("TODO") @@ -136,6 +137,7 @@ def argcombinations( with_name: str | None = None, highlevel: bool = True, behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") @@ -157,6 +159,7 @@ def argcombinations( with_name=with_name, highlevel=highlevel, behavior=behavior, + attrs=attrs, ) return map_partitions( fn, @@ -183,16 +186,14 @@ def argsort( stable: bool = True, highlevel: bool = True, behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") if axis == 0: raise DaskAwkwardNotImplemented("TODO") fn = _ArgsortFn( - axis=axis, - ascending=ascending, - stable=stable, - behavior=behavior, + axis=axis, ascending=ascending, stable=stable, behavior=behavior, attrs=attrs ) return map_partitions(fn, array, label="argsort", output_divisions=1) @@ -252,6 +253,7 @@ def cartesian( with_name: str | None = None, highlevel: bool = True, behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") @@ -263,6 +265,7 @@ def cartesian( with_name=with_name, highlevel=highlevel, behavior=behavior, + attrs=attrs, ) return map_partitions(fn, *arrays, label="cartesian", output_divisions=1) raise DaskAwkwardNotImplemented("TODO") @@ -289,6 +292,7 @@ def combinations( with_name: str | None = None, highlevel: bool = True, behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") @@ -306,6 +310,7 @@ def combinations( with_name=with_name, highlevel=highlevel, behavior=behavior, + attrs=attrs, ) return map_partitions( fn, @@ -347,11 +352,14 @@ def fill_none( axis: int | None = -1, highlevel: bool = True, behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") - fn = _FillNoneFn(value, axis=axis, highlevel=highlevel, behavior=behavior) + fn = _FillNoneFn( + value, axis=axis, highlevel=highlevel, behavior=behavior, attrs=attrs + ) return map_partitions(fn, array, label="fill-none", output_divisions=1) @@ -369,11 +377,12 @@ def drop_none( axis: int | None = None, highlevel: bool = True, behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") - fn = 
_DropNoneFn(axis=axis, highlevel=highlevel, behavior=behavior) + fn = _DropNoneFn(axis=axis, highlevel=highlevel, behavior=behavior, attrs=attrs) return map_partitions(fn, array, label="drop-none", output_divisions=1) @@ -391,14 +400,11 @@ def firsts( axis: int = 1, highlevel: bool = True, behavior: Mapping | None = None, -) -> Array: + attrs: Mapping[str, Any] | None = None, +) -> Any: if axis == 1: return map_partitions( - _FirstsFn( - axis=axis, - highlevel=highlevel, - behavior=behavior, - ), + _FirstsFn(axis=axis, highlevel=highlevel, behavior=behavior, attrs=attrs), array, label="firsts", output_divisions=1, @@ -422,15 +428,12 @@ def flatten( axis: int | None = 1, highlevel: bool = True, behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") return map_partitions( - _FlattenFn( - axis=axis, - highlevel=highlevel, - behavior=behavior, - ), + _FlattenFn(axis=axis, highlevel=highlevel, behavior=behavior, attrs=attrs), array, label="flatten", output_divisions=None, @@ -439,7 +442,11 @@ def flatten( @borrow_docstring(ak.from_regular) def from_regular( - array: Array, axis: int = 1, highlevel: bool = True, behavior: Mapping | None = None + array: Array, + axis: int = 1, + highlevel: bool = True, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") @@ -454,6 +461,7 @@ def from_regular( highlevel=highlevel, behavior=behavior, label="from-regular", + attrs=attrs, ) @@ -463,7 +471,8 @@ def full_like( fill_value: Any, highlevel: bool = True, behavior: Mapping | None = None, - dtype: np.dtype | str | None = None, + dtype: DTypeLike | str | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") @@ -481,6 +490,7 @@ def full_like( fill_value, highlevel=highlevel, behavior=behavior, + attrs=attrs, dtype=dtype, output_divisions=1, ) @@ -495,6 +505,7 @@ def isclose( equal_nan: bool = False, highlevel: bool = True, behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") @@ -513,6 +524,7 @@ def isclose( behavior=behavior, label="is-close", output_divisions=1, + attrs=attrs, ) @@ -526,9 +538,13 @@ def __call__(self, array): @borrow_docstring(ak.is_none) def is_none( - array: Array, axis: int = 0, highlevel: bool = True, behavior: Mapping | None = None + array: Array, + axis: int = 0, + highlevel: bool = True, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: - fn = _IsNoneFn(axis=axis, highlevel=highlevel, behavior=behavior) + fn = _IsNoneFn(axis=axis, highlevel=highlevel, behavior=behavior, attrs=attrs) return map_partitions(fn, array, label="is-none", output_divisions=1) @@ -538,6 +554,7 @@ def local_index( axis: int = -1, highlevel: bool = True, behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") @@ -549,6 +566,7 @@ def local_index( axis=axis, highlevel=highlevel, behavior=behavior, + attrs=attrs, ) @@ -559,16 +577,14 @@ def mask( valid_when: bool = True, highlevel: bool = True, behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if partition_compatibility(array, mask) == PartitionCompatibility.NO: raise IncompatiblePartitions("mask", 
array, mask) + if not highlevel: + raise ValueError("Only highlevel=True is supported") return map_partitions( - ak.mask, - array, - mask, - valid_when=valid_when, - highlevel=highlevel, - behavior=behavior, + ak.mask, array, mask, valid_when=valid_when, behavior=behavior, attrs=attrs ) @@ -581,6 +597,7 @@ def nan_to_num( neginf: Any | None = None, highlevel: bool = True, behavior: Any | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: # return map_partitions( # ak.nan_to_num, @@ -598,7 +615,7 @@ def nan_to_num( def _numaxis0(*integers): f = first(integers) - if isinstance(f, TypeTracerArray): + if is_unknown_scalar(f): return f return np.sum(np.array(integers)) @@ -609,6 +626,7 @@ def num( axis: int = 1, highlevel: bool = True, behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Any: if not highlevel: raise ValueError("Only highlevel=True is supported") @@ -638,9 +656,9 @@ def num( ak.num, array, axis=axis, - highlevel=highlevel, behavior=behavior, output_divisions=1, + attrs=attrs, ) @@ -649,6 +667,7 @@ def ones_like( array: Array, highlevel: bool = True, behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, dtype: DTypeLike | None = None, ) -> Array: if not highlevel: @@ -660,22 +679,21 @@ def ones_like( behavior=behavior, dtype=dtype, output_divisions=1, + attrs=attrs, ) @borrow_docstring(ak.to_packed) def to_packed( - array: Array, highlevel: bool = True, behavior: Mapping | None = None + array: Array, + highlevel: bool = True, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") - return map_partitions( - ak.to_packed, - array, - highlevel=highlevel, - behavior=behavior, - ) + return map_partitions(ak.to_packed, array, behavior=behavior, attrs=attrs) class _PadNoneFn: @@ -701,6 +719,7 @@ def pad_none( clip: bool = False, highlevel: bool = True, behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") @@ -708,12 +727,7 @@ def pad_none( if axis == 0: DaskAwkwardNotImplemented("axis=0 for pad_none is not supported") return map_partitions( - _PadNoneFn( - target=target, - axis=axis, - clip=clip, - behavior=behavior, - ), + _PadNoneFn(target=target, axis=axis, clip=clip, behavior=behavior, attrs=attrs), array, label="pad-none", output_divisions=1, @@ -722,7 +736,10 @@ def pad_none( @borrow_docstring(ak.ravel) def ravel( - array: Array, highlevel: bool = True, behavior: Mapping | None = None + array: Array, + highlevel: bool = True, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") @@ -733,15 +750,18 @@ def ravel( return map_partitions( ak.ravel, array, - highlevel=highlevel, behavior=behavior, + attrs=attrs, label="ravel", ) @borrow_docstring(ak.run_lengths) def run_lengths( - array: Array, highlevel: bool = True, behavior: Mapping | None = None + array: Array, + highlevel: bool = True, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") @@ -756,8 +776,8 @@ def run_lengths( return map_partitions( ak.run_lengths, array, - highlevel=highlevel, behavior=behavior, + attrs=attrs, label="run-lengths", ) @@ -773,13 +793,17 @@ def __call__(self, array): @borrow_docstring(ak.singletons) def singletons( - array: 
Array, axis: int = 0, highlevel: bool = True, behavior: Mapping | None = None + array: Array, + axis: int = 0, + highlevel: bool = True, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") return map_partitions( - _SingletonsFn(axis, highlevel=highlevel, behavior=behavior), + _SingletonsFn(axis, behavior=behavior, attrs=attrs), array, label="singletons", ) @@ -801,6 +825,7 @@ def sort( stable: bool = True, highlevel: bool = True, behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") @@ -811,6 +836,7 @@ def sort( ascending=ascending, stable=stable, behavior=behavior, + attrs=attrs, ) return map_partitions(fn, array, label="sort", output_divisions=1) @@ -821,13 +847,18 @@ def strings_astype( to: np.dtype | str, highlevel: bool = True, behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: raise DaskAwkwardNotImplemented("TODO") @borrow_docstring(ak.to_regular) def to_regular( - array: Array, axis: int = 1, highlevel: bool = True, behavior: Mapping | None = None + array: Array, + axis: int = 1, + highlevel: bool = True, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") @@ -842,9 +873,9 @@ def to_regular( ak.to_regular, array, axis=axis, - highlevel=highlevel, behavior=behavior, label="to-regular", + attrs=attrs, ) @@ -855,6 +886,7 @@ def unflatten( axis: int = 0, highlevel: bool = True, behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") @@ -871,31 +903,41 @@ def unflatten( array, counts, axis=axis, - highlevel=highlevel, behavior=behavior, label="unflatten", ) -def _array_with_behavior(array: Array, behavior: Mapping | None) -> Array: +def _array_with_rebuilt_meta( + array: Array, behavior: Mapping | None, attrs: Mapping[str, Any] | None +) -> Array: + if attrs is None: + attrs = array._meta.attrs + if behavior is None: - new_meta = array._meta - else: - new_meta = ak.Array(array._meta, behavior=behavior) + behavior = array._meta.behavior + + new_meta = ak.Array(array._meta, behavior=behavior, attrs=attrs) + return Array(array.dask, array.name, new_meta, array.divisions) @borrow_docstring(ak.unzip) def unzip( - array: Array, highlevel: bool = True, behavior: Mapping | None = None + array: Array, + highlevel: bool = True, + behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> tuple[Array, ...]: if not highlevel: raise ValueError("Only highlevel=True is supported") fields = ak.fields(array._meta) if len(fields) == 0: - return (_array_with_behavior(array, behavior),) + return (_array_with_rebuilt_meta(array, behavior, attrs),) else: - return tuple(_array_with_behavior(array[field], behavior) for field in fields) + return tuple( + _array_with_rebuilt_meta(array[field], behavior, attrs) for field in fields + ) @borrow_docstring(ak.values_astype) @@ -904,6 +946,7 @@ def values_astype( to: np.dtype | str, highlevel: bool = True, behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") @@ -913,6 +956,7 @@ def values_astype( to=to, behavior=behavior, label="values-astype", + attrs=attrs, ) @@ -922,10 +966,12 @@ def 
__init__( mergebool: bool = True, highlevel: bool = True, behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> None: self.mergebool = mergebool self.highlevel = highlevel self.behavior = behavior + self.attrs = attrs def __call__(self, condition: ak.Array, x: ak.Array, y: ak.Array) -> ak.Array: return ak.where( @@ -935,6 +981,7 @@ def __call__(self, condition: ak.Array, x: ak.Array, y: ak.Array) -> ak.Array: mergebool=self.mergebool, highlevel=self.highlevel, behavior=self.behavior, + attrs=self.attrs, ) @@ -946,6 +993,7 @@ def where( mergebool: bool = True, highlevel: bool = True, behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") @@ -962,7 +1010,7 @@ def where( raise IncompatiblePartitions("where", *dask_args) return map_partitions( - _WhereFn(mergebool=mergebool, highlevel=highlevel, behavior=behavior), + _WhereFn(mergebool=mergebool, behavior=behavior, attrs=attrs), condition, x, y, @@ -973,21 +1021,19 @@ def where( class _WithFieldFn: def __init__( self, - where: str | Sequence[str] | None = None, - highlevel: bool = True, - behavior: Mapping | None = None, + where: str | Sequence[str] | None, + highlevel: bool, + behavior: Mapping | None, + attrs: Mapping[str, Any] | None, ) -> None: self.where = where self.highlevel = highlevel self.behavior = behavior + self.attrs = attrs def __call__(self, base: ak.Array, what: ak.Array) -> ak.Array: return ak.with_field( - base, - what, - where=self.where, - highlevel=self.highlevel, - behavior=self.behavior, + base, what, where=self.where, behavior=self.behavior, attrs=self.attrs ) @@ -998,6 +1044,7 @@ def with_field( where: str | Sequence[str] | None = None, highlevel: bool = True, behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") @@ -1016,7 +1063,7 @@ def with_field( if partition_compatibility(*dask_args) == PartitionCompatibility.NO: raise IncompatiblePartitions("with_field", *dask_args) return map_partitions( - _WithFieldFn(where=where, highlevel=highlevel, behavior=behavior), + _WithFieldFn(where=where, highlevel=highlevel, behavior=behavior, attrs=attrs), base, what, label="with-field", @@ -1025,12 +1072,18 @@ def with_field( class _WithNameFn: - def __init__(self, name: str | None, behavior: Mapping | None = None) -> None: + def __init__( + self, + name: str | None, + behavior: Mapping | None, + attrs: Mapping[str, Any] | None, + ) -> None: self.name = name self.behavior = behavior + self.attrs = attrs def __call__(self, array: ak.Array) -> ak.Array: - return ak.with_name(array, self.name, behavior=self.behavior) + return ak.with_name(array, self.name, behavior=self.behavior, attrs=self.attrs) @borrow_docstring(ak.with_name) @@ -1039,12 +1092,13 @@ def with_name( name: str | None, highlevel: bool = True, behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") return map_partitions( - _WithNameFn(name=name, behavior=behavior), + _WithNameFn(name=name, behavior=behavior, attrs=attrs), array, label="with-name", output_divisions=1, @@ -1052,10 +1106,17 @@ def with_name( class _WithParameterFn: - def __init__(self, parameter, value, behavior): + def __init__( + self, + parameter: str, + value: Any, + behavior: Mapping | None, + attrs: Mapping[str, Any] | None, + ): self.parameter = parameter self.value = 
value self.behavior = behavior + self.attrs = attrs def __call__(self, array): return ak.with_parameter( @@ -1063,6 +1124,7 @@ def __call__(self, array): parameter=self.parameter, value=self.value, behavior=self.behavior, + attrs=self.attrs, ) @@ -1073,11 +1135,14 @@ def with_parameter( value: Any, highlevel: bool = True, behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") return map_partitions( - _WithParameterFn(parameter=parameter, value=value, behavior=behavior), + _WithParameterFn( + parameter=parameter, value=value, behavior=behavior, attrs=attrs + ), array, label="with-parameter", output_divisions=1, @@ -1085,11 +1150,12 @@ def with_parameter( class _WithoutParameterFn: - def __init__(self, behavior): + def __init__(self, behavior: Mapping | None, attrs: Mapping[str, Any] | None): self.behavior = behavior + self.attrs = attrs def __call__(self, array): - return ak.without_parameters(array, behavior=self.behavior) + return ak.without_parameters(array, behavior=self.behavior, attrs=self.attrs) @borrow_docstring(ak.without_parameters) @@ -1097,11 +1163,12 @@ def without_parameters( array: Array, highlevel: bool = True, behavior: Mapping | None = None, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") return map_partitions( - _WithoutParameterFn(behavior=behavior), + _WithoutParameterFn(behavior=behavior, attrs=attrs), array, label="without-parameters", output_divisions=1, @@ -1113,7 +1180,8 @@ def zeros_like( array: Array, highlevel: bool = True, behavior: Mapping | None = None, - dtype: np.dtype | str | None = None, + attrs: Mapping[str, Any] | None = None, + dtype: DTypeLike | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") @@ -1124,6 +1192,7 @@ def zeros_like( behavior=behavior, dtype=dtype, output_divisions=1, + attrs=attrs, ) @@ -1157,11 +1226,12 @@ def zip( behavior: Mapping | None = None, right_broadcast: bool = False, optiontype_outside_record: bool = False, + attrs: Mapping[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("Only highlevel=True is supported") - if isinstance(arrays, dict): + if isinstance(arrays, Mapping): keys, colls, metadict = [], [], {} for k, coll in arrays.items(): keys.append(k) @@ -1177,6 +1247,7 @@ def zip( behavior=behavior, right_broadcast=right_broadcast, optiontype_outside_record=optiontype_outside_record, + attrs=attrs, ) return map_partitions( @@ -1189,6 +1260,7 @@ def zip( behavior=behavior, right_broadcast=right_broadcast, optiontype_outside_record=optiontype_outside_record, + attrs=attrs, ), *colls, label="zip", @@ -1196,7 +1268,7 @@ def zip( opt_touch_all=True, ) - elif isinstance(arrays, (list, tuple)): + elif isinstance(arrays, Sequence): fn = _ZipListInputFn( depth_limit=depth_limit, parameters=parameters, @@ -1205,6 +1277,7 @@ def zip( behavior=behavior, right_broadcast=right_broadcast, optiontype_outside_record=optiontype_outside_record, + attrs=attrs, ) return map_partitions( fn, @@ -1214,7 +1287,7 @@ def zip( else: raise DaskAwkwardNotImplemented( - "only sized iterables are supported by dak.zip (dict, list, or tuple)" + "only mappings or sequences are supported by dak.zip (e.g. 
dict, list, or tuple)" ) diff --git a/tests/test_getitem.py b/tests/test_getitem.py index 420b14ea..c6e36909 100644 --- a/tests/test_getitem.py +++ b/tests/test_getitem.py @@ -47,6 +47,8 @@ def test_list_with_ints_raise(daa: dak.Array) -> None: def test_single_int(daa: dak.Array, caa: ak.Array) -> None: + daa = dak.copy(daa) + daa.eager_compute_divisions() total = len(daa) assert daa.known_divisions for i in range(total): diff --git a/tests/test_structure.py b/tests/test_structure.py index 53d4efcf..e6ab781d 100644 --- a/tests/test_structure.py +++ b/tests/test_structure.py @@ -88,7 +88,9 @@ def test_zip_tuple_input(caa: ak.Array, daa: dak.Array) -> None: def test_zip_bad_input(daa: dak.Array) -> None: da1 = daa.points.x gd = (x for x in (da1, da1)) - with pytest.raises(DaskAwkwardNotImplemented, match="only sized iterables"): + with pytest.raises( + DaskAwkwardNotImplemented, match="only mappings or sequences are supported" + ): dak.zip(gd)
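The structure.py change to `dak.zip` swaps the `dict`/`list`/`tuple` checks for `Mapping`/`Sequence`, which the updated test message reflects. A minimal sketch, assuming this branch; `MappingProxyType` stands in for any non-dict `Mapping`, and the `attrs` value is illustrative:

```python
from types import MappingProxyType

import awkward as ak
import dask_awkward as dak

x = dak.from_awkward(ak.Array([1.0, 2.0, 3.0]), npartitions=1)
y = dak.from_awkward(ak.Array([4.0, 5.0, 6.0]), npartitions=1)

# Any Mapping now passes the isinstance check (previously only dict did);
# attrs is forwarded to ak.zip on every partition.
points = dak.zip(MappingProxyType({"x": x, "y": y}), attrs={"made-by": "zip"})
print(points.compute().to_list())
# e.g. [{'x': 1.0, 'y': 4.0}, {'x': 2.0, 'y': 5.0}, {'x': 3.0, 'y': 6.0}]

# A generator input still raises DaskAwkwardNotImplemented, now with the
# "only mappings or sequences are supported" message asserted in the test.
```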