From 423bc04f372c846a3d33308c7b453588b30f8ecb Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Wed, 15 Nov 2023 12:23:04 -0600 Subject: [PATCH 1/6] harden dtype handling for scalars --- src/dask_awkward/lib/core.py | 74 ++++++++++++++++++++---------- src/dask_awkward/lib/io/json.py | 2 +- src/dask_awkward/lib/io/parquet.py | 2 +- tests/test_core.py | 17 +++---- 4 files changed, 59 insertions(+), 36 deletions(-) diff --git a/src/dask_awkward/lib/core.py b/src/dask_awkward/lib/core.py index dc258bbc..4983385e 100644 --- a/src/dask_awkward/lib/core.py +++ b/src/dask_awkward/lib/core.py @@ -84,14 +84,22 @@ def __init__( self, dsk: HighLevelGraph, name: str, - meta: Any, + meta: Any | None = None, + dtype: DTypeLike | None = None, known_value: Any | None = None, ) -> None: if not isinstance(dsk, HighLevelGraph): dsk = HighLevelGraph.from_collections(name, dsk, dependencies=()) # type: ignore self._dask: HighLevelGraph = dsk self._name: str = name - self._meta: Any = self._check_meta(meta) + if meta is not None and dtype is None: + self._meta = self._check_meta(meta) + self._dtype = np.dtype(self._meta.type.content.primitive) + elif meta is None and dtype is not None: + self._meta = ak.Array(TypeTracerArray._new(dtype=dtype, shape=())) + self._dtype = np.dtype(self._meta.type.content.primitive) + else: + ValueError("One (and only one) of dtype or meta can be defined.") self._known_value: Any | None = known_value def __dask_graph__(self) -> Graph: @@ -125,7 +133,7 @@ def _rebuild(self, dsk, *, rename=None): return type(self)(dsk, name, self._meta, self.known_value) def __reduce__(self): - return (Scalar, (self.dask, self.name, self._meta, self.known_value)) + return (Scalar, (self.dask, self.name, None, self.dtype, self.known_value)) @property def dask(self) -> HighLevelGraph: @@ -140,20 +148,20 @@ def key(self) -> Key: return (self._name, 0) def _check_meta(self, m: Any) -> Any | None: - if isinstance(m, (MaybeNone, OneOf)) or is_unknown_scalar(m): - return m + if isinstance(m, MaybeNone): + return ak.Array(m.content) elif isinstance(m, ak.Array) and len(m) == 1: return m - raise TypeError(f"meta must be a typetracer object, not a {type(m)}") + elif isinstance(m, OneOf) or is_unknown_scalar(m): + if isinstance(m, TypeTracerArray): + return ak.Array(m) + else: + return m + raise TypeError(f"meta must be a typetracer, not a {type(m)}") @property - def dtype(self) -> np.dtype | None: - try: - if self._meta is not None: - return self._meta.dtype - except AttributeError: - pass - return None + def dtype(self) -> np.dtype: + return self._dtype @property def npartitions(self) -> int: @@ -182,15 +190,14 @@ def __repr__(self) -> str: # pragma: no cover return self.__str__() def __str__(self) -> str: - dt = self.dtype or "Unknown" if self.known_value is not None: return ( f"dask.awkward<{key_split(self.name)}, " "type=Scalar, " - f"dtype={dt}, " + f"dtype={self.dtype}, " f"known_value={self.known_value}>" ) - return f"dask.awkward<{key_split(self.name)}, type=Scalar, dtype={dt}>" + return f"dask.awkward<{key_split(self.name)}, type=Scalar, dtype={self.dtype}>" def __getitem__(self, where: Any) -> Any: token = tokenize(self, operator.getitem, where) @@ -260,7 +267,11 @@ def f(self, other): ), dependencies=tuple(deps), ) - return new_scalar_object(graph, name, meta=None) + if isinstance(other, Scalar): + meta = op(self._meta, other._meta) + else: + meta = op(self._meta, other) + return new_scalar_object(graph, name, meta=meta) return f @@ -277,7 +288,8 @@ def f(self): layer, dependencies=(self,), ) - return new_scalar_object(graph, name, meta=None) + meta = op(self._meta) + return new_scalar_object(graph, name, meta=meta) return f @@ -330,7 +342,13 @@ def f(*args): Scalar._bind_operator(op) -def new_scalar_object(dsk: HighLevelGraph, name: str, *, meta: Any) -> Scalar: +def new_scalar_object( + dsk: HighLevelGraph, + name: str, + *, + meta: Any | None = None, + dtype: DTypeLike | None = None, +) -> Scalar: """Instantiate a new scalar collection. Parameters @@ -348,8 +366,13 @@ def new_scalar_object(dsk: HighLevelGraph, name: str, *, meta: Any) -> Scalar: Resulting collection. """ - if meta is None: - meta = ak.Array(TypeTracerArray._new(dtype=np.dtype(None), shape=())) + + if meta is not None and dtype is None: + pass + elif meta is None and dtype is not None: + meta = ak.Array(TypeTracerArray._new(dtype=np.dtype(dtype), shape=())) + else: + ValueError("One (and only one) of dtype or meta can be defined.") if isinstance(meta, MaybeNone): meta = ak.Array(meta.content) @@ -412,7 +435,10 @@ def new_known_scalar( llg = AwkwardMaterializedLayer({(name, 0): s}, previous_layer_names=[]) hlg = HighLevelGraph.from_collections(name, llg, dependencies=()) return Scalar( - hlg, name, meta=TypeTracerArray._new(dtype=dtype, shape=()), known_value=s + hlg, + name, + dtype=dtype, + known_value=s, ) @@ -428,7 +454,9 @@ class will be results from awkward operations. """ def __init__(self, dsk: HighLevelGraph, name: str, meta: Any | None = None) -> None: - super().__init__(dsk, name, meta) + self._dask: HighLevelGraph = dsk + self._name: str = name + self._meta: ak.Record = self._check_meta(meta) def _check_meta(self, m: Any | None) -> Any | None: if not isinstance(m, ak.Record): diff --git a/src/dask_awkward/lib/io/json.py b/src/dask_awkward/lib/io/json.py index 43ba19e3..b20c6f63 100644 --- a/src/dask_awkward/lib/io/json.py +++ b/src/dask_awkward/lib/io/json.py @@ -752,7 +752,7 @@ def to_json( AwkwardMaterializedLayer(dsk, previous_layer_names=[map_res.name]), dependencies=(map_res,), ) - res = new_scalar_object(graph, name=name, meta=None) + res = new_scalar_object(graph, name=name, dtype="f8") if compute: res.compute() return None diff --git a/src/dask_awkward/lib/io/parquet.py b/src/dask_awkward/lib/io/parquet.py index 8a3c0417..9ac5776e 100644 --- a/src/dask_awkward/lib/io/parquet.py +++ b/src/dask_awkward/lib/io/parquet.py @@ -615,7 +615,7 @@ def to_parquet( AwkwardMaterializedLayer(dsk, previous_layer_names=[map_res.name]), dependencies=[map_res], ) - out = new_scalar_object(graph, final_name, meta=None) + out = new_scalar_object(graph, final_name, dtype="f8") if compute: out.compute() return None diff --git a/tests/test_core.py b/tests/test_core.py index 4baec896..6d04296f 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -216,16 +216,13 @@ def test_scalar_collection(daa: Array) -> None: assert type(daa["points", "x"][0][0]) is Scalar -def test_scalar_getitem_getattr() -> None: - d = {"a": 5} - s = new_known_scalar(d) - assert s["a"].compute() == d["a"] - Thing = namedtuple("Thing", "a b c") - t = Thing(c=3, b=2, a=1) - s = new_known_scalar(t) +def test_known_scalar() -> None: + i = 5 + s = new_known_scalar(5) + assert s.compute() == 5 with pytest.raises(AttributeError, match="should be done after converting"): - s.c.compute() - assert s.to_delayed().c.compute() == t.c + s.denominator.compute() + assert s.to_delayed().denominator.compute() == i.denominator @pytest.mark.parametrize("op", [operator.add, operator.truediv, operator.mul]) @@ -474,8 +471,6 @@ def test_scalar_dtype() -> None: s = 2 c = new_known_scalar(s) assert c.dtype == np.dtype(type(s)) - c._meta = None - assert c.dtype is None def test_scalar_pickle(daa: Array) -> None: From 42bf74060ffed4305ad08664f2ede3221c4b70b5 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 16 Nov 2023 12:26:19 -0600 Subject: [PATCH 2/6] use create_unknown_scalar; fix 2.5.0 pq call --- src/dask_awkward/lib/core.py | 7 ++++--- src/dask_awkward/lib/io/parquet.py | 4 ++++ src/dask_awkward/lib/structure.py | 3 ++- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/dask_awkward/lib/core.py b/src/dask_awkward/lib/core.py index 4983385e..f32e11c3 100644 --- a/src/dask_awkward/lib/core.py +++ b/src/dask_awkward/lib/core.py @@ -24,6 +24,7 @@ is_unknown_scalar, ) from awkward.highlevel import NDArrayOperatorsMixin, _dir_pattern +from awkward.typetracer import create_unknown_scalar from dask.base import ( DaskMethodsMixin, dont_optimize, @@ -96,7 +97,7 @@ def __init__( self._meta = self._check_meta(meta) self._dtype = np.dtype(self._meta.type.content.primitive) elif meta is None and dtype is not None: - self._meta = ak.Array(TypeTracerArray._new(dtype=dtype, shape=())) + self._meta = ak.Array(create_unknown_scalar(dtype)) self._dtype = np.dtype(self._meta.type.content.primitive) else: ValueError("One (and only one) of dtype or meta can be defined.") @@ -147,7 +148,7 @@ def name(self) -> str: def key(self) -> Key: return (self._name, 0) - def _check_meta(self, m: Any) -> Any | None: + def _check_meta(self, m): if isinstance(m, MaybeNone): return ak.Array(m.content) elif isinstance(m, ak.Array) and len(m) == 1: @@ -370,7 +371,7 @@ def new_scalar_object( if meta is not None and dtype is None: pass elif meta is None and dtype is not None: - meta = ak.Array(TypeTracerArray._new(dtype=np.dtype(dtype), shape=())) + meta = ak.Array(create_unknown_scalar(dtype)) else: ValueError("One (and only one) of dtype or meta can be defined.") diff --git a/src/dask_awkward/lib/io/parquet.py b/src/dask_awkward/lib/io/parquet.py index 9ac5776e..e19e7b6f 100644 --- a/src/dask_awkward/lib/io/parquet.py +++ b/src/dask_awkward/lib/io/parquet.py @@ -42,6 +42,7 @@ def __init__( unnamed_root: bool = False, original_form: Form | None = None, behavior: dict | None = None, + attrs: dict | None = None, **kwargs: Any, ) -> None: self.fs = fs @@ -53,6 +54,7 @@ def __init__( self.columns = [f".{c}" for c in self.columns] self.original_form = original_form self.behavior = behavior + self.attrs = attrs self.kwargs = kwargs @abc.abstractmethod @@ -119,6 +121,7 @@ def __call__(self, source: Any) -> Any: highlevel=True, fs=self.fs, behavior=self.behavior, + attrs=self.attrs, **self.kwargs, ) return ak.Array(unproject_layout(self.original_form, array.layout)) @@ -169,6 +172,7 @@ def __call__(self, pair: Any) -> ak.Array: highlevel=True, fs=self.fs, behavior=self.behavior, + attrs=self.attrs, **self.kwargs, ) return ak.Array(unproject_layout(self.original_form, array.layout)) diff --git a/src/dask_awkward/lib/structure.py b/src/dask_awkward/lib/structure.py index 2bb797b3..76a739e5 100644 --- a/src/dask_awkward/lib/structure.py +++ b/src/dask_awkward/lib/structure.py @@ -10,6 +10,7 @@ import awkward as ak import numpy as np from awkward._nplikes.typetracer import TypeTracerArray +from awkward.typetracer import create_unknown_scalar from dask.base import is_dask_collection, tokenize from dask.highlevelgraph import HighLevelGraph @@ -630,7 +631,7 @@ def num( return new_scalar_object( hlg, name, - meta=ak.Array(TypeTracerArray._new(dtype=np.dtype("int64"), shape=())), + meta=ak.Array(create_unknown_scalar(np.dtype("int64"))), ) else: return map_partitions( From 91180a9d2468deed1a8e7fc52150804212524757 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 16 Nov 2023 12:27:59 -0600 Subject: [PATCH 3/6] simplify --- src/dask_awkward/lib/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dask_awkward/lib/core.py b/src/dask_awkward/lib/core.py index f32e11c3..c82ece6c 100644 --- a/src/dask_awkward/lib/core.py +++ b/src/dask_awkward/lib/core.py @@ -98,7 +98,7 @@ def __init__( self._dtype = np.dtype(self._meta.type.content.primitive) elif meta is None and dtype is not None: self._meta = ak.Array(create_unknown_scalar(dtype)) - self._dtype = np.dtype(self._meta.type.content.primitive) + self._dtype = dtype else: ValueError("One (and only one) of dtype or meta can be defined.") self._known_value: Any | None = known_value From 554f0409997ec239c78d9786ff8fc95e94e55472 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 16 Nov 2023 12:31:23 -0600 Subject: [PATCH 4/6] typing --- src/dask_awkward/lib/core.py | 2 +- tests/test_core.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/dask_awkward/lib/core.py b/src/dask_awkward/lib/core.py index c82ece6c..38507a1e 100644 --- a/src/dask_awkward/lib/core.py +++ b/src/dask_awkward/lib/core.py @@ -98,7 +98,7 @@ def __init__( self._dtype = np.dtype(self._meta.type.content.primitive) elif meta is None and dtype is not None: self._meta = ak.Array(create_unknown_scalar(dtype)) - self._dtype = dtype + self._dtype = dtype # type: ignore else: ValueError("One (and only one) of dtype or meta can be defined.") self._known_value: Any | None = known_value diff --git a/tests/test_core.py b/tests/test_core.py index 6d04296f..04d96bd3 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -4,7 +4,6 @@ import json import operator import sys -from collections import namedtuple from collections.abc import Callable from typing import TYPE_CHECKING From e9c6af1fa8a914cbe8b7490902e3e185f42d3d96 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 30 Nov 2023 09:39:31 -0600 Subject: [PATCH 5/6] Update src/dask_awkward/lib/core.py Co-authored-by: Angus Hollands --- src/dask_awkward/lib/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dask_awkward/lib/core.py b/src/dask_awkward/lib/core.py index e85bc0d9..7a613a92 100644 --- a/src/dask_awkward/lib/core.py +++ b/src/dask_awkward/lib/core.py @@ -95,7 +95,7 @@ def __init__( self._name: str = name if meta is not None and dtype is None: self._meta = self._check_meta(meta) - self._dtype = np.dtype(self._meta.type.content.primitive) + self._dtype = self._meta.layout.dtype elif meta is None and dtype is not None: self._meta = ak.Array(create_unknown_scalar(dtype)) self._dtype = dtype # type: ignore From e99d6fb1a5a5132acbb83a605da8fe21bfbcdd73 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 30 Nov 2023 10:01:19 -0600 Subject: [PATCH 6/6] unused ignore --- src/dask_awkward/lib/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dask_awkward/lib/core.py b/src/dask_awkward/lib/core.py index 24331d45..87800825 100644 --- a/src/dask_awkward/lib/core.py +++ b/src/dask_awkward/lib/core.py @@ -276,7 +276,7 @@ def __init__( self._dtype = self._meta.layout.dtype elif meta is None and dtype is not None: self._meta = ak.Array(create_unknown_scalar(dtype)) - self._dtype = dtype # type: ignore + self._dtype = dtype else: ValueError("One (and only one) of dtype or meta can be defined.") self._known_value: Any | None = known_value