misc: typing (#373)
douglasdavis authored Sep 25, 2023
1 parent 605d842 commit ffa20fb
Showing 21 changed files with 148 additions and 260 deletions.
10 changes: 9 additions & 1 deletion pyproject.toml
@@ -37,7 +37,7 @@ classifiers = [
 dependencies = [
     "awkward >=2.4.0",
     "dask >=2023.04.0",
-    "typing_extensions>=4.8.0; python_version < \"3.11\"",
+    "typing_extensions >=4.8.0",
 ]
 dynamic = ["version"]

@@ -144,6 +144,14 @@ warn_unreachable = true
 module = ["tlz.*"]
 ignore_missing_imports = true
 
+[[tool.mypy.overrides]]
+module = ["uproot.*"]
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = ["cloudpickle.*"]
+ignore_missing_imports = true
+
 [tool.pyright]
 include = ["src"]
 pythonVersion = "3.9"
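The dependency hunk drops the `python_version < "3.11"` marker, so `typing_extensions` is now installed unconditionally. A minimal sketch of the import pattern such a dependency supports (illustrative only, not code from this commit):

```python
import sys

# On Python 3.11+ these names live in the standard library; older
# interpreters fall back to typing_extensions, which the unconditional
# dependency now guarantees is installed.
if sys.version_info >= (3, 11):
    from typing import ParamSpec, Self
else:
    from typing_extensions import ParamSpec, Self
```

The two new `[[tool.mypy.overrides]]` tables silence missing-stub errors for `uproot` and `cloudpickle` imports, matching the existing `tlz` override.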
5 changes: 1 addition & 4 deletions src/dask_awkward/layers/layers.py
@@ -214,7 +214,7 @@ def __init__(
         super().__init__(mapping, **kwargs)
 
     def mock(self) -> tuple[MaterializedLayer, Any | None]:
-        mapping = self.mapping.copy()
+        mapping = copy.copy(self.mapping)
         if not mapping:
             # no partitions at all
             return self, None
@@ -256,9 +256,6 @@ def mock(self) -> tuple[MaterializedLayer, Any | None]:
             task = (self.fn, *name0s)
             return MaterializedLayer({(name, 0): task}), None
 
-        # failed to cull during column opt
-        return self, None
-
 
 class AwkwardTreeReductionLayer(DataFrameTreeReduction):
     def mock(self) -> tuple[AwkwardTreeReductionLayer, Any | None]:
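The switch from `self.mapping.copy()` to `copy.copy(self.mapping)` is typing-driven: the `Mapping` ABC does not declare a `.copy()` method, so mypy rejects the attribute even when the concrete object is a plain `dict`. A small sketch of the distinction, using a hypothetical `Frozen` class (not from this codebase):

```python
import copy
from collections.abc import Mapping


class Frozen(Mapping):
    """A minimal Mapping that, like the ABC itself, defines no .copy()."""

    def __init__(self, data: dict) -> None:
        self._data = dict(data)

    def __getitem__(self, key):
        return self._data[key]

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)


m = Frozen({"a": 1})
dup = copy.copy(m)  # shallow-copies any object; also fine for the type checker
# m.copy()          # AttributeError at runtime, and an attribute error in mypy
```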
75 changes: 29 additions & 46 deletions src/dask_awkward/lib/core.py
@@ -16,6 +16,7 @@
 import awkward as ak
 import dask.config
 import numpy as np
+from awkward._do import remove_structure as ak_do_remove_structure
 from awkward._nplikes.typetracer import (
     MaybeNone,
     OneOf,
@@ -833,26 +834,22 @@ def _getitem_trivial_map_partitions(
             label=label,
         )
 
-    def _getitem_outer_bool_or_int_lazy_array(
-        self, where: Array | tuple[Any, ...]
-    ) -> Any:
+    def _getitem_outer_bool_or_int_lazy_array(self, where):
         ba = where if isinstance(where, Array) else where[0]
         if partition_compatibility(self, ba) == PartitionCompatibility.NO:
             raise IncompatiblePartitions("getitem", self, ba)
 
-        new_meta: Any | None = None
-        if self._meta is not None:
-            if isinstance(where, tuple):
-                raise DaskAwkwardNotImplemented(
-                    "tuple style input boolean/int selection is not supported."
-                )
-            elif isinstance(where, Array):
-                new_meta = self._meta[where._meta]
-            return self.map_partitions(
-                operator.getitem,
-                where,
-                meta=new_meta,
-            )
+        if isinstance(where, tuple):
+            raise DaskAwkwardNotImplemented(
+                "tuple style input boolean/int selection is not supported."
+            )
+
+        new_meta = self._meta[where._meta]
+        return self.map_partitions(
+            operator.getitem,
+            where,
+            meta=new_meta,
+        )
 
     def _getitem_outer_str_or_list(
         self,
@@ -942,9 +939,9 @@ def _getitem_outer_int(self, where: int | tuple[Any, ...]) -> Any:
         else:
             return new_scalar_object(hlg, name, meta=new_meta)
 
-    def _getitem_slice_on_zero(self, where: tuple[slice, ...]):
+    def _getitem_slice_on_zero(self, where):
         # normalise
-        sl: slice = where[0]
+        sl = where[0]
         rest = tuple(where[1:])
         step = sl.step or 1
         start = sl.start or 0
@@ -1014,7 +1011,7 @@ def _getitem_slice_on_zero(self, where: tuple[slice, ...]):
             divisions=tuple(divisions),
         )
 
-    def _getitem_tuple(self, where: tuple[Any, ...]) -> Array:
+    def _getitem_tuple(self, where):
         if isinstance(where[0], int):
             return self._getitem_outer_int(where)
 
@@ -1052,7 +1049,7 @@ def _getitem_tuple(self, where: tuple[Any, ...]) -> Array:
                 f"Array.__getitem__ doesn't support multi object: {where}"
             )
 
-    def _getitem_single(self, where: Any) -> Array:
+    def _getitem_single(self, where):
         # a single string
         if isinstance(where, str):
             return self._getitem_outer_str_or_list(where, label=where)
@@ -1089,17 +1086,7 @@ def _getitem_single(self, where: Any) -> Array:
 
         raise DaskAwkwardNotImplemented(f"__getitem__ doesn't support where={where}.")
 
-    @overload
-    def __getitem__(self, where: Array | str | Sequence[str] | slice) -> Array:
-        ...
-
-    @overload
-    def __getitem__(self, where: int) -> Scalar:
-        ...
-
-    def __getitem__(
-        self, where: Array | str | Sequence[str] | int | slice
-    ) -> Array | Scalar:
+    def __getitem__(self, where):
         """Select items from the collection.
 
         Heavily under construction.
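Even with the `@overload` declarations removed, `__getitem__` still dispatches on the type of `where` through the private helpers above. A minimal usage sketch (written against the public API as of this commit; values illustrative):

```python
import awkward as ak
import dask_awkward as dak

x = ak.Array([{"a": 1}, {"a": 2}, {"a": 3}, {"a": 4}])
dx = dak.from_awkward(x, npartitions=2)

fields = dx["a"]    # string selection -> Array
mask = dx["a"] > 2  # lazy boolean Array, built partitionwise
picked = dx[mask]   # should route through _getitem_outer_bool_or_int_lazy_array
print(picked.compute().to_list())  # [{'a': 3}, {'a': 4}]
```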
@@ -1369,9 +1356,7 @@ def head(self, nrow=10, compute=True):
 
         By default this is then processed eagerly and returned.
         """
-        out: Array = self.partitions[0].map_partitions(
-            lambda x: x[:nrow], meta=self._meta
-        )
+        out = self.partitions[0].map_partitions(lambda x: x[:nrow], meta=self._meta)
         if compute:
             return out.compute()
         if self.known_divisions:
@@ -1727,16 +1712,13 @@ def map_partitions(
     )
 
 
-PartialReductionType = ak.Array
-
-
 def _chunk_reducer_non_positional(
-    chunk: ak.Array | PartialReductionType,
+    chunk: ak.Array,
     is_axis_none: bool,
     *,
     reducer: Callable,
     mask_identity: bool,
-) -> PartialReductionType:
+) -> ak.Array:
     return reducer(
         chunk,
         keepdims=True,
@@ -1746,14 +1728,14 @@ def _chunk_reducer_non_positional(
 
 
 def _concat_reducer_non_positional(
-    partials: list[PartialReductionType], is_axis_none: bool
+    partials: list[ak.Array], is_axis_none: bool
 ) -> ak.Array:
     concat_axis = -1 if is_axis_none else 0
     return ak.concatenate(partials, axis=concat_axis)
 
 
 def _finalise_reducer_non_positional(
-    partial: PartialReductionType,
+    partial: ak.Array,
     is_axis_none: bool,
     *,
     reducer: Callable,
@@ -1771,7 +1753,7 @@ def _finalise_reducer_non_positional(
 def _prepare_axis_none_chunk(chunk: ak.Array) -> ak.Array:
     # TODO: this is private Awkward code. We should figure out how to export it
     # if needed
-    (layout,) = ak._do.remove_structure(
+    (layout,) = ak_do_remove_structure(
         ak.to_layout(chunk),
         flatten_records=False,
         drop_nones=False,
drop_nones=False,
Expand All @@ -1785,7 +1767,7 @@ def non_trivial_reduction(
*,
label: str,
array: Array,
axis: Literal[0] | None,
axis: int | None,
is_positional: bool,
keepdims: bool,
mask_identity: bool,
Expand All @@ -1794,7 +1776,7 @@ def non_trivial_reduction(
token: str | None = None,
dtype: Any | None = None,
split_every: int | bool | None = None,
):
) -> Array | Scalar:
if is_positional:
raise NotImplementedError("positional reducers at axis=0 or axis=None")

Expand All @@ -1807,8 +1789,9 @@ def non_trivial_reduction(
if combiner is None:
combiner = reducer

if is_positional:
assert combiner is reducer
# is_positional == True is not implemented
# if is_positional:
# assert combiner is reducer

# For `axis=None`, we prepare each array to have the following structure:
# [[[ ... [x1 x2 x3 ... xN] ... ]]] (length-1 outer lists)
Expand Down
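The three helpers above implement the chunk/concat/finalise stages of a non-positional reduction. A rough standalone sketch of the same pattern for `axis=None`, simplified by skipping the `_prepare_axis_none_chunk` structure-removal step:

```python
import awkward as ak

# stand-ins for two partitions of a dask-awkward collection
chunks = [ak.Array([[1, 2], [3]]), ak.Array([[4, 5]])]

# stage 1: per-chunk partial reduction; keepdims=True preserves nesting depth
partials = [ak.sum(c, axis=None, keepdims=True, mask_identity=True) for c in chunks]

# stage 2: concatenate the partials (axis=-1 is the axis=None branch above)
combined = ak.concatenate(partials, axis=-1)

# stage 3: finalise with one more reduction over the combined partials
print(ak.sum(combined, axis=None))  # 15
```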
9 changes: 7 additions & 2 deletions src/dask_awkward/lib/inspect.py
@@ -87,7 +87,11 @@ def necessary_columns(*args: Any, traverse: bool = True) -> dict[str, list[str]]
     return out
 
 
-def sample(arr, factor: int | None = None, probability: float | None = None) -> Array:
+def sample(
+    arr: Array,
+    factor: int | None = None,
+    probability: float | None = None,
+) -> Array:
     """Decimate the data to a smaller number of rows.
 
     Must give either `factor` or `probability`.
@@ -111,5 +115,6 @@ def sample(arr, factor: int | None = None, probability: float | None = None) ->
         return arr.map_partitions(lambda x: x[::factor], meta=arr._meta)
     else:
         return arr.map_partitions(
-            lambda x: x[np.random.random(len(x)) < probability], meta=arr._meta
+            lambda x: x[np.random.random(len(x)) < probability],  # type: ignore
+            meta=arr._meta,
         )
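For context, a short usage sketch of `sample` (assuming it is imported from `dask_awkward.lib.inspect`, where this diff defines it):

```python
import awkward as ak
import dask_awkward as dak
from dask_awkward.lib.inspect import sample

arr = dak.from_awkward(ak.Array(list(range(1000))), npartitions=4)

every_tenth = sample(arr, factor=10)         # x[::10] within each partition
one_percent = sample(arr, probability=0.01)  # random mask keeps ~1% of rows
```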
45 changes: 20 additions & 25 deletions src/dask_awkward/lib/io/io.py
@@ -93,10 +93,10 @@ def from_awkward(
     """
     nrows = len(source)
     if nrows == 0:
-        locs = [None, None]
+        locs: tuple[None, ...] | tuple[int, ...] = (None, None)
     else:
         chunksize = int(math.ceil(nrows / npartitions))
-        locs = list(range(0, nrows, chunksize)) + [nrows]
+        locs = tuple(list(range(0, nrows, chunksize)) + [nrows])
     starts = locs[:-1]
     stops = locs[1:]
     meta = typetracer_array(source)
@@ -106,7 +106,7 @@ def from_awkward(
         stops,
         label=label or "from-awkward",
         token=tokenize(source, npartitions),
-        divisions=tuple(locs),
+        divisions=locs,
         meta=meta,
         behavior=behavior,
     )
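The `locs` arithmetic above fixes the partition boundaries that become `divisions`. Worked through on small numbers (the values follow directly from the code in this hunk):

```python
import math

nrows, npartitions = 10, 3
chunksize = int(math.ceil(nrows / npartitions))           # 4
locs = tuple(list(range(0, nrows, chunksize)) + [nrows])  # (0, 4, 8, 10)
starts, stops = locs[:-1], locs[1:]  # partitions cover rows [0:4], [4:8], [8:10]
```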
@@ -116,11 +116,11 @@ class _FromListsFn:
     def __init__(self, behavior: dict | None = None):
         self.behavior = behavior
 
-    def __call__(self, x, **kwargs):
+    def __call__(self, x: list) -> ak.Array:
         return ak.Array(x, behavior=self.behavior)
 
 
-def from_lists(source: list[list[Any]], behavior: dict | None = None) -> Array:
+def from_lists(source: list, behavior: dict | None = None) -> Array:
     """Create an Array collection from a list of lists.
 
     Parameters
@@ -149,7 +149,7 @@ def from_lists(source: list[list[Any]], behavior: dict | None = None) -> Array:
     lists = list(source)
     divs = (0, *np.cumsum(list(map(len, lists))))
     return from_map(
-        _FromListsFn(),
+        _FromListsFn(behavior=behavior),
         lists,
         meta=typetracer_array(ak.Array(lists[0])),
         divisions=divs,
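Beyond annotations, `_FromListsFn(behavior=behavior)` fixes a real bug: the `behavior` argument of `from_lists` was previously accepted but never passed on. A sketch of what threading it through enables, with a hypothetical `Point` record class (full behavior dispatch would additionally require the `__record__` parameter on the layout, omitted here for brevity):

```python
import awkward as ak
import dask_awkward as dak


class Point(ak.Record):  # hypothetical behavior class
    @property
    def mag2(self):
        return self.x**2 + self.y**2


behavior = {"point": Point}
records = [[{"x": 1, "y": 2}], [{"x": 3, "y": 4}]]
arr = dak.from_lists(records, behavior=behavior)
# with this fix, each materialized partition is built as
# ak.Array(partition_list, behavior=behavior) rather than dropping the dict
```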
@@ -383,7 +383,7 @@ def from_dask_array(array: DaskArray, behavior: dict | None = None) -> Array:
 
 
 def to_dataframe(
-    array,
+    array: Array,
     optimize_graph: bool = True,
     **kwargs: Any,
 ) -> DaskDataFrame:
@@ -463,7 +463,7 @@ def from_map(
     args: tuple[Any, ...] | None = None,
     label: str | None = None,
     token: str | None = None,
-    divisions: tuple[int, ...] | None = None,
+    divisions: tuple[int, ...] | tuple[None, ...] | None = None,
     meta: ak.Array | None = None,
     behavior: dict | None = None,
     **kwargs: Any,
@@ -603,7 +603,7 @@ def _bytes_with_sample(
     compression: str | None,
     delimiter: bytes,
     not_zero: bool,
-    blocksize: str | int,
+    blocksize: str | int | None,
     sample: str | int | bool,
 ) -> tuple[list[list[_BytesReadingInstructions]], bytes]:
     """Generate instructions for reading bytes from paths in a filesystem.
@@ -653,7 +653,7 @@ def _bytes_with_sample(
 
     if blocksize is None:
         offsets = [[0]] * len(paths)
-        lengths = [[None]] * len(paths)
+        lengths: list = [[None]] * len(paths)
     else:
         offsets = []
         lengths = []
@@ -717,21 +717,16 @@ def _bytes_with_sample(
         sample_size = parse_bytes(sample) if isinstance(sample, str) else sample
         with fs.open(paths[0], compression=compression) as f:
             # read block without seek (because we start at zero)
-            if delimiter is None:
-                sample_bytes = f.read(sample_size)
-            else:
-                sample_buff = f.read(sample_size)
-                while True:
-                    new = f.read(sample_size)
-                    if not new:
-                        break
-                    if delimiter in new:
-                        sample_buff = (
-                            sample_buff + new.split(delimiter, 1)[0] + delimiter
-                        )
-                        break
-                    sample_buff = sample_buff + new
-                sample_bytes = sample_buff
+            sample_buff = f.read(sample_size)
+            while True:
+                new = f.read(sample_size)
+                if not new:
+                    break
+                if delimiter in new:
+                    sample_buff = sample_buff + new.split(delimiter, 1)[0] + delimiter
+                    break
+                sample_buff = sample_buff + new
+            sample_bytes = sample_buff
 
         rfind = sample_bytes.rfind(delimiter)
         if rfind > 0:
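The now-unconditional loop reads roughly `sample_size` bytes and then keeps reading until the next delimiter, so the returned sample ends on a record boundary. A standalone sketch of the same logic (hypothetical `read_sample` helper, exercised with `io.BytesIO`):

```python
import io


def read_sample(f, sample_size: int, delimiter: bytes) -> bytes:
    """Read ~sample_size bytes, then extend to the next delimiter."""
    buff = f.read(sample_size)
    while True:
        new = f.read(sample_size)
        if not new:  # EOF before another delimiter was found
            break
        if delimiter in new:
            buff += new.split(delimiter, 1)[0] + delimiter
            break
        buff += new
    return buff


data = io.BytesIO(b"one\ntwo\nthree\nfour\n")
print(read_sample(data, 5, b"\n"))  # b'one\ntwo\n'
```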
(diffs for the remaining 16 changed files not shown)
