Skip to content

Commit

Permalink
pass buffer names around, not columns
Browse files Browse the repository at this point in the history
  • Loading branch information
martindurant committed Apr 25, 2024
1 parent 8bbe409 commit 43b2e43
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 59 deletions.
2 changes: 1 addition & 1 deletion src/dask_awkward/layers/layers.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ def from_blockwise(cls, layer: Blockwise) -> AwkwardBlockwiseLayer:
return ob

def __getstate__(self) -> dict:
# Indicator that this layer has been serialised
state = self.__dict__.copy()
# Indicator that this layer has been serialised
state["has_been_unpickled"] = True
state.pop("meta", None) # this is a typetracer
return state
Expand Down
5 changes: 4 additions & 1 deletion src/dask_awkward/lib/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ def f(self, other):
meta = op(self._meta, other._meta)
else:
meta = op(self._meta, other)
[_.commit(name) for _ in self.report]
return new_scalar_object(graph, name, meta=meta)

return f
Expand Down Expand Up @@ -1173,11 +1174,13 @@ def _partitions(self, index: Any) -> Array:
name = f"partitions-{token}"
new_keys = self.keys_array[index].tolist()
dsk = {(name, i): tuple(key) for i, key in enumerate(new_keys)}
layer = AwkwardMaterializedLayer(dsk, previous_layer_names=[self.name])
graph = HighLevelGraph.from_collections(
name,
AwkwardMaterializedLayer(dsk, previous_layer_names=[self.name]),
layer,
dependencies=(self,),
)
layer.meta = self._meta

# if a single partition was requested we trivially know the new divisions.
if len(raw) == 1 and isinstance(raw[0], int) and self.known_divisions:
Expand Down
16 changes: 6 additions & 10 deletions src/dask_awkward/lib/io/columnar.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,10 @@
from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Protocol, TypeVar, cast

import awkward as ak
from awkward import Array as AwkwardArray
from awkward.forms import Form

from dask_awkward.layers.layers import BackendT
from dask_awkward.lib.utils import FormStructure

if TYPE_CHECKING:
from awkward._nplikes.typetracer import TypeTracerReport

log = logging.getLogger(__name__)

Expand All @@ -25,10 +18,13 @@ class ColumnProjectionMixin:
when only metadata buffers are required.
"""

def mock_empty(self: S, backend: BackendT = "cpu") -> AwkwardArray:
def project(self, *args, **kwargs):
# default implementation does nothing
return self

def mock_empty(self, backend: BackendT = "cpu"):
# used by failure report generation
return cast(
AwkwardArray,
return (
ak.to_backend(
self.form.length_zero_array(highlevel=False, behavior=self.behavior),
backend,
Expand Down
33 changes: 22 additions & 11 deletions src/dask_awkward/lib/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,13 @@ def from_awkward(
)


class _FromListsFn:
def __init__(self, behavior: Mapping | None, attrs: Mapping[str, Any] | None):
class _FromListsFn(ColumnProjectionMixin):
def __init__(
self, behavior: Mapping | None, attrs: Mapping[str, Any] | None, form=None
):
self.behavior = behavior
self.attrs = attrs
self.form = form

def __call__(self, x: list) -> ak.Array:
return ak.Array(x, behavior=self.behavior, attrs=self.attrs)
Expand Down Expand Up @@ -177,12 +180,13 @@ def from_lists(
"""
lists = list(source)
divs = (0, *np.cumsum(list(map(len, lists))))
meta = typetracer_array(ak.Array(lists[0], attrs=attrs, behavior=behavior))
return cast(
Array,
from_map(
_FromListsFn(behavior=behavior, attrs=attrs),
_FromListsFn(behavior=behavior, attrs=attrs, form=meta.layout.form),
lists,
meta=typetracer_array(ak.Array(lists[0], attrs=attrs, behavior=behavior)),
meta=meta,
divisions=divs,
label="from-lists",
),
Expand Down Expand Up @@ -425,7 +429,7 @@ def from_dask_array(
)
layer = AwkwardBlockwiseLayer.from_blockwise(layer)
layer.meta = meta
meta._report = set()
meta._report = set() # just because we can't project, we shouldn't track?
hlg = HighLevelGraph.from_collections(name, layer, dependencies=[array])
if np.any(np.isnan(array.chunks)):
return new_array_object(
Expand Down Expand Up @@ -634,14 +638,21 @@ def from_map(
)
io_func._column_report = report
report.commit(name)
array_meta._report = {
report
} # column tracking report, not failure report, below
# If we know the meta, we can spoof mocking
# column tracking report, not failure report, below
array_meta._report = {report}
# Without `meta`, the meta will be computed by executing the graph
elif meta is not None:
# we can still track necessary columns even if we can't project
io_func = func
array_meta = meta
# Without `meta`, the meta will be computed by executing the graph
array_meta, report = typetracer_with_report(
form_with_unique_keys(meta.layout.form, "@"),
highlevel=True,
behavior=None,
buffer_key=render_buffer_key,
)
report.commit(name)
# column tracking report, not failure report, below
array_meta._report = {report}
else:
io_func = func
array_meta = None
Expand Down
13 changes: 3 additions & 10 deletions src/dask_awkward/lib/io/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
_BytesReadingInstructions,
from_map,
)
from dask_awkward.lib.utils import _buf_to_col

if TYPE_CHECKING:
from awkward.contents.content import Content
Expand Down Expand Up @@ -76,16 +77,8 @@ def use_optimization(self) -> bool:
)

def project(self, columns: list[str]):
# transform buffer names to JSON columns
columns = {
c.replace(".content", "")
.replace("-offsets", "")
.replace("-data", "")
.replace("-index", "")
.replace("-mask", "")
for c in columns
}
form = self.form.select_columns(columns)
cols = [_buf_to_col(s) for s in columns]
form = self.form.select_columns(cols)
assert form is not None
schema = layout_to_jsonschema(form.length_zero_array(highlevel=False))

Expand Down
7 changes: 5 additions & 2 deletions src/dask_awkward/lib/io/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from dask_awkward.lib.io.columnar import ColumnProjectionMixin
from dask_awkward.lib.io.io import from_map
from dask_awkward.lib.unproject_layout import unproject_layout
from dask_awkward.lib.utils import _buf_to_col

if TYPE_CHECKING:
pass
Expand Down Expand Up @@ -178,9 +179,10 @@ def __call__(self, *args, **kwargs):
return self.read_fn(source)

def project(self, columns):
cols = [_buf_to_col(s) for s in columns]
return FromParquetFileWiseFn(
fs=self.fs,
form=self.form.select_columns(columns),
form=self.form.select_columns(cols),
listsep=self.listsep,
unnamed_root=self.unnamed_root,
original_form=self.form,
Expand Down Expand Up @@ -237,9 +239,10 @@ def __call__(self, pair: Any) -> ak.Array:
)

def project(self, columns):
cols = [_buf_to_col(s) for s in columns]
return FromParquetFragmentWiseFn(
fs=self.fs,
form=self.form.select_columns(columns),
form=self.form.select_columns(cols),
unnamed_root=self.unnamed_root,
original_form=self.form,
report=self.report,
Expand Down
36 changes: 15 additions & 21 deletions src/dask_awkward/lib/optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from dask.highlevelgraph import HighLevelGraph

from dask_awkward.layers import AwkwardBlockwiseLayer, AwkwardInputLayer
from dask_awkward.lib.utils import typetracer_nochecks
from dask_awkward.lib.utils import _buf_to_col, typetracer_nochecks
from dask_awkward.utils import first

if TYPE_CHECKING:
Expand Down Expand Up @@ -106,9 +106,12 @@ def optimize_columns(dsk: HighLevelGraph, keys: Sequence[Key]) -> HighLevelGraph
lays = {_[0] for _ in keys if isinstance(_, tuple)}
all_reps = set()
for ln in lays:
if hasattr(dsk.layers[ln], "meta"):
if ln in dsk.layers and hasattr(dsk.layers[ln], "meta"):
touch_data(dsk.layers[ln].meta)
all_reps.update(getattr(dsk.layers[ln].meta, "_report", ()))
print()
print(ln)
print(getattr(dsk.layers[ln].meta, "_report", ()), all_reps)
name = tokenize("output", lays)
[_.commit(name) for _ in all_reps]
all_layers = tuple(dsk.layers) + (name,)
Expand All @@ -120,17 +123,6 @@ def optimize_columns(dsk: HighLevelGraph, keys: Sequence[Key]) -> HighLevelGraph
return HighLevelGraph(dsk2, dsk.dependencies)


def _buf_to_col(s):
return (
s[2:]
.replace(".content", "")
.replace("-offsets", "")
.replace("-data", "")
.replace("-index", "")
.replace("-mask", "")
)


def _optimize_columns(dsk, all_layers):

# this loop is necessary_columns
Expand All @@ -139,21 +131,21 @@ def _optimize_columns(dsk, all_layers):
lay.io_func, "_column_report"
):
continue
all_cols = lay.meta.layout.form.columns()
rep = lay.io_func._column_report
cols = set()
# this loop not required after next ak release
for ln in all_layers:
# this loop not required after next ak release
try:
cols |= {_buf_to_col(s) for s in rep.data_touched_in((ln,))}
for col in (_buf_to_col(s) for s in rep.shape_touched_in((ln,))):
cols.update(rep.data_touched_in((ln,)))
except KeyError:
pass
try:
for col in rep.shape_touched_in((ln,)):
if col in cols:
continue
if any(_.startswith(col) for _ in cols):
continue
ll = list(_ for _ in all_cols if _.startswith(col))
if ll:
cols.add(ll[0])
cols.add(col)

except KeyError:
pass
Expand All @@ -175,7 +167,9 @@ def necessary_columns(*args):

out = {}
for k, _, cols in _optimize_columns(dsk, all_layers):
out[k] = cols
first = (_buf_to_col(s) for s in cols)
# remove root "offsets", which appears when computing divisions
out[k] = sorted(s for s in first if s and s != "offsets")
return out


Expand Down
1 change: 1 addition & 0 deletions src/dask_awkward/lib/structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ def copy(array: Array) -> Array:
# dask-awkward's copy is metadata-only
old_meta = array._meta
new_meta = ak.Array(old_meta.layout, behavior=deepcopy(old_meta._behavior))
new_meta._report = old_meta._report

return Array(
array._dask,
Expand Down
12 changes: 12 additions & 0 deletions src/dask_awkward/lib/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,15 @@ def typetracer_nochecks():
TypeTracerArray.runtime_typechecks = oldval
else:
del TypeTracerArray.runtime_typechecks


def _buf_to_col(s):
return (
s[2:]
.replace("content.", "")
.replace("-offsets", "")
.replace("-data", "")
.replace("-index", "")
.replace("-mask", "")
.replace("-tags", "")
)
6 changes: 3 additions & 3 deletions tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ def point_abs(self):
caa = ak.with_name(caa.points, name="Point", behavior=behavior)
daa = dak.from_awkward(caa, npartitions=2, behavior=behavior)
assert_eq(caa.xsq, daa.xsq)
assert set(first(dak.necessary_columns(daa.xsq).items())[1]) == {"x"}
assert set(first(dak.necessary_columns(daa).items())[1]) == {"x", "y"}
assert set(first(dak.necessary_columns(np.abs(daa)).items())[1]) == {"x", "y"}
assert set(first(dak.necessary_columns(daa.xsq).values())) == {"x"}
assert set(first(dak.necessary_columns(daa).values())) == {"x", "y"}
assert set(first(dak.necessary_columns(np.abs(daa)).values())) == {"x", "y"}
assert_eq(np.abs(caa), np.abs(daa))

0 comments on commit 43b2e43

Please sign in to comment.