From 43b2e43112fcdb7e1c998b2b46ee22ab669c9003 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 25 Apr 2024 11:19:38 -0400 Subject: [PATCH] pass buffer names around, not columns --- src/dask_awkward/layers/layers.py | 2 +- src/dask_awkward/lib/core.py | 5 +++- src/dask_awkward/lib/io/columnar.py | 16 +++++-------- src/dask_awkward/lib/io/io.py | 33 +++++++++++++++++--------- src/dask_awkward/lib/io/json.py | 13 +++-------- src/dask_awkward/lib/io/parquet.py | 7 ++++-- src/dask_awkward/lib/optimize.py | 36 ++++++++++++----------------- src/dask_awkward/lib/structure.py | 1 + src/dask_awkward/lib/utils.py | 12 ++++++++++ tests/test_io.py | 6 ++--- 10 files changed, 72 insertions(+), 59 deletions(-) diff --git a/src/dask_awkward/layers/layers.py b/src/dask_awkward/layers/layers.py index 6ede5e08..a14bd173 100644 --- a/src/dask_awkward/layers/layers.py +++ b/src/dask_awkward/layers/layers.py @@ -29,8 +29,8 @@ def from_blockwise(cls, layer: Blockwise) -> AwkwardBlockwiseLayer: return ob def __getstate__(self) -> dict: - # Indicator that this layer has been serialised state = self.__dict__.copy() + # Indicator that this layer has been serialised state["has_been_unpickled"] = True state.pop("meta", None) # this is a typetracer return state diff --git a/src/dask_awkward/lib/core.py b/src/dask_awkward/lib/core.py index d4506015..b678d202 100644 --- a/src/dask_awkward/lib/core.py +++ b/src/dask_awkward/lib/core.py @@ -523,6 +523,7 @@ def f(self, other): meta = op(self._meta, other._meta) else: meta = op(self._meta, other) + [_.commit(name) for _ in self.report] return new_scalar_object(graph, name, meta=meta) return f @@ -1173,11 +1174,13 @@ def _partitions(self, index: Any) -> Array: name = f"partitions-{token}" new_keys = self.keys_array[index].tolist() dsk = {(name, i): tuple(key) for i, key in enumerate(new_keys)} + layer = AwkwardMaterializedLayer(dsk, previous_layer_names=[self.name]) graph = HighLevelGraph.from_collections( name, - AwkwardMaterializedLayer(dsk, previous_layer_names=[self.name]), + layer, dependencies=(self,), ) + layer.meta = self._meta # if a single partition was requested we trivially know the new divisions. if len(raw) == 1 and isinstance(raw[0], int) and self.known_divisions: diff --git a/src/dask_awkward/lib/io/columnar.py b/src/dask_awkward/lib/io/columnar.py index 36cfc007..ff449e8c 100644 --- a/src/dask_awkward/lib/io/columnar.py +++ b/src/dask_awkward/lib/io/columnar.py @@ -1,17 +1,10 @@ from __future__ import annotations import logging -from typing import TYPE_CHECKING, Protocol, TypeVar, cast import awkward as ak -from awkward import Array as AwkwardArray -from awkward.forms import Form from dask_awkward.layers.layers import BackendT -from dask_awkward.lib.utils import FormStructure - -if TYPE_CHECKING: - from awkward._nplikes.typetracer import TypeTracerReport log = logging.getLogger(__name__) @@ -25,10 +18,13 @@ class ColumnProjectionMixin: when only metadata buffers are required. """ - def mock_empty(self: S, backend: BackendT = "cpu") -> AwkwardArray: + def project(self, *args, **kwargs): + # default implementation does nothing + return self + + def mock_empty(self, backend: BackendT = "cpu"): # used by failure report generation - return cast( - AwkwardArray, + return ( ak.to_backend( self.form.length_zero_array(highlevel=False, behavior=self.behavior), backend, diff --git a/src/dask_awkward/lib/io/io.py b/src/dask_awkward/lib/io/io.py index f9f5c870..7c0d68f2 100644 --- a/src/dask_awkward/lib/io/io.py +++ b/src/dask_awkward/lib/io/io.py @@ -132,10 +132,13 @@ def from_awkward( ) -class _FromListsFn: - def __init__(self, behavior: Mapping | None, attrs: Mapping[str, Any] | None): +class _FromListsFn(ColumnProjectionMixin): + def __init__( + self, behavior: Mapping | None, attrs: Mapping[str, Any] | None, form=None + ): self.behavior = behavior self.attrs = attrs + self.form = form def __call__(self, x: list) -> ak.Array: return ak.Array(x, behavior=self.behavior, attrs=self.attrs) @@ -177,12 +180,13 @@ def from_lists( """ lists = list(source) divs = (0, *np.cumsum(list(map(len, lists)))) + meta = typetracer_array(ak.Array(lists[0], attrs=attrs, behavior=behavior)) return cast( Array, from_map( - _FromListsFn(behavior=behavior, attrs=attrs), + _FromListsFn(behavior=behavior, attrs=attrs, form=meta.layout.form), lists, - meta=typetracer_array(ak.Array(lists[0], attrs=attrs, behavior=behavior)), + meta=meta, divisions=divs, label="from-lists", ), @@ -425,7 +429,7 @@ def from_dask_array( ) layer = AwkwardBlockwiseLayer.from_blockwise(layer) layer.meta = meta - meta._report = set() + meta._report = set() # just because we can't project, we shouldn't track? hlg = HighLevelGraph.from_collections(name, layer, dependencies=[array]) if np.any(np.isnan(array.chunks)): return new_array_object( @@ -634,14 +638,21 @@ def from_map( ) io_func._column_report = report report.commit(name) - array_meta._report = { - report - } # column tracking report, not failure report, below - # If we know the meta, we can spoof mocking + # column tracking report, not failure report, below + array_meta._report = {report} + # Without `meta`, the meta will be computed by executing the graph elif meta is not None: + # we can still track necessary columns even if we can't project io_func = func - array_meta = meta - # Without `meta`, the meta will be computed by executing the graph + array_meta, report = typetracer_with_report( + form_with_unique_keys(meta.layout.form, "@"), + highlevel=True, + behavior=None, + buffer_key=render_buffer_key, + ) + report.commit(name) + # column tracking report, not failure report, below + array_meta._report = {report} else: io_func = func array_meta = None diff --git a/src/dask_awkward/lib/io/json.py b/src/dask_awkward/lib/io/json.py index 52849f8a..f289f3de 100644 --- a/src/dask_awkward/lib/io/json.py +++ b/src/dask_awkward/lib/io/json.py @@ -32,6 +32,7 @@ _BytesReadingInstructions, from_map, ) +from dask_awkward.lib.utils import _buf_to_col if TYPE_CHECKING: from awkward.contents.content import Content @@ -76,16 +77,8 @@ def use_optimization(self) -> bool: ) def project(self, columns: list[str]): - # transform buffer names to JSON columns - columns = { - c.replace(".content", "") - .replace("-offsets", "") - .replace("-data", "") - .replace("-index", "") - .replace("-mask", "") - for c in columns - } - form = self.form.select_columns(columns) + cols = [_buf_to_col(s) for s in columns] + form = self.form.select_columns(cols) assert form is not None schema = layout_to_jsonschema(form.length_zero_array(highlevel=False)) diff --git a/src/dask_awkward/lib/io/parquet.py b/src/dask_awkward/lib/io/parquet.py index 89774142..6863a0f8 100644 --- a/src/dask_awkward/lib/io/parquet.py +++ b/src/dask_awkward/lib/io/parquet.py @@ -24,6 +24,7 @@ from dask_awkward.lib.io.columnar import ColumnProjectionMixin from dask_awkward.lib.io.io import from_map from dask_awkward.lib.unproject_layout import unproject_layout +from dask_awkward.lib.utils import _buf_to_col if TYPE_CHECKING: pass @@ -178,9 +179,10 @@ def __call__(self, *args, **kwargs): return self.read_fn(source) def project(self, columns): + cols = [_buf_to_col(s) for s in columns] return FromParquetFileWiseFn( fs=self.fs, - form=self.form.select_columns(columns), + form=self.form.select_columns(cols), listsep=self.listsep, unnamed_root=self.unnamed_root, original_form=self.form, @@ -237,9 +239,10 @@ def __call__(self, pair: Any) -> ak.Array: ) def project(self, columns): + cols = [_buf_to_col(s) for s in columns] return FromParquetFragmentWiseFn( fs=self.fs, - form=self.form.select_columns(columns), + form=self.form.select_columns(cols), unnamed_root=self.unnamed_root, original_form=self.form, report=self.report, diff --git a/src/dask_awkward/lib/optimize.py b/src/dask_awkward/lib/optimize.py index 8d6e0beb..95bb7c7e 100644 --- a/src/dask_awkward/lib/optimize.py +++ b/src/dask_awkward/lib/optimize.py @@ -12,7 +12,7 @@ from dask.highlevelgraph import HighLevelGraph from dask_awkward.layers import AwkwardBlockwiseLayer, AwkwardInputLayer -from dask_awkward.lib.utils import typetracer_nochecks +from dask_awkward.lib.utils import _buf_to_col, typetracer_nochecks from dask_awkward.utils import first if TYPE_CHECKING: @@ -106,9 +106,12 @@ def optimize_columns(dsk: HighLevelGraph, keys: Sequence[Key]) -> HighLevelGraph lays = {_[0] for _ in keys if isinstance(_, tuple)} all_reps = set() for ln in lays: - if hasattr(dsk.layers[ln], "meta"): + if ln in dsk.layers and hasattr(dsk.layers[ln], "meta"): touch_data(dsk.layers[ln].meta) all_reps.update(getattr(dsk.layers[ln].meta, "_report", ())) + print() + print(ln) + print(getattr(dsk.layers[ln].meta, "_report", ()), all_reps) name = tokenize("output", lays) [_.commit(name) for _ in all_reps] all_layers = tuple(dsk.layers) + (name,) @@ -120,17 +123,6 @@ def optimize_columns(dsk: HighLevelGraph, keys: Sequence[Key]) -> HighLevelGraph return HighLevelGraph(dsk2, dsk.dependencies) -def _buf_to_col(s): - return ( - s[2:] - .replace(".content", "") - .replace("-offsets", "") - .replace("-data", "") - .replace("-index", "") - .replace("-mask", "") - ) - - def _optimize_columns(dsk, all_layers): # this loop is necessary_columns @@ -139,21 +131,21 @@ def _optimize_columns(dsk, all_layers): lay.io_func, "_column_report" ): continue - all_cols = lay.meta.layout.form.columns() rep = lay.io_func._column_report cols = set() + # this loop not required after next ak release for ln in all_layers: - # this loop not required after next ak release try: - cols |= {_buf_to_col(s) for s in rep.data_touched_in((ln,))} - for col in (_buf_to_col(s) for s in rep.shape_touched_in((ln,))): + cols.update(rep.data_touched_in((ln,))) + except KeyError: + pass + try: + for col in rep.shape_touched_in((ln,)): if col in cols: continue if any(_.startswith(col) for _ in cols): continue - ll = list(_ for _ in all_cols if _.startswith(col)) - if ll: - cols.add(ll[0]) + cols.add(col) except KeyError: pass @@ -175,7 +167,9 @@ def necessary_columns(*args): out = {} for k, _, cols in _optimize_columns(dsk, all_layers): - out[k] = cols + first = (_buf_to_col(s) for s in cols) + # remove root "offsets", which appears when computing divisions + out[k] = sorted(s for s in first if s and s != "offsets") return out diff --git a/src/dask_awkward/lib/structure.py b/src/dask_awkward/lib/structure.py index 6b9fafad..6771e1de 100644 --- a/src/dask_awkward/lib/structure.py +++ b/src/dask_awkward/lib/structure.py @@ -336,6 +336,7 @@ def copy(array: Array) -> Array: # dask-awkward's copy is metadata-only old_meta = array._meta new_meta = ak.Array(old_meta.layout, behavior=deepcopy(old_meta._behavior)) + new_meta._report = old_meta._report return Array( array._dask, diff --git a/src/dask_awkward/lib/utils.py b/src/dask_awkward/lib/utils.py index 7b067386..c8686faf 100644 --- a/src/dask_awkward/lib/utils.py +++ b/src/dask_awkward/lib/utils.py @@ -178,3 +178,15 @@ def typetracer_nochecks(): TypeTracerArray.runtime_typechecks = oldval else: del TypeTracerArray.runtime_typechecks + + +def _buf_to_col(s): + return ( + s[2:] + .replace("content.", "") + .replace("-offsets", "") + .replace("-data", "") + .replace("-index", "") + .replace("-mask", "") + .replace("-tags", "") + ) diff --git a/tests/test_io.py b/tests/test_io.py index 396f409d..9c289ff7 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -434,7 +434,7 @@ def point_abs(self): caa = ak.with_name(caa.points, name="Point", behavior=behavior) daa = dak.from_awkward(caa, npartitions=2, behavior=behavior) assert_eq(caa.xsq, daa.xsq) - assert set(first(dak.necessary_columns(daa.xsq).items())[1]) == {"x"} - assert set(first(dak.necessary_columns(daa).items())[1]) == {"x", "y"} - assert set(first(dak.necessary_columns(np.abs(daa)).items())[1]) == {"x", "y"} + assert set(first(dak.necessary_columns(daa.xsq).values())) == {"x"} + assert set(first(dak.necessary_columns(daa).values())) == {"x", "y"} + assert set(first(dak.necessary_columns(np.abs(daa)).values())) == {"x", "y"} assert_eq(np.abs(caa), np.abs(daa))