Commit: typing
douglasdavis committed Sep 25, 2023
1 parent f0f0f2a commit d525759
Showing 8 changed files with 24 additions and 121 deletions.
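In brief: the commit adds a mypy override for cloudpickle, widens `from_json`'s `blocksize` parameter to accept an `int` as well as a byte-size string, drops the `typing.overload` stubs for `to_json` and `to_parquet`, and tightens annotations in the tests (including dropping redundant `compute=True` arguments, since that is the default).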
4 changes: 4 additions & 0 deletions pyproject.toml
@@ -148,6 +148,10 @@ warn_unreachable = true
module = ["uproot.*"]
ignore_missing_imports = true

+[[tool.mypy.overrides]]
+module = ["cloudpickle.*"]
+ignore_missing_imports = true
+
[tool.pyright]
include = ["src"]
pythonVersion = "3.9"
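For context on the new override: cloudpickle ships no type stubs or `py.typed` marker, so strict mypy runs flag any import of it. A minimal sketch of code that trips the error without the override (the `roundtrip` helper is hypothetical):

```python
import cloudpickle

# Without the [[tool.mypy.overrides]] entry above, mypy reports:
#   error: Skipping analyzing "cloudpickle": module is installed, but
#   missing library stubs or py.typed marker  [import-untyped]


def roundtrip(obj: object) -> object:
    """Serialize and deserialize an object, as happens when shipping tasks."""
    return cloudpickle.loads(cloudpickle.dumps(obj))
```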
50 changes: 5 additions & 45 deletions src/dask_awkward/lib/io/json.py
Expand Up @@ -478,7 +478,7 @@ def from_json(
    resize: float = 8,
    highlevel: bool = True,
    behavior: dict | None = None,
-    blocksize: str | None = None,
+    blocksize: int | str | None = None,
    delimiter: bytes | None = None,
    compression: str | None = "infer",
    storage_options: dict[str, Any] | None = None,
@@ -526,10 +526,10 @@ def from_json(
        dask-awkward.
    behavior : dict, optional
        See :func:`ak.from_json`
-    blocksize : str, optional
+    blocksize : int, str, optional
        If ``None`` (default), the collection will be partitioned on a
        per-file basis. If defined, this sets the size (in bytes) of
-        each partition.
+        each partition. Can be a string of the form ``"10 MiB"``.
    delimiter : bytes, optional
        Delimiter to use for separating blocks; if ``blocksize`` is
        defined but this argument is not defined, the default is the
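To illustrate the widened parameter, a sketch of the call styles (the glob path is hypothetical; string sizes follow the usual Dask byte-size convention):

```python
import dask_awkward as dak

# Default (blocksize=None): one partition per input file.
per_file = dak.from_json("data/*.jsonl")

# Fixed-size partitions: an int is a byte count, a string is parsed
# as a human-readable byte size.
by_int = dak.from_json("data/*.jsonl", blocksize=64 * 1024 * 1024)
by_str = dak.from_json("data/*.jsonl", blocksize="64 MiB")
```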
@@ -701,50 +701,10 @@ def __call__(self, array: ak.Array, block_index: tuple[int]) -> None:
        return None


-@overload
-def to_json(
-    array: Array,
-    path: str,
-    line_delimited: bool | str = True,
-    num_indent_spaces: int | None = None,
-    num_readability_spaces: int = 0,
-    nan_string: str | None = None,
-    posinf_string: str | None = None,
-    neginf_string: str | None = None,
-    complex_record_fields: tuple[str, str] | None = None,
-    convert_bytes: Callable | None = None,
-    convert_other: Callable | None = None,
-    storage_options: dict[str, Any] | None = None,
-    compression: str | None = None,
-    compute: Literal[True] = True,
-) -> None:
-    ...
-
-
-@overload
-def to_json(
-    array: Array,
-    path: str,
-    line_delimited: bool | str,
-    num_indent_spaces: int | None,
-    num_readability_spaces: int,
-    nan_string: str | None,
-    posinf_string: str | None,
-    neginf_string: str | None,
-    complex_record_fields: tuple[str, str] | None,
-    convert_bytes: Callable | None,
-    convert_other: Callable | None,
-    storage_options: dict[str, Any] | None,
-    compression: str | None,
-    compute: Literal[False],
-) -> Scalar:
-    ...
-
-
def to_json(
    array: Array,
    path: str,
-    line_delimited: bool | str = True,
+    line_delimited: bool = True,
    num_indent_spaces: int | None = None,
    num_readability_spaces: int = 0,
    nan_string: str | None = None,
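The deleted stubs were the standard `typing.overload` recipe for narrowing a return type on a `Literal` flag. A self-contained toy version of the pattern (not the dask-awkward API):

```python
from typing import Literal, overload


@overload
def save(data: list[int], *, compute: Literal[True] = ...) -> None: ...
@overload
def save(data: list[int], *, compute: Literal[False]) -> int: ...


def save(data: list[int], *, compute: bool = True) -> int | None:
    # Toy stand-in for deferred work: eager mode consumes the result,
    # deferred mode hands it back to the caller.
    total = sum(data)
    if compute:
        print(total)
        return None
    return total
```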
@@ -767,7 +727,7 @@ def to_json(
        Root directory to save data; interpreted by filesystem-spec
        (can be a remote filesystem path, for example an s3 bucket:
        ``"s3://bucket/data"``).
-    line_delimited : bool | str
+    line_delimited : bool
        See docstring for :py:func:`ak.to_json`.
    num_indent_spaces : int, optional
        See docstring for :py:func:`ak.to_json`.
68 changes: 1 addition & 67 deletions src/dask_awkward/lib/io/parquet.py
@@ -6,7 +6,7 @@
import math
import operator
from collections.abc import Sequence
-from typing import Any, Literal, overload
+from typing import Any, Literal

import awkward as ak
import awkward.operations.ak_from_parquet as ak_from_parquet
@@ -463,72 +463,6 @@ def __call__(self, data, block_index):
        )


-@overload
-def to_parquet(
-    array: Array,
-    destination: str,
-    list_to32: bool = False,
-    string_to32: bool = True,
-    bytestring_to32: bool = True,
-    emptyarray_to: Any | None = None,
-    categorical_as_dictionary: bool = False,
-    extensionarray: bool = False,
-    count_nulls: bool = True,
-    compression: str | dict | None = "zstd",
-    compression_level: int | dict | None = None,
-    row_group_size: int | None = 64 * 1024 * 1024,
-    data_page_size: int | None = None,
-    parquet_flavor: Literal["spark"] | None = None,
-    parquet_version: Literal["1.0"] | Literal["2.4"] | Literal["2.6"] = "2.4",
-    parquet_page_version: Literal["1.0"] | Literal["2.0"] = "1.0",
-    parquet_metadata_statistics: bool | dict = True,
-    parquet_dictionary_encoding: bool | dict = False,
-    parquet_byte_stream_split: bool | dict = False,
-    parquet_coerce_timestamps: Literal["ms"] | Literal["us"] | None = None,
-    parquet_old_int96_timestamps: bool | None = None,
-    parquet_compliant_nested: bool = False,
-    parquet_extra_options: dict | None = None,
-    storage_options: dict[str, Any] | None = None,
-    write_metadata: bool = False,
-    compute: Literal[True] = True,
-    prefix: str | None = None,
-) -> None:
-    ...
-
-
-@overload
-def to_parquet(
-    array: Array,
-    destination: str,
-    list_to32: bool,
-    string_to32: bool,
-    bytestring_to32: bool,
-    emptyarray_to: Any | None,
-    categorical_as_dictionary: bool,
-    extensionarray: bool,
-    count_nulls: bool,
-    compression: str | dict | None,
-    compression_level: int | dict | None,
-    row_group_size: int | None,
-    data_page_size: int | None,
-    parquet_flavor: Literal["spark"] | None,
-    parquet_version: Literal["1.0"] | Literal["2.4"] | Literal["2.6"],
-    parquet_page_version: Literal["1.0"] | Literal["2.0"],
-    parquet_metadata_statistics: bool | dict,
-    parquet_dictionary_encoding: bool | dict,
-    parquet_byte_stream_split: bool | dict,
-    parquet_coerce_timestamps: Literal["ms"] | Literal["us"] | None,
-    parquet_old_int96_timestamps: bool | None,
-    parquet_compliant_nested: bool,
-    parquet_extra_options: dict | None,
-    storage_options: dict[str, Any] | None,
-    write_metadata: bool,
-    compute: Literal[False],
-    prefix: str | None,
-) -> Scalar:
-    ...
-
-
def to_parquet(
    array: Array,
    destination: str,
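Removing the stubs changes only the static return-type narrowing; at runtime `compute=False` still returns a `Scalar`, as the updated `test_io_json.py` test below asserts. A usage sketch with illustrative data and paths:

```python
import awkward as ak
import dask_awkward as dak

arr = dak.from_awkward(ak.Array([{"x": 1}, {"x": 2}]), npartitions=1)

# compute=True is the default: write eagerly, return None.
dak.to_parquet(arr, "/tmp/points")

# compute=False returns a dask-awkward Scalar to trigger later.
result = dak.to_parquet(arr, "/tmp/points", compute=False)
result.compute()
```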
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -70,7 +70,7 @@ def daa_old(ndjson_points1: str) -> dak.Array:
@pytest.fixture(scope="session")
def pq_points_dir(daa_old: dak.Array, tmp_path_factory: pytest.TempPathFactory) -> str:
    pqdir = tmp_path_factory.mktemp("pqfiles")
-    dak.to_parquet(daa_old, str(pqdir), compute=True)
+    dak.to_parquet(daa_old, str(pqdir))
    return str(pqdir)

14 changes: 9 additions & 5 deletions tests/test_core.py
@@ -145,7 +145,7 @@ def test_partitions_divisions(ndjson_points_file: str) -> None:
    assert not t1.known_divisions
    t2 = daa.partitions[1]
    assert t2.known_divisions
-    assert t2.divisions == (0, divs[2] - divs[1])
+    assert t2.divisions == (0, divs[2] - divs[1])  # type: ignore


def test_array_rebuild(ndjson_points_file: str) -> None:
@@ -178,7 +178,7 @@ def test_typestr(daa: Array) -> None:
    assert len(daa._typestr(max=20)) == 20 + extras


-def test_head(daa: Array):
+def test_head(daa: Array) -> None:
    out = daa.head(1)
    assert out.tolist() == daa.compute()[:1].tolist()

@@ -233,7 +233,7 @@ def test_scalar_getitem_getattr() -> None:
        slice(None, None, 3),
    ],
)
-def test_getitem_zero_slice_single(daa: Array, where):
+def test_getitem_zero_slice_single(daa: Array, where: slice) -> None:
    out = daa[where]
    assert out.compute().tolist() == daa.compute()[where].tolist()
    assert len(out) == len(daa.compute()[where])
@@ -257,7 +257,11 @@ def test_getitem_zero_slice_single(daa: Array, where):
    ],
)
@pytest.mark.parametrize("rest", [slice(None, None, None), slice(0, 1)])
-def test_getitem_zero_slice_tuple(daa: Array, where, rest):
+def test_getitem_zero_slice_tuple(
+    daa: Array,
+    where: slice,
+    rest: slice,
+) -> None:
    out = daa[where, rest]
    assert out.compute().tolist() == daa.compute()[where, rest].tolist()
    assert len(out) == len(daa.compute()[where, rest])
@@ -476,7 +480,7 @@ def test_compatible_partitions_after_slice() -> None:
    assert_eq(lazy, ccrt)

    # sanity
-    assert dak.compatible_partitions(lazy, lazy + 2)
+    assert dak.compatible_partitions(lazy, lazy + 2)  # type: ignore
    assert dak.compatible_partitions(lazy, dak.num(lazy, axis=1) > 2)

    assert not dak.compatible_partitions(lazy[:-2], lazy)
2 changes: 1 addition & 1 deletion tests/test_io.py
@@ -277,7 +277,7 @@ def test_to_dataframe_str(
    assert_eq(dd, df, check_index=False)


-def test_from_awkward_empty_array(daa) -> None:
+def test_from_awkward_empty_array(daa: dak.Array) -> None:
    # no form
    c1 = ak.Array([])
    assert len(c1) == 0
3 changes: 2 additions & 1 deletion tests/test_io_json.py
@@ -190,7 +190,7 @@ def test_to_and_from_json(

    p1 = os.path.join(tdir, "z", "z")

-    dak.to_json(array=daa, path=p1, compute=True)
+    dak.to_json(daa, p1)
    paths = list((Path(tdir) / "z" / "z").glob("part*.json"))
    assert len(paths) == daa.npartitions
    arrays = ak.concatenate([ak.from_json(p, line_delimited=True) for p in paths])
@@ -205,6 +205,7 @@
        compression=compression,
        compute=False,
    )
+    assert isinstance(s, dak.Scalar)
    s.compute()
    suffix = "gz" if compression == "gzip" else compression
    r = dak.from_json(os.path.join(tdir, f"*.json.{suffix}"))
2 changes: 1 addition & 1 deletion tests/test_parquet.py
@@ -191,7 +191,7 @@ def test_to_parquet_with_prefix(
    tmp_path: pathlib.Path,
    prefix: str | None,
) -> None:
-    dak.to_parquet(daa, str(tmp_path), prefix=prefix, compute=True)
+    dak.to_parquet(daa, str(tmp_path), prefix=prefix)
    files = list(tmp_path.glob("*"))
    for ifile in files:
        fname = ifile.parts[-1]
