From 7b006945f91f786ae4e493237324ae0952df5e35 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Mon, 31 Oct 2022 12:11:16 -0500 Subject: [PATCH 01/21] schema <--> form/layout --- src/dask_awkward/lib/io/json.py | 98 +++++++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) diff --git a/src/dask_awkward/lib/io/json.py b/src/dask_awkward/lib/io/json.py index 88b84013..e856dbe8 100644 --- a/src/dask_awkward/lib/io/json.py +++ b/src/dask_awkward/lib/io/json.py @@ -478,3 +478,101 @@ def to_json( if compute: res.compute() return res + + +def layout_to_jsonschema(layout, input=None): + """Convert awkward array Layout to a JSON Schema dictionary.""" + if input is None: + input = {"type": "object", "properties": {}} + if layout.is_RecordType: + input["type"] = "object" + input["properties"] = {} + for field in layout.fields: + input["properties"][field] = {"type": None} + layout_to_jsonschema(layout[field], input["properties"][field]) + elif layout.is_ListType: + input["type"] = "array" + input["items"] = {} + layout_to_jsonschema(layout.content, input["items"]) + elif layout.dtype.kind == "i": + input["type"] = "integer" + elif layout.dtype.kind == "f": + input["type"] = "number" + elif layout.dtype.kind.lower() in "uso": + input["type"] = "string" + return input + + +def form_to_jsonschema(form, input=None): + """Convert awkward array Layout to a JSON Schema dictionary.""" + if input is None: + input = {"type": "object", "properties": {}} + if form.is_RecordType: + input["type"] = "object" + input["properties"] = {} + for field, content in zip(form.fields, form.contents): + input["properties"][field] = {"type": None} + form_to_jsonschema(content, input["properties"][field]) + elif form.parameters.get("__array__") == "string": + input["type"] = "string" + elif form.is_ListType: + input["type"] = "array" + input["items"] = {} + form_to_jsonschema(form.content, input["items"]) + elif "int" in form.type.primitive: + input["type"] = "integer" + elif "float" in form.type.primitive: + input["type"] = "number" + elif hasattr(form, "content"): + form_to_jsonschema(form.content, input) + else: + raise ValueError + return input + + +def form_to_dict(form): + """Convert awkward array Layout to a JSON Schema dictionary.""" + if form.is_RecordType: + out = {} + for field, content in zip(form.fields, form.contents): + out[field] = form_to_dict(content) + return out + elif form.parameters.get("__array__") == "string": + return "string" + elif form.is_ListType: + return [form_to_dict(form.content)] + elif hasattr(form, "content"): + return form_to_dict(form.content) + else: + return form.type.primitive + + +def ak_schema_repr(arr): + import yaml + + return yaml.dump(arr.layout.form) + + +def _read_beginning_compressed( + storage: AbstractFileSystem, + source: str, + compression: str | None, + n_lines: int = 5, +) -> ak.Array: + lines = [] + with storage.open(source, mode="rt", compression=compression) as f: + for i, line in enumerate(f): + if i >= n_lines: + break + lines.append(ak.from_json(line)) + return ak.from_iter(lines) + + +def _read_beginning_uncompressed( + storage: AbstractFileSystem, + source: str, + numbytes: int = 16384, +) -> ak.Array: + bytes = storage.cat(source, start=0, end=numbytes) + array = ak.concatenate([ak.from_json(line) for line in bytes.split(b"\n")[:-1]]) + return array From df3db182b0971beae8f190eb1ed6b299ace7f257 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Wed, 8 Mar 2023 12:58:22 -0600 Subject: [PATCH 02/21] json IO dev --- src/dask_awkward/lib/io/json.py | 59 
+++++++++++++++++++-------------- tests/test_io.py | 27 ++++----------- 2 files changed, 41 insertions(+), 45 deletions(-) diff --git a/src/dask_awkward/lib/io/json.py b/src/dask_awkward/lib/io/json.py index e856dbe8..c00b2781 100644 --- a/src/dask_awkward/lib/io/json.py +++ b/src/dask_awkward/lib/io/json.py @@ -405,41 +405,28 @@ def __init__( path: str, npartitions: int, compression: str | None, - line_delimited: bool, **kwargs: Any, ) -> None: self.fs = fs self.path = path - self.just_dir = ".json" not in self.path - if self.just_dir: - if not self.fs.exists(path): - self.fs.mkdir(path) - self.wildcarded = "*" in self.path + if not self.fs.exists(path): + self.fs.mkdir(path) self.zfill = math.ceil(math.log(npartitions, 10)) self.kwargs = kwargs self.compression = compression if self.compression == "infer": self.compression = infer_compression(self.path) - self.line_delimited = line_delimited def __call__(self, array: ak.Array, block_index: tuple[int]) -> None: part = str(block_index[0]).zfill(self.zfill) + filename = f"part{part}.json" + if self.compression is not None and self.compression != "infer": + compression = "gz" if self.compression == "gzip" else self.compression + filename = f"{filename}.{compression}" - if self.just_dir: - path = os.path.join(self.path, f"part{part}.json") - elif self.wildcarded: - path = self.path.replace("*", part) - else: - raise RuntimeError("Cannot construct output file path.") # pragma: no cover - - try: - with self.fs.open(path, mode="wt", compression=self.compression) as f: - ak.to_json(array, f, line_delimited=self.line_delimited, **self.kwargs) - except FileNotFoundError: - raise FileNotFoundError( - f"Parent directory for output file ({path}) " - "is not available, create it." - ) + thispath = self.fs.sep.join([self.path, filename]) + with self.fs.open(thispath, mode="wt", compression=self.compression) as f: + ak.to_json(array, f, line_delimited=True, **self.kwargs) return None @@ -447,12 +434,35 @@ def __call__(self, array: ak.Array, block_index: tuple[int]) -> None: def to_json( array: Array, path: str, - line_delimited: bool = True, storage_options: dict[str, Any] | None = None, compute: bool = False, - compression: str | None = "infer", + compression: str | None = None, **kwargs: Any, ) -> Scalar: + """Write data to line delimited JSON. + + Parameters + ---------- + array : Array + dask-awkward Array collection to write to disk. + path : str + Root directory of location to write to. + storage_options : dict[str, Any], optional + filesystem-spec storage options. + compute : bool + If ``True`` immediately compute this collection. + compression : str, optional + filesystem-spec compression algorithm to use. + **kwargs : Any + Additional arguments passed to ``ak.to_json`` + + Returns + ------- + None or dask_awkward.Scalar + Scalar which provides a lazy compute if `compute` is + ``False``, or ``None`` if `compute` is ``True``. 
+ + """ storage_options = storage_options or {} fs, _ = url_to_fs(path, **storage_options) nparts = array.npartitions @@ -462,7 +472,6 @@ def to_json( path, npartitions=nparts, compression=compression, - line_delimited=line_delimited, **kwargs, ), array, diff --git a/tests/test_io.py b/tests/test_io.py index ba307a34..305dd4f3 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -320,12 +320,13 @@ def test_to_bag(daa, caa): assert comprec.tolist() == entry.tolist() -def test_to_json(daa, tmpdir_factory): +@pytest.mark.parametrize("compression", ["xz", "gzip", "zip"]) +def test_to_json(daa, tmpdir_factory, compression): tdir = str(tmpdir_factory.mktemp("json_temp")) p1 = os.path.join(tdir, "z", "z") - dak.to_json(daa, p1, compute=True, line_delimited=True) + dak.to_json(daa, p1, compute=True) paths = list((Path(tdir) / "z" / "z").glob("part*.json")) assert len(paths) == daa.npartitions arrays = ak.concatenate([ak.from_json(p, line_delimited=True) for p in paths]) @@ -336,25 +337,11 @@ def test_to_json(daa, tmpdir_factory): s = dak.to_json( daa, - os.path.join(tdir, "file-*.json.gz"), + tdir, + compression=compression, compute=False, - line_delimited=True, ) s.compute() - r = dak.from_json(os.path.join(tdir, "*.json.gz")) + suffix = "gz" if compression == "gzip" else compression + r = dak.from_json(os.path.join(tdir, f"*.json.{suffix}")) assert_eq(x, r) - - -def test_to_json_raise_filenotfound( - daa: dak.Array, - tmpdir_factory: pytest.TempdirFactory, -) -> None: - p = tmpdir_factory.mktemp("onelevel") - p2 = os.path.join(str(p), "two") - with pytest.raises(FileNotFoundError, match="Parent directory for output file"): - dak.to_json( - daa, - os.path.join(p2, "three", "four", "*.json"), - compute=True, - line_delimited=True, - ) From d2b7a678d2255250540fa7ec386e6b5d6db1a9d4 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Wed, 8 Mar 2023 13:03:21 -0600 Subject: [PATCH 03/21] lint --- src/dask_awkward/lib/io/json.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/dask_awkward/lib/io/json.py b/src/dask_awkward/lib/io/json.py index c00b2781..ef89d243 100644 --- a/src/dask_awkward/lib/io/json.py +++ b/src/dask_awkward/lib/io/json.py @@ -2,7 +2,6 @@ import abc import math -import os import warnings from typing import TYPE_CHECKING, Any From 4e83fd21069205cd9b6992703e7664f949282e9a Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 13 Jul 2023 14:27:49 -0500 Subject: [PATCH 04/21] update w.r.t. 
upstream awkward changes --- src/dask_awkward/lib/io/json.py | 90 ++++++++++++++++----------------- 1 file changed, 43 insertions(+), 47 deletions(-) diff --git a/src/dask_awkward/lib/io/json.py b/src/dask_awkward/lib/io/json.py index e203549e..1a6df2ce 100644 --- a/src/dask_awkward/lib/io/json.py +++ b/src/dask_awkward/lib/io/json.py @@ -29,11 +29,11 @@ from dask_awkward.lib.io.io import from_map if TYPE_CHECKING: + from awkward.contents.content import Content from fsspec.spec import AbstractFileSystem from dask_awkward.lib.core import Array, Scalar - __all__ = ("from_json", "to_json") @@ -264,6 +264,24 @@ def _from_json_bytes( return new_array_object(hlg, name, meta=meta, behavior=behavior, npartitions=n) +def from_json2( + source, + *, + line_delimited=False, + schema=None, + nan_string=None, + posinf_string=None, + neginf_string=None, + complex_record_fields=None, + buffersize=65536, + initial=1024, + resize=8, + highlevel=True, + behavior=None, +): + pass + + def from_json( urlpath: str | list[str], schema: dict | None = None, @@ -488,71 +506,49 @@ def to_json( return res -def layout_to_jsonschema(layout, input=None): +def layout_to_jsonschema( + layout: Content, + existing_schema: dict = None, + required: bool = False, +) -> dict: """Convert awkward array Layout to a JSON Schema dictionary.""" - if input is None: - input = {"type": "object", "properties": {}} - if layout.is_RecordType: - input["type"] = "object" - input["properties"] = {} + if existing_schema is None: + existing_schema = {"type": "object", "properties": {}} + if layout.is_record: + existing_schema["type"] = "object" + existing_schema["properties"] = {} + existing_schema["required"] = layout.fields for field in layout.fields: - input["properties"][field] = {"type": None} - layout_to_jsonschema(layout[field], input["properties"][field]) - elif layout.is_ListType: - input["type"] = "array" - input["items"] = {} - layout_to_jsonschema(layout.content, input["items"]) + existing_schema["properties"][field] = {"type": None} + layout_to_jsonschema(layout[field], existing_schema["properties"][field]) + elif layout.is_list: + existing_schema["type"] = "array" + existing_schema["items"] = {} + layout_to_jsonschema(layout.content, existing_schema["items"]) elif layout.dtype.kind == "i": - input["type"] = "integer" + existing_schema["type"] = "integer" elif layout.dtype.kind == "f": - input["type"] = "number" + existing_schema["type"] = "number" elif layout.dtype.kind.lower() in "uso": - input["type"] = "string" - return input - - -def form_to_jsonschema(form, input=None): - """Convert awkward array Layout to a JSON Schema dictionary.""" - if input is None: - input = {"type": "object", "properties": {}} - if form.is_RecordType: - input["type"] = "object" - input["properties"] = {} - for field, content in zip(form.fields, form.contents): - input["properties"][field] = {"type": None} - form_to_jsonschema(content, input["properties"][field]) - elif form.parameters.get("__array__") == "string": - input["type"] = "string" - elif form.is_ListType: - input["type"] = "array" - input["items"] = {} - form_to_jsonschema(form.content, input["items"]) - elif "int" in form.type.primitive: - input["type"] = "integer" - elif "float" in form.type.primitive: - input["type"] = "number" - elif hasattr(form, "content"): - form_to_jsonschema(form.content, input) - else: - raise ValueError - return input + existing_schema["type"] = "string" + return existing_schema def form_to_dict(form): """Convert awkward array Layout to a JSON Schema dictionary.""" 
- if form.is_RecordType: + if form.is_record: out = {} for field, content in zip(form.fields, form.contents): out[field] = form_to_dict(content) return out elif form.parameters.get("__array__") == "string": return "string" - elif form.is_ListType: + elif form.is_list: return [form_to_dict(form.content)] elif hasattr(form, "content"): return form_to_dict(form.content) else: - return form.type.primitive + return form.dtype def ak_schema_repr(arr): From bc686d2dc7487bbed6ddfaeca6b60815fee1fe1f Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Mon, 17 Jul 2023 12:31:21 -0500 Subject: [PATCH 05/21] unpolished column projection works --- src/dask_awkward/lib/io/json.py | 87 +++++++++++++-------------------- tests/test_io.py | 17 +------ 2 files changed, 34 insertions(+), 70 deletions(-) diff --git a/src/dask_awkward/lib/io/json.py b/src/dask_awkward/lib/io/json.py index 1a6df2ce..958a0239 100644 --- a/src/dask_awkward/lib/io/json.py +++ b/src/dask_awkward/lib/io/json.py @@ -11,6 +11,7 @@ import json # type: ignore[no-redef] import awkward as ak +from awkward.forms.form import Form from dask.base import tokenize from dask.blockwise import BlockIndex from dask.bytes.core import read_bytes @@ -44,6 +45,8 @@ def __init__( storage: AbstractFileSystem, compression: str | None = None, schema: dict | None = None, + form: Form | None = None, + original_form: Form | None = None, **kwargs: Any, ) -> None: self.compression = compression @@ -51,6 +54,8 @@ def __init__( self.schema = schema self.args = args self.kwargs = kwargs + self.form = form + self.original_form = original_form @abc.abstractmethod def __call__(self, source: Any) -> ak.Array: @@ -64,6 +69,7 @@ def __init__( storage: AbstractFileSystem, compression: str | None = None, schema: dict | None = None, + form: Form | None = None, **kwargs: Any, ) -> None: super().__init__( @@ -71,6 +77,7 @@ def __init__( storage=storage, compression=compression, schema=schema, + form=form, **kwargs, ) @@ -78,16 +85,19 @@ def __call__(self, source: str) -> ak.Array: with self.storage.open(source, mode="rt", compression=self.compression) as f: return ak.from_json(f.read(), line_delimited=True, schema=self.schema) - def project_columns(self, columns): - schema = self.schema - - # TODO: do something with columns to redefine schema... 
+ def project_columns(self, columns, original_form: Form | None = None): + if self.form is not None: + form = self.form.select_columns(columns) + schema = layout_to_jsonschema(form.length_zero_array(highlevel=False)) + return _FromJsonLineDelimitedFn( + schema=schema, + form=form, + storage=self.storage, + compression=self.compression, + original_form=original_form, + ) - return _FromJsonLineDelimitedFn( - schema=schema, - storage=self.storage, - compression=self.compression, - ) + return self class _FromJsonSingleObjInFileFn(_FromJsonFn): @@ -95,8 +105,10 @@ def __init__( self, *args: Any, storage: AbstractFileSystem, - schema: dict | None = None, compression: str | None = None, + schema: dict | None = None, + form: Form | None = None, + original_form: Form | None = None, **kwargs: Any, ) -> None: super().__init__( @@ -104,6 +116,8 @@ def __init__( storage=storage, compression=compression, schema=schema, + form=form, + original_form=original_form, **kwargs, ) @@ -113,8 +127,15 @@ def __call__(self, source: str) -> ak.Array: class _FromJsonBytesFn: - def __init__(self, schema: dict | None = None) -> None: + def __init__( + self, + schema: dict | None = None, + form: Form | None = None, + original_form: Form | None = None, + ) -> None: self.schema = schema + self.form = form + self.original_form = original_form def __call__(self, source: bytes) -> ak.Array: return ak.from_json(source, line_delimited=True, schema=self.schema) @@ -215,6 +236,7 @@ def _from_json_files( storage=fs, compression=compression, schema=schema, + form=meta.layout.form, ) return from_map( @@ -264,24 +286,6 @@ def _from_json_bytes( return new_array_object(hlg, name, meta=meta, behavior=behavior, npartitions=n) -def from_json2( - source, - *, - line_delimited=False, - schema=None, - nan_string=None, - posinf_string=None, - neginf_string=None, - complex_record_fields=None, - buffersize=65536, - initial=1024, - resize=8, - highlevel=True, - behavior=None, -): - pass - - def from_json( urlpath: str | list[str], schema: dict | None = None, @@ -555,28 +559,3 @@ def ak_schema_repr(arr): import yaml return yaml.dump(arr.layout.form) - - -def _read_beginning_compressed( - storage: AbstractFileSystem, - source: str, - compression: str | None, - n_lines: int = 5, -) -> ak.Array: - lines = [] - with storage.open(source, mode="rt", compression=compression) as f: - for i, line in enumerate(f): - if i >= n_lines: - break - lines.append(ak.from_json(line)) - return ak.from_iter(lines) - - -def _read_beginning_uncompressed( - storage: AbstractFileSystem, - source: str, - numbytes: int = 16384, -) -> ak.Array: - bytes = storage.cat(source, start=0, end=numbytes) - array = ak.concatenate([ak.from_json(line) for line in bytes.split(b"\n")[:-1]]) - return array diff --git a/tests/test_io.py b/tests/test_io.py index f08a4f12..dd912644 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -325,7 +325,7 @@ def test_to_bag(daa, caa): @pytest.mark.parametrize("compression", ["xz", "gzip", "zip"]) -def test_to_json(daa, tmpdir_factory, compression): +def test_to_and_from_json(daa, tmpdir_factory, compression): tdir = str(tmpdir_factory.mktemp("json_temp")) p1 = os.path.join(tdir, "z", "z") @@ -351,21 +351,6 @@ def test_to_json(daa, tmpdir_factory, compression): assert_eq(x, r) -def test_to_json_raise_filenotfound( - daa: dak.Array, - tmpdir_factory: pytest.TempdirFactory, -) -> None: - p = tmpdir_factory.mktemp("onelevel") - p2 = os.path.join(str(p), "two") - with pytest.raises(FileNotFoundError, match="Parent directory for output file"): - 
dak.to_json( - daa, - os.path.join(p2, "three", "four", "*.json"), - compute=True, - line_delimited=True, - ) - - @pytest.mark.parametrize("optimize_graph", [True, False]) def test_to_dataframe(daa: dak.Array, caa: ak.Array, optimize_graph: bool) -> None: pytest.importorskip("pandas") From 31802a9c0bd6e7cc027649ae4b530d377ff37235 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Wed, 19 Jul 2023 14:54:15 -0500 Subject: [PATCH 06/21] passing tests --- src/dask_awkward/lib/io/json.py | 38 ++++++++++++++++++++----------- src/dask_awkward/lib/testutils.py | 2 +- tests/test_core.py | 15 +----------- 3 files changed, 27 insertions(+), 28 deletions(-) diff --git a/src/dask_awkward/lib/io/json.py b/src/dask_awkward/lib/io/json.py index 958a0239..4fd372d1 100644 --- a/src/dask_awkward/lib/io/json.py +++ b/src/dask_awkward/lib/io/json.py @@ -89,6 +89,7 @@ def project_columns(self, columns, original_form: Form | None = None): if self.form is not None: form = self.form.select_columns(columns) schema = layout_to_jsonschema(form.length_zero_array(highlevel=False)) + return _FromJsonLineDelimitedFn( schema=schema, form=form, @@ -197,10 +198,10 @@ def derive_json_meta( def _from_json_files( *, urlpath: str | list[str], + meta: ak.Array, schema: dict | None = None, one_obj_per_file: bool = False, compression: str | None = "infer", - meta: ak.Array | None = None, behavior: dict | None = None, derive_meta_kwargs: dict[str, Any] | None = None, storage_options: dict[str, Any] | None = None, @@ -247,10 +248,10 @@ def _from_json_files( def _from_json_bytes( *, urlpath: str | list[str], + meta: ak.Array, schema: dict | None, blocksize: int | str, delimiter: Any, - meta: ak.Array | None, behavior: dict | None, storage_options: dict[str, Any] | None, ) -> Array: @@ -392,10 +393,10 @@ def from_json( if delimiter is None and blocksize is None: return _from_json_files( urlpath=urlpath, + meta=meta, schema=schema, one_obj_per_file=one_obj_per_file, compression=compression, - meta=meta, behavior=behavior, derive_meta_kwargs=derive_meta_kwargs, storage_options=storage_options, @@ -406,10 +407,10 @@ def from_json( elif delimiter is not None and blocksize is not None: return _from_json_bytes( urlpath=urlpath, + meta=meta, schema=schema, delimiter=delimiter, blocksize=blocksize, - meta=meta, behavior=behavior, storage_options=storage_options, ) @@ -512,29 +513,40 @@ def to_json( def layout_to_jsonschema( layout: Content, - existing_schema: dict = None, + existing_schema: dict | None = None, + title: str = "untitled", + description: str = "Auto generated by dask-awkward", required: bool = False, ) -> dict: """Convert awkward array Layout to a JSON Schema dictionary.""" if existing_schema is None: - existing_schema = {"type": "object", "properties": {}} + existing_schema = { + "title": title, + "description": description, + "type": "object", + "properties": {}, + } if layout.is_record: existing_schema["type"] = "object" existing_schema["properties"] = {} - existing_schema["required"] = layout.fields + if required: + existing_schema["required"] = layout.fields for field in layout.fields: existing_schema["properties"][field] = {"type": None} layout_to_jsonschema(layout[field], existing_schema["properties"][field]) + elif (layout.parameters or {}) == {"__array__": "string"}: + existing_schema["type"] = "string" elif layout.is_list: existing_schema["type"] = "array" existing_schema["items"] = {} layout_to_jsonschema(layout.content, existing_schema["items"]) - elif layout.dtype.kind == "i": - existing_schema["type"] = "integer" - 
elif layout.dtype.kind == "f": - existing_schema["type"] = "number" - elif layout.dtype.kind.lower() in "uso": - existing_schema["type"] = "string" + elif layout.is_option: + layout_to_jsonschema(layout.content, existing_schema) + elif layout.is_numpy: + if layout.dtype.kind == "i": + existing_schema["type"] = "integer" + elif layout.dtype.kind == "f": + existing_schema["type"] = "number" return existing_schema diff --git a/src/dask_awkward/lib/testutils.py b/src/dask_awkward/lib/testutils.py index e5a5961f..4904f80e 100644 --- a/src/dask_awkward/lib/testutils.py +++ b/src/dask_awkward/lib/testutils.py @@ -125,7 +125,7 @@ def make_xy_point() -> dict[str, int]: def make_xy_point_str() -> dict[str, str]: - return {"x": str(_RG.randint(0, 10)), "y": str(_RG.randint(0, 10))} + return {"x": str(_RG.randint(0, 10)) * 3, "y": str(_RG.randint(0, 10)) * 4} def list_of_xy_points(n: int) -> list[dict[str, int]]: diff --git a/tests/test_core.py b/tests/test_core.py index 5e4f295e..54c0203b 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1,7 +1,7 @@ from __future__ import annotations from collections import namedtuple -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING import awkward as ak import dask.array as da @@ -108,13 +108,6 @@ def test_meta_exists(daa: Array) -> None: assert daa["points"]._meta is not None -def test_meta_raise(ndjson_points_file: str) -> None: - with pytest.raises( - TypeError, match="meta must be an instance of an Awkward Array." - ): - dak.from_json([ndjson_points_file], meta=5) - - def test_ndim(ndjson_points_file: str) -> None: daa = dak.from_json([ndjson_points_file] * 2) assert daa.ndim == daa.compute().ndim @@ -549,12 +542,6 @@ def test_partition_compat_with_strictness() -> None: ) -@pytest.mark.parametrize("meta", [5, False, [1, 2, 3]]) -def test_bad_meta_type(ndjson_points_file: str, meta: Any) -> None: - with pytest.raises(TypeError, match="meta must be an instance of an Awkward Array"): - dak.from_json([ndjson_points_file] * 3, meta=meta) - - def test_bad_meta_backend_array(daa): with pytest.raises(TypeError, match="meta Array must have a typetracer backend"): daa.points.x.map_partitions(lambda x: x**2, meta=ak.Array([])) From 7a97c02235fb02c426c6e3239811e50804597c88 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Tue, 1 Aug 2023 17:37:47 -0500 Subject: [PATCH 07/21] to_json correct function args; some typing --- src/dask_awkward/lib/io/json.py | 145 ++++++++++++++++++++++++++------ 1 file changed, 120 insertions(+), 25 deletions(-) diff --git a/src/dask_awkward/lib/io/json.py b/src/dask_awkward/lib/io/json.py index 4fd372d1..82394e88 100644 --- a/src/dask_awkward/lib/io/json.py +++ b/src/dask_awkward/lib/io/json.py @@ -3,7 +3,8 @@ import abc import math import warnings -from typing import TYPE_CHECKING, Any +from collections.abc import Callable, Sequence +from typing import TYPE_CHECKING, Any, Literal, overload try: import ujson as json @@ -85,7 +86,11 @@ def __call__(self, source: str) -> ak.Array: with self.storage.open(source, mode="rt", compression=self.compression) as f: return ak.from_json(f.read(), line_delimited=True, schema=self.schema) - def project_columns(self, columns, original_form: Form | None = None): + def project_columns( + self, + columns: Sequence[str], + original_form: Form | None = None, + ) -> _FromJsonLineDelimitedFn: if self.form is not None: form = self.form.select_columns(columns) schema = layout_to_jsonschema(form.length_zero_array(highlevel=False)) @@ -211,6 +216,10 @@ def _from_json_files( mode="rb", 
storage_options=storage_options, ) + + if compression == "infer": + compression = infer_compression(urlpaths[0]) + if meta is None: meta_read_kwargs = derive_meta_kwargs or {} meta = derive_json_meta( @@ -218,14 +227,12 @@ def _from_json_files( urlpaths[0], schema=schema, one_obj_per_file=one_obj_per_file, + compression=compression, **meta_read_kwargs, ) token = tokenize(fstoken, one_obj_per_file, compression, meta) - if compression == "infer": - compression = infer_compression(urlpaths[0]) - if one_obj_per_file: f: _FromJsonFn = _FromJsonSingleObjInFileFn( storage=fs, @@ -434,55 +441,134 @@ def __init__( if not self.fs.exists(path): self.fs.mkdir(path) self.zfill = math.ceil(math.log(npartitions, 10)) - self.kwargs = kwargs self.compression = compression if self.compression == "infer": self.compression = infer_compression(self.path) + self.kwargs = kwargs def __call__(self, array: ak.Array, block_index: tuple[int]) -> None: part = str(block_index[0]).zfill(self.zfill) filename = f"part{part}.json" if self.compression is not None and self.compression != "infer": - compression = "gz" if self.compression == "gzip" else self.compression - filename = f"{filename}.{compression}" + ext = self.compression + if ext == "gzip": + ext = "gz" + if ext == "zstd": + ext = "zst" + filename = f"{filename}.{ext}" thispath = self.fs.sep.join([self.path, filename]) with self.fs.open(thispath, mode="wt", compression=self.compression) as f: - ak.to_json(array, f, line_delimited=True, **self.kwargs) + ak.to_json(array, f, **self.kwargs) return None +@overload +def to_json( + array: Array, + path: str, + *, + line_delimited: bool | str, + num_indent_spaces: int | None, + num_readability_spaces: int, + nan_string: str | None, + posinf_string: str | None, + neginf_string: str | None, + complex_record_fields: tuple[str, str] | None, + convert_bytes: Callable | None, + convert_other: Callable | None, + storage_options: dict[str, Any] | None, + compression: str | None, + compute: Literal[False], +) -> Scalar: + ... + + +@overload def to_json( array: Array, path: str, + *, + line_delimited: bool | str, + num_indent_spaces: int | None, + num_readability_spaces: int, + nan_string: str | None, + posinf_string: str | None, + neginf_string: str | None, + complex_record_fields: tuple[str, str] | None, + convert_bytes: Callable | None, + convert_other: Callable | None, + storage_options: dict[str, Any] | None, + compression: str | None, + compute: Literal[True], +) -> None: + ... + + +def to_json( + array: Array, + path: str, + *, + line_delimited: bool | str = True, + num_indent_spaces: int | None = None, + num_readability_spaces: int = 0, + nan_string: str | None = None, + posinf_string: str | None = None, + neginf_string: str | None = None, + complex_record_fields: tuple[str, str] | None = None, + convert_bytes: Callable | None = None, + convert_other: Callable | None = None, storage_options: dict[str, Any] | None = None, - compute: bool = False, compression: str | None = None, - **kwargs: Any, -) -> Scalar: - """Write data to line delimited JSON. + compute: bool = False, +) -> Scalar | None: + """Store Array collection in JSON text. Parameters ---------- array : Array - dask-awkward Array collection to write to disk. + Collection to store in JSON format path : str - Root directory of location to write to. + Root directory to save data; interpreted by filesystem-spec + (can be a remote filesystem path, for example an s3 bucket: + ``"s3://bucket/data"``). 
+ line_delimited : bool | str + See docstring for :py:func:`ak.to_json`. + num_indent_spaces : int, optional + See docstring for :py:func:`ak.to_json`. + num_readability_spaces : int + See docstring for :py:func:`ak.to_json`. + nan_string : str, optional + See docstring for :py:func:`ak.to_json`. + posinf_string : str, optional + See docstring for :py:func:`ak.to_json`. + neginf_string : str, optional + See docstring for :py:func:`ak.to_json`. + complex_record_fields : tuple[str, str], optional + See docstring for :py:func:`ak.to_json`. + convert_bytes : Callable, optional + See docstring for :py:func:`ak.to_json`. + convert_other : Callable, optional + See docstring for :py:func:`ak.to_json`. storage_options : dict[str, Any], optional - filesystem-spec storage options. - compute : bool - If ``True`` immediately compute this collection. + Options passed to ``fsspec``. compression : str, optional - filesystem-spec compression algorithm to use. - **kwargs : Any - Additional arguments passed to ``ak.to_json`` + Compress JSON data via ``fsspec`` + compute : bool + Immediately compute the collection. Returns ------- - None or dask_awkward.Scalar - Scalar which provides a lazy compute if `compute` is - ``False``, or ``None`` if `compute` is ``True``. + Scalar or None + Computable Scalar object if ``compute`` is ``False``, + otherwise returns ``None``. + + Examples + -------- + + >>> import dask_awkward as dak + >>> print("Hello, world!") """ storage_options = storage_options or {} @@ -494,7 +580,15 @@ def to_json( path, npartitions=nparts, compression=compression, - **kwargs, + line_delimited=line_delimited, + num_indent_spaces=num_indent_spaces, + num_readability_spaces=num_readability_spaces, + nan_string=nan_string, + posinf_string=posinf_string, + neginf_string=neginf_string, + complex_record_fields=complex_record_fields, + convert_bytes=convert_bytes, + convert_other=convert_other, ), array, BlockIndex((nparts,)), @@ -508,6 +602,7 @@ def to_json( res = new_scalar_object(graph, name=name, meta=None) if compute: res.compute() + return None return res From c8bc1ec9e044e1024dd1040c6a7a4a23adedd47f Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 10 Aug 2023 14:26:43 -0500 Subject: [PATCH 08/21] more layout type supported --- src/dask_awkward/lib/_utils.py | 4 ++++ src/dask_awkward/lib/io/json.py | 18 +++++++++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/src/dask_awkward/lib/_utils.py b/src/dask_awkward/lib/_utils.py index 30d433f4..78050ba2 100644 --- a/src/dask_awkward/lib/_utils.py +++ b/src/dask_awkward/lib/_utils.py @@ -42,6 +42,10 @@ def set_form_keys(form: Form, *, key: str) -> Form: elif form.is_numpy: form.form_key = key + elif form.is_union: + for entry in form.contents: + set_form_keys(entry, key=key) + # Anything else grab the content and keep recursing else: set_form_keys(form.content, key=key) diff --git a/src/dask_awkward/lib/io/json.py b/src/dask_awkward/lib/io/json.py index 82394e88..f76f6da7 100644 --- a/src/dask_awkward/lib/io/json.py +++ b/src/dask_awkward/lib/io/json.py @@ -606,6 +606,11 @@ def to_json( return res +def array_param_is_string_or_bytestring(layout: Content) -> bool: + params = layout.parameters or {} + return params == {"__array__": "string"} or params == {"__array__": "bytestring"} + + def layout_to_jsonschema( layout: Content, existing_schema: dict | None = None, @@ -629,7 +634,10 @@ def layout_to_jsonschema( for field in layout.fields: existing_schema["properties"][field] = {"type": None} layout_to_jsonschema(layout[field], 
existing_schema["properties"][field]) - elif (layout.parameters or {}) == {"__array__": "string"}: + elif (layout.parameters or {}) == {"__array__": "categorical"}: + existing_schema["enum"] = layout.content.to_list() + existing_schema["type"] = layout_to_jsonschema(layout.content)["type"] + elif array_param_is_string_or_bytestring(layout): existing_schema["type"] = "string" elif layout.is_list: existing_schema["type"] = "array" @@ -642,6 +650,14 @@ def layout_to_jsonschema( existing_schema["type"] = "integer" elif layout.dtype.kind == "f": existing_schema["type"] = "number" + elif layout.is_indexed: + pass + elif layout.is_unknown: + existing_schema["type"] = "object" + elif layout.is_union: + existing_schema["type"] = [ + layout_to_jsonschema(content)["type"] for content in layout.contents + ] return existing_schema From d1dcc720ec2b9e6100d964e8b9a4682cb45f1946 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Tue, 15 Aug 2023 14:40:10 -0500 Subject: [PATCH 09/21] handle minItems maxItems for awkward's regular arrays --- src/dask_awkward/lib/io/json.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/dask_awkward/lib/io/json.py b/src/dask_awkward/lib/io/json.py index f76f6da7..5901c0b2 100644 --- a/src/dask_awkward/lib/io/json.py +++ b/src/dask_awkward/lib/io/json.py @@ -641,6 +641,9 @@ def layout_to_jsonschema( existing_schema["type"] = "string" elif layout.is_list: existing_schema["type"] = "array" + if layout.is_regular: + existing_schema["minItems"] = layout.size + existing_schema["maxItems"] = layout.size existing_schema["items"] = {} layout_to_jsonschema(layout.content, existing_schema["items"]) elif layout.is_option: From f2cb69c24605013369105c99ce7d6d5d3208852e Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 17 Aug 2023 09:55:48 -0500 Subject: [PATCH 10/21] add tests (and move json specific tests) --- tests/test_io.py | 108 ---------------------- tests/test_io_json.py | 208 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 208 insertions(+), 108 deletions(-) create mode 100644 tests/test_io_json.py diff --git a/tests/test_io.py b/tests/test_io.py index 41696074..886dbe39 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -1,6 +1,5 @@ from __future__ import annotations -import os from pathlib import Path import awkward as ak @@ -10,90 +9,10 @@ from dask.delayed import delayed from numpy.typing import DTypeLike -try: - import ujson as json -except ImportError: - import json # type: ignore[no-redef] - import dask_awkward as dak from dask_awkward.lib.testutils import assert_eq -def test_force_by_lines_meta(ndjson_points_file: str) -> None: - daa1 = dak.from_json( - [ndjson_points_file] * 5, - derive_meta_kwargs={"force_by_lines": True}, - ) - daa2 = dak.from_json([ndjson_points_file] * 3) - assert daa1._meta is not None - assert daa2._meta is not None - f1 = daa1._meta.layout.form - f2 = daa2._meta.layout.form - assert f1 == f2 - - -def test_derive_json_meta_trigger_warning(ndjson_points_file: str) -> None: - with pytest.warns(UserWarning): - dak.from_json([ndjson_points_file], derive_meta_kwargs={"bytechunks": 64}) - - -def test_json_one_obj_per_file(single_record_file: str) -> None: - daa = dak.from_json( - [single_record_file] * 5, - one_obj_per_file=True, - ) - caa = ak.concatenate([ak.from_json(Path(single_record_file))] * 5) - assert_eq(daa, caa) - - -def test_json_delim_defined(ndjson_points_file: str) -> None: - source = [ndjson_points_file] * 6 - daa = dak.from_json(source, delimiter=b"\n") - - concretes = [] - for s in source: - with open(s) as f: 
- for line in f: - concretes.append(json.loads(line)) - caa = ak.from_iter(concretes) - assert_eq( - daa["points"][["x", "y"]], - caa["points"][["x", "y"]], - ) - - -def test_json_sample_rows_true(ndjson_points_file: str) -> None: - source = [ndjson_points_file] * 5 - - daa = dak.from_json( - source, - derive_meta_kwargs={"force_by_lines": True, "sample_rows": 2}, - ) - - concretes = [] - for s in source: - with open(s) as f: - for line in f: - concretes.append(json.loads(line)) - caa = ak.from_iter(concretes) - - assert_eq(daa, caa) - - -def test_json_bytes_no_delim_defined(ndjson_points_file: str) -> None: - source = [ndjson_points_file] * 7 - daa = dak.from_json(source, blocksize=650, delimiter=None) - - concretes = [] - for s in source: - with open(s) as f: - for line in f: - concretes.append(json.loads(line)) - - caa = ak.from_iter(concretes) - assert_eq(daa, caa) - - def test_to_and_from_dask_array(daa: dak.Array) -> None: daa = dak.from_awkward(daa.compute(), npartitions=3) @@ -324,33 +243,6 @@ def test_to_bag(daa, caa): assert comprec.tolist() == entry.tolist() -@pytest.mark.parametrize("compression", ["xz", "gzip", "zip"]) -def test_to_and_from_json(daa, tmpdir_factory, compression): - tdir = str(tmpdir_factory.mktemp("json_temp")) - - p1 = os.path.join(tdir, "z", "z") - - dak.to_json(daa, p1, compute=True) - paths = list((Path(tdir) / "z" / "z").glob("part*.json")) - assert len(paths) == daa.npartitions - arrays = ak.concatenate([ak.from_json(p, line_delimited=True) for p in paths]) - assert_eq(daa, arrays) - - x = dak.from_json(os.path.join(p1, "*.json")) - assert_eq(arrays, x) - - s = dak.to_json( - daa, - tdir, - compression=compression, - compute=False, - ) - s.compute() - suffix = "gz" if compression == "gzip" else compression - r = dak.from_json(os.path.join(tdir, f"*.json.{suffix}")) - assert_eq(x, r) - - @pytest.mark.parametrize("optimize_graph", [True, False]) def test_to_dataframe(daa: dak.Array, caa: ak.Array, optimize_graph: bool) -> None: pytest.importorskip("pandas") diff --git a/tests/test_io_json.py b/tests/test_io_json.py new file mode 100644 index 00000000..fc69d6c5 --- /dev/null +++ b/tests/test_io_json.py @@ -0,0 +1,208 @@ +from __future__ import annotations + +import os +from pathlib import Path + +import awkward as ak +import dask +import pytest + +import dask_awkward as dak +from dask_awkward.lib.core import Array +from dask_awkward.lib.optimize import optimize as dak_optimize +from dask_awkward.lib.testutils import assert_eq + +try: + import ujson as json +except ImportError: + import json # type: ignore[no-redef] + + +data1 = r"""{"name":"Bob","team":"tigers","goals":[0,0,0,1,2,0,1]} +{"name":"Alice","team":"bears","goals":[3,2,1,0,1]} +{"name":"Jack","team":"bears","goals":[0,0,0,0,0,0,0,0,1]} +{"name":"Jill","team":"bears","goals":[3,0,2]} +{"name":"Ted","team":"tigers","goals":[0,0,0,0,0]} +""" + +data2 = r"""{"name":"Ellen","team":"tigers","goals":[1,0,0,0,2,0,1]} +{"name":"Dan","team":"bears","goals":[0,0,3,1,0,2,0,0]} +{"name":"Brad","team":"bears","goals":[0,0,4,0,0,1]} +{"name":"Nancy","team":"tigers","goals":[0,0,1,1,1,1,0]} +{"name":"Lance","team":"bears","goals":[1,1,1,1,1]} +""" + +data3 = r"""{"name":"Sara","team":"tigers","goals":[0,1,0,2,0,3]} +{"name":"Ryan","team":"tigers","goals":[1,2,3,0,0,0,0]} +""" + + +@pytest.fixture(scope="session") +def json_data_dir(tmp_path_factory: pytest.TempPathFactory) -> Path: + toplevel = tmp_path_factory.mktemp("json_data") + for i, datum in enumerate((data1, data2, data3)): + with open(toplevel / 
f"file{i}.json", "w") as f: + print(datum, file=f) + return toplevel + + +@pytest.fixture(scope="session") +def concrete_data(json_data_dir: Path) -> ak.Array: + array = ak.concatenate( + [ + ak.from_json(json_data_dir / "file0.json", line_delimited=True), + ak.from_json(json_data_dir / "file1.json", line_delimited=True), + ak.from_json(json_data_dir / "file2.json", line_delimited=True), + ], + ) + + return array + + +def test_json_sanity(json_data_dir: Path, concrete_data: ak.Array) -> None: + ds = dak.from_json(str(json_data_dir) + "/*.json") + assert ds + + assert_eq(ds, concrete_data) + + +def input_layer_array_partition0(collection: Array) -> ak.Array: + """Get first partition concrete array after the input layer. + + Parameteters + ------------ + collection : dask_awkward.Array + dask-awkward Array collection of interest + + Returns + ------- + ak.Array + Concrete awkward array representing the first partition + immediately after the input layer. + + """ + with dask.config.set({"awkward.optimization.which": ["columns"]}): + optimized_hlg = dak_optimize(collection.dask, []) + layer_name = [ + name for name in list(optimized_hlg.layers) if name.startswith("from-json") + ][0] + sgc, arg = optimized_hlg[(layer_name, 0)] + array = sgc.dsk[layer_name][0](arg) + return array + + +def test_json_column_projection1(json_data_dir: Path) -> None: + ds = dak.from_json(str(json_data_dir) + "/*.json") + ds2 = ds[["name", "goals"]] + array = input_layer_array_partition0(ds2) + assert array.fields == ["name", "goals"] + + +def test_json_column_projection2(json_data_dir: Path) -> None: + ds = dak.from_json(str(json_data_dir) + "/*.json") + # grab name and goals but then only use goals! + ds2 = dak.max(ds[["name", "goals"]].goals, axis=1) + array = input_layer_array_partition0(ds2) + assert array.fields == ["goals"] + + +def test_json_force_by_lines_meta(ndjson_points_file: str) -> None: + daa1 = dak.from_json( + [ndjson_points_file] * 5, + derive_meta_kwargs={"force_by_lines": True}, + ) + daa2 = dak.from_json([ndjson_points_file] * 3) + assert daa1._meta is not None + assert daa2._meta is not None + f1 = daa1._meta.layout.form + f2 = daa2._meta.layout.form + assert f1 == f2 + + +def test_derive_json_meta_trigger_warning(ndjson_points_file: str) -> None: + with pytest.warns(UserWarning): + dak.from_json([ndjson_points_file], derive_meta_kwargs={"bytechunks": 64}) + + +def test_json_one_obj_per_file(single_record_file: str) -> None: + daa = dak.from_json( + [single_record_file] * 5, + one_obj_per_file=True, + ) + caa = ak.concatenate([ak.from_json(Path(single_record_file))] * 5) + assert_eq(daa, caa) + + +def test_json_delim_defined(ndjson_points_file: str) -> None: + source = [ndjson_points_file] * 6 + daa = dak.from_json(source, delimiter=b"\n") + + concretes = [] + for s in source: + with open(s) as f: + for line in f: + concretes.append(json.loads(line)) + caa = ak.from_iter(concretes) + assert_eq( + daa["points"][["x", "y"]], + caa["points"][["x", "y"]], + ) + + +def test_json_sample_rows_true(ndjson_points_file: str) -> None: + source = [ndjson_points_file] * 5 + + daa = dak.from_json( + source, + derive_meta_kwargs={"force_by_lines": True, "sample_rows": 2}, + ) + + concretes = [] + for s in source: + with open(s) as f: + for line in f: + concretes.append(json.loads(line)) + caa = ak.from_iter(concretes) + + assert_eq(daa, caa) + + +def test_json_bytes_no_delim_defined(ndjson_points_file: str) -> None: + source = [ndjson_points_file] * 7 + daa = dak.from_json(source, blocksize=650, 
delimiter=None) + + concretes = [] + for s in source: + with open(s) as f: + for line in f: + concretes.append(json.loads(line)) + + caa = ak.from_iter(concretes) + assert_eq(daa, caa) + + +@pytest.mark.parametrize("compression", ["xz", "gzip", "zip"]) +def test_to_and_from_json(daa, tmpdir_factory, compression): + tdir = str(tmpdir_factory.mktemp("json_temp")) + + p1 = os.path.join(tdir, "z", "z") + + dak.to_json(daa, p1, compute=True) + paths = list((Path(tdir) / "z" / "z").glob("part*.json")) + assert len(paths) == daa.npartitions + arrays = ak.concatenate([ak.from_json(p, line_delimited=True) for p in paths]) + assert_eq(daa, arrays) + + x = dak.from_json(os.path.join(p1, "*.json")) + assert_eq(arrays, x) + + s = dak.to_json( + daa, + tdir, + compression=compression, + compute=False, + ) + s.compute() + suffix = "gz" if compression == "gzip" else compression + r = dak.from_json(os.path.join(tdir, f"*.json.{suffix}")) + assert_eq(x, r) From bd65e0009b41bd28981f515e4bdd3fe5e53532c9 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 17 Aug 2023 09:58:39 -0500 Subject: [PATCH 11/21] generic paths --- tests/test_io_json.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/test_io_json.py b/tests/test_io_json.py index fc69d6c5..0b9140b2 100644 --- a/tests/test_io_json.py +++ b/tests/test_io_json.py @@ -60,7 +60,8 @@ def concrete_data(json_data_dir: Path) -> ak.Array: def test_json_sanity(json_data_dir: Path, concrete_data: ak.Array) -> None: - ds = dak.from_json(str(json_data_dir) + "/*.json") + source = os.path.join(str(json_data_dir), "*.json") + ds = dak.from_json(source) assert ds assert_eq(ds, concrete_data) @@ -92,14 +93,16 @@ def input_layer_array_partition0(collection: Array) -> ak.Array: def test_json_column_projection1(json_data_dir: Path) -> None: - ds = dak.from_json(str(json_data_dir) + "/*.json") + source = os.path.join(str(json_data_dir), "*.json") + ds = dak.from_json(source) ds2 = ds[["name", "goals"]] array = input_layer_array_partition0(ds2) assert array.fields == ["name", "goals"] def test_json_column_projection2(json_data_dir: Path) -> None: - ds = dak.from_json(str(json_data_dir) + "/*.json") + source = os.path.join(str(json_data_dir), "*.json") + ds = dak.from_json(source) # grab name and goals but then only use goals! ds2 = dak.max(ds[["name", "goals"]].goals, axis=1) array = input_layer_array_partition0(ds2) From 5a078923fa831168d99b94efd5cb680a17d0b0e4 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 17 Aug 2023 10:20:53 -0500 Subject: [PATCH 12/21] typing --- src/dask_awkward/lib/core.py | 12 ++++++++- src/dask_awkward/lib/io/json.py | 48 ++++++++++++++++----------------- tests/test_io_json.py | 19 +++++++------ 3 files changed, 46 insertions(+), 33 deletions(-) diff --git a/src/dask_awkward/lib/core.py b/src/dask_awkward/lib/core.py index c7216436..b112fb6d 100644 --- a/src/dask_awkward/lib/core.py +++ b/src/dask_awkward/lib/core.py @@ -1089,7 +1089,17 @@ def _getitem_single(self, where: Any) -> Array: raise DaskAwkwardNotImplemented(f"__getitem__ doesn't support where={where}.") - def __getitem__(self, where: Any) -> AwkwardDaskCollection: + @overload + def __getitem__(self, where: Array | str | Sequence[str] | slice) -> Array: + ... + + @overload + def __getitem__(self, where: int) -> Scalar: + ... + + def __getitem__( + self, where: Array | str | Sequence[str] | int | slice + ) -> Array | Scalar: """Select items from the collection. Heavily under construction. 
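
The `__getitem__` changes in the hunk above are typing-only: the two `@overload` stubs tell static checkers that field selection (a string, a sequence of strings, a slice, or another `Array`) yields an `Array`, while a single integer index is typed as a `Scalar`. The overloads do not change runtime dispatch; they only refine the annotations seen by tools such as mypy. A minimal sketch of what a checker now infers, using illustrative paths and field names that are not part of this patch:

    import dask_awkward as dak

    daa = dak.from_json("records/*.json")   # dask_awkward.Array

    sub = daa[["name", "goals"]]   # Sequence[str] -> annotated as Array
    goals = daa["goals"]           # str           -> annotated as Array
    first = daa[0]                 # int           -> annotated as Scalar
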
diff --git a/src/dask_awkward/lib/io/json.py b/src/dask_awkward/lib/io/json.py index 5901c0b2..a58a53c4 100644 --- a/src/dask_awkward/lib/io/json.py +++ b/src/dask_awkward/lib/io/json.py @@ -469,18 +469,18 @@ def to_json( array: Array, path: str, *, - line_delimited: bool | str, - num_indent_spaces: int | None, - num_readability_spaces: int, - nan_string: str | None, - posinf_string: str | None, - neginf_string: str | None, - complex_record_fields: tuple[str, str] | None, - convert_bytes: Callable | None, - convert_other: Callable | None, - storage_options: dict[str, Any] | None, - compression: str | None, - compute: Literal[False], + line_delimited: bool | str = True, + num_indent_spaces: int | None = None, + num_readability_spaces: int = 0, + nan_string: str | None = None, + posinf_string: str | None = None, + neginf_string: str | None = None, + complex_record_fields: tuple[str, str] | None = None, + convert_bytes: Callable | None = None, + convert_other: Callable | None = None, + storage_options: dict[str, Any] | None = None, + compression: str | None = None, + compute: Literal[False] = False, ) -> Scalar: ... @@ -490,18 +490,18 @@ def to_json( array: Array, path: str, *, - line_delimited: bool | str, - num_indent_spaces: int | None, - num_readability_spaces: int, - nan_string: str | None, - posinf_string: str | None, - neginf_string: str | None, - complex_record_fields: tuple[str, str] | None, - convert_bytes: Callable | None, - convert_other: Callable | None, - storage_options: dict[str, Any] | None, - compression: str | None, - compute: Literal[True], + line_delimited: bool | str = True, + num_indent_spaces: int | None = None, + num_readability_spaces: int = 0, + nan_string: str | None = None, + posinf_string: str | None = None, + neginf_string: str | None = None, + complex_record_fields: tuple[str, str] | None = None, + convert_bytes: Callable | None = None, + convert_other: Callable | None = None, + storage_options: dict[str, Any] | None = None, + compression: str | None = None, + compute: Literal[True] = True, ) -> None: ... 
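
This hunk fills in default values on both `to_json` overloads so that the `compute` flag alone selects the annotated return type: the `Literal[False]` overload (the default) is typed as returning a `Scalar`, and the `Literal[True]` overload as returning `None`. A short usage sketch under these signatures, with illustrative paths only; it mirrors the call pattern exercised in the test changes that follow:

    import dask_awkward as dak

    daa = dak.from_json("input/*.json")

    # compute=False (default): typed as Scalar; the write happens on .compute()
    lazy = dak.to_json(array=daa, path="output", compression="gzip", compute=False)
    lazy.compute()

    # compute=True: writes immediately and is typed as returning None
    dak.to_json(array=daa, path="output-now", compute=True)
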
diff --git a/tests/test_io_json.py b/tests/test_io_json.py index 0b9140b2..4bf8a59a 100644 --- a/tests/test_io_json.py +++ b/tests/test_io_json.py @@ -84,9 +84,8 @@ def input_layer_array_partition0(collection: Array) -> ak.Array: """ with dask.config.set({"awkward.optimization.which": ["columns"]}): optimized_hlg = dak_optimize(collection.dask, []) - layer_name = [ - name for name in list(optimized_hlg.layers) if name.startswith("from-json") - ][0] + layers = list(optimized_hlg.layers) # type: ignore[attr-defined] + layer_name = [name for name in layers if name.startswith("from-json")][0] sgc, arg = optimized_hlg[(layer_name, 0)] array = sgc.dsk[layer_name][0](arg) return array @@ -185,12 +184,16 @@ def test_json_bytes_no_delim_defined(ndjson_points_file: str) -> None: @pytest.mark.parametrize("compression", ["xz", "gzip", "zip"]) -def test_to_and_from_json(daa, tmpdir_factory, compression): - tdir = str(tmpdir_factory.mktemp("json_temp")) +def test_to_and_from_json( + daa: Array, + tmp_path_factory: pytest.TempPathFactory, + compression: str, +) -> None: + tdir = str(tmp_path_factory.mktemp("json_temp")) p1 = os.path.join(tdir, "z", "z") - dak.to_json(daa, p1, compute=True) + dak.to_json(array=daa, path=p1, compute=True) paths = list((Path(tdir) / "z" / "z").glob("part*.json")) assert len(paths) == daa.npartitions arrays = ak.concatenate([ak.from_json(p, line_delimited=True) for p in paths]) @@ -200,8 +203,8 @@ def test_to_and_from_json(daa, tmpdir_factory, compression): assert_eq(arrays, x) s = dak.to_json( - daa, - tdir, + array=daa, + path=tdir, compression=compression, compute=False, ) From 8764ec600a7e6ba61844e10be69410e48e0b7a27 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 17 Aug 2023 14:43:17 -0500 Subject: [PATCH 13/21] handle unknown type; add layout_to_jsonschema to top level API --- src/dask_awkward/__init__.py | 2 +- src/dask_awkward/lib/io/json.py | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/dask_awkward/__init__.py b/src/dask_awkward/__init__.py index 267fdf69..794c4ff1 100644 --- a/src/dask_awkward/__init__.py +++ b/src/dask_awkward/__init__.py @@ -29,7 +29,7 @@ to_dataframe, to_delayed, ) -from dask_awkward.lib.io.json import from_json, to_json +from dask_awkward.lib.io.json import from_json, layout_to_jsonschema, to_json from dask_awkward.lib.io.parquet import from_parquet, to_parquet from dask_awkward.lib.operations import concatenate from dask_awkward.lib.reducers import ( diff --git a/src/dask_awkward/lib/io/json.py b/src/dask_awkward/lib/io/json.py index a58a53c4..60c40883 100644 --- a/src/dask_awkward/lib/io/json.py +++ b/src/dask_awkward/lib/io/json.py @@ -656,7 +656,14 @@ def layout_to_jsonschema( elif layout.is_indexed: pass elif layout.is_unknown: - existing_schema["type"] = "object" + existing_schema["type"] = [ + "array", + "boolean", + "object", + "null", + "number", + "string", + ] elif layout.is_union: existing_schema["type"] = [ layout_to_jsonschema(content)["type"] for content in layout.contents From 8ec7acbb8d685b37d350ff5a1a43e1c3705a27e8 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Tue, 29 Aug 2023 16:21:44 -0500 Subject: [PATCH 14/21] rework meta determination; remove single obj per file support --- src/dask_awkward/lib/io/json.py | 367 ++++++++++++-------------------- tests/test_io.py | 2 +- tests/test_io_json.py | 45 ---- 3 files changed, 137 insertions(+), 277 deletions(-) diff --git a/src/dask_awkward/lib/io/json.py b/src/dask_awkward/lib/io/json.py index 60c40883..467a9af3 100644 --- 
a/src/dask_awkward/lib/io/json.py +++ b/src/dask_awkward/lib/io/json.py @@ -2,15 +2,9 @@ import abc import math -import warnings from collections.abc import Callable, Sequence from typing import TYPE_CHECKING, Any, Literal, overload -try: - import ujson as json -except ImportError: - import json # type: ignore[no-redef] - import awkward as ak from awkward.forms.form import Form from dask.base import tokenize @@ -39,13 +33,29 @@ __all__ = ("from_json", "to_json") -class _FromJsonFn: +def _fs_token_paths( + source: Any, + *, + storage_options: dict[str, Any] | None = None, +) -> tuple[AbstractFileSystem, str, list[str]]: + fs, token, paths = get_fs_token_paths(source, storage_options=storage_options) + + # if paths is length 1, check to see if it's a directory and + # wildcard search for JSON files. Otherwise, just keep whatever + # has already been found. + if len(paths) == 1 and fs.isdir(paths[0]): + paths = list(filter(lambda s: ".json" in s, fs.ls(paths[0]))) + + return fs, token, paths + + +class FromJsonFn: def __init__( self, *args: Any, storage: AbstractFileSystem, compression: str | None = None, - schema: dict | None = None, + schema: str | dict | list | None = None, form: Form | None = None, original_form: Form | None = None, **kwargs: Any, @@ -63,13 +73,13 @@ def __call__(self, source: Any) -> ak.Array: ... -class _FromJsonLineDelimitedFn(_FromJsonFn): +class FromJsonLineDelimitedFn(FromJsonFn): def __init__( self, *args: Any, storage: AbstractFileSystem, compression: str | None = None, - schema: dict | None = None, + schema: str | dict | list | None = None, form: Form | None = None, **kwargs: Any, ) -> None: @@ -84,196 +94,153 @@ def __init__( def __call__(self, source: str) -> ak.Array: with self.storage.open(source, mode="rt", compression=self.compression) as f: - return ak.from_json(f.read(), line_delimited=True, schema=self.schema) + return ak.from_json( + f.read(), + line_delimited=True, + schema=self.schema, + **self.kwargs, + ) def project_columns( self, columns: Sequence[str], original_form: Form | None = None, - ) -> _FromJsonLineDelimitedFn: + ) -> FromJsonLineDelimitedFn: + # if a schema is already defined we are not going to project + # columns; we will keep the schema that already exists. 
+ if self.schema is not None: + return self + if self.form is not None: form = self.form.select_columns(columns) schema = layout_to_jsonschema(form.length_zero_array(highlevel=False)) - return _FromJsonLineDelimitedFn( + return FromJsonLineDelimitedFn( schema=schema, form=form, storage=self.storage, compression=self.compression, original_form=original_form, + **self.kwargs, ) return self -class _FromJsonSingleObjInFileFn(_FromJsonFn): +class FromJsonBytesFn: def __init__( self, - *args: Any, - storage: AbstractFileSystem, - compression: str | None = None, - schema: dict | None = None, + schema: str | dict | list | None = None, form: Form | None = None, original_form: Form | None = None, **kwargs: Any, - ) -> None: - super().__init__( - *args, - storage=storage, - compression=compression, - schema=schema, - form=form, - original_form=original_form, - **kwargs, - ) - - def __call__(self, source: str) -> ak.Array: - with self.storage.open(source, mode="rb", compression=self.compression) as f: - return ak.from_json(f, schema=self.schema, **self.kwargs) - - -class _FromJsonBytesFn: - def __init__( - self, - schema: dict | None = None, - form: Form | None = None, - original_form: Form | None = None, ) -> None: self.schema = schema self.form = form self.original_form = original_form + self.kwargs = kwargs def __call__(self, source: bytes) -> ak.Array: - return ak.from_json(source, line_delimited=True, schema=self.schema) + return ak.from_json( + source, + line_delimited=True, + schema=self.schema, + **self.kwargs, + ) -def derive_json_meta( - storage: AbstractFileSystem, - source: str, - schema: dict | None = None, - compression: str | None = "infer", - sample_rows: int = 5, +def meta_from_bytechunks( + fs: AbstractFileSystem, + paths: list[str], bytechunks: str | int = "16 KiB", - force_by_lines: bool = False, - one_obj_per_file: bool = False, -) -> ak.Array: - if compression == "infer": - compression = infer_compression(source) - + **kwargs: Any, +): bytechunks = parse_bytes(bytechunks) - - if one_obj_per_file: - fn = _FromJsonSingleObjInFileFn( - storage=storage, - compression=compression, - schema=schema, - ) - return typetracer_array(fn(source)) - - # when the data is uncompressed we read `bytechunks` number of - # bytes then split on a newline bytes, and use the first - # `sample_rows` number of lines. - if compression is None and not force_by_lines: - try: - bytes = storage.cat(source, start=0, end=bytechunks) - lines = [json.loads(ln) for ln in bytes.split(b"\n")[:sample_rows]] - return typetracer_array(ak.from_iter(lines)) - except ValueError: - # we'll get a ValueError if we can't decode the JSON from - # the bytes that we grabbed. - warnings.warn( - f"Couldn't determine metadata from reading first {bytechunks} " - f"of the dataset; will read the first {sample_rows} instead. " - "Try increasing the value of `bytechunks` or decreasing `sample_rows` " - "to remove this warning." - ) - - # for compressed data (or if explicitly asked for with - # force_by_lines set to True) we read the first `sample_rows` - # number of rows after opening the compressed file. 
- with storage.open(source, mode="rt", compression=compression) as f: + bytes = fs.cat(paths[0], start=0, end=bytechunks) + rfind = bytes.rfind(b"\n") + if rfind > 0: + bytes = bytes[:rfind] + array = ak.from_json(bytes, line_delimited=True, **kwargs) + return typetracer_array(array) + + +def meta_from_line_by_line( + fs: AbstractFileSystem, + paths: list[str], + compression: str | None, + sample_rows: int = 10, + **kwargs: Any, +): + with fs.open(paths[0], mode="rt", compression=compression) as f: lines = [] for i, line in enumerate(f): - lines.append(json.loads(line)) + lines.append(line) if i >= sample_rows: break - return typetracer_array(ak.from_iter(lines)) + array = ak.from_json("\n".join(lines), line_delimited=True, **kwargs) + return typetracer_array(array) def _from_json_files( + fs: AbstractFileSystem, + token: str, + paths: list[str], *, - urlpath: str | list[str], - meta: ak.Array, - schema: dict | None = None, - one_obj_per_file: bool = False, + schema: str | dict | list | None = None, compression: str | None = "infer", behavior: dict | None = None, - derive_meta_kwargs: dict[str, Any] | None = None, - storage_options: dict[str, Any] | None = None, + **kwargs: Any, ) -> Array: - fs, fstoken, urlpaths = get_fs_token_paths( - urlpath, - mode="rb", - storage_options=storage_options, - ) - if compression == "infer": - compression = infer_compression(urlpaths[0]) + compression = infer_compression(paths[0]) - if meta is None: - meta_read_kwargs = derive_meta_kwargs or {} - meta = derive_json_meta( - fs, - urlpaths[0], - schema=schema, - one_obj_per_file=one_obj_per_file, - compression=compression, - **meta_read_kwargs, - ) + if not compression: + meta = meta_from_bytechunks(fs, paths, **kwargs) + else: + meta = meta_from_line_by_line(fs, paths, compression, **kwargs) - token = tokenize(fstoken, one_obj_per_file, compression, meta) + token = tokenize(compression, meta, kwargs) - if one_obj_per_file: - f: _FromJsonFn = _FromJsonSingleObjInFileFn( - storage=fs, - compression=compression, - schema=schema, - ) - else: - f = _FromJsonLineDelimitedFn( - storage=fs, - compression=compression, - schema=schema, - form=meta.layout.form, - ) + f = FromJsonLineDelimitedFn( + storage=fs, + compression=compression, + schema=schema, + form=meta.layout.form, + **kwargs, + ) return from_map( - f, urlpaths, label="from-json", token=token, meta=meta, behavior=behavior + f, + paths, + label="from-json", + token=token, + meta=meta, + behavior=behavior, ) def _from_json_bytes( *, - urlpath: str | list[str], - meta: ak.Array, - schema: dict | None, - blocksize: int | str, + source: str | list[str], + schema: str | dict | list | None = None, + blocksize: str, delimiter: Any, behavior: dict | None, storage_options: dict[str, Any] | None, + **kwargs: Any, ) -> Array: - token = tokenize(urlpath, delimiter, blocksize, meta) + token = tokenize(source, delimiter, blocksize) name = f"from-json-{token}" storage_options = storage_options or {} _, bytechunks = read_bytes( - urlpath, + source, delimiter=delimiter, blocksize=blocksize, sample="0", **storage_options, ) flat_chunks = list(flatten(bytechunks)) - f = _FromJsonBytesFn(schema=schema) + f = FromJsonBytesFn(schema=schema, **kwargs) dsk = { (name, i): (f, delayed_chunk.key) for i, delayed_chunk in enumerate(flat_chunks) } @@ -291,102 +258,33 @@ def _from_json_bytes( # ) hlg = HighLevelGraph.from_collections(name, dsk, dependencies=deps) - return new_array_object(hlg, name, meta=meta, behavior=behavior, npartitions=n) + return new_array_object(hlg, name, 
meta=None, behavior=behavior, npartitions=n) def from_json( - urlpath: str | list[str], - schema: dict | None = None, - # nan_string: str | None = None, - # posinf_string: str | None = None, - # neginf_string: str | None = None, - # complex_record_fields: tuple[str, str] | None = None, - # buffersize: int = 65536, - # initial: int = 1024, - # resize: float = 1.5, - highlevel: bool = True, + source: str | list[str], *, - blocksize: int | str | None = None, + line_delimited: bool = True, + schema: str | dict | list | None = None, + nan_string: str | None = None, + posinf_string: str | None = None, + neginf_string: str | None = None, + complex_record_fields: tuple[str, str] | None = None, + buffersize: int = 65536, + initial: int = 1024, + resize: float = 8, + highlevel: bool = True, + behavior: dict | None = None, + blocksize: str | None = None, delimiter: bytes | None = None, - one_obj_per_file: bool = False, compression: str | None = "infer", - meta: ak.Array | None = None, - behavior: dict | None = None, - derive_meta_kwargs: dict[str, Any] | None = None, storage_options: dict[str, Any] | None = None, ) -> Array: - """Create an Awkward Array collection from JSON data. - - There are three styles supported for reading JSON data: - - 1. Line delimited style: file(s) with one JSON object per line. - The function argument defaults are setup to handle this style. - This method assumes newline characters are not embedded in JSON - values. - 2. Single JSON object per file (this requires `one_obj_per_file` - to be set to ``True``. These objects *must* be arrays. - 3. Reading some number of bytes at a time. If at least one of - `blocksize` or `delimiter` are defined, Dask's - :py:func:`~dask.bytes.read_bytes` function will be used to - lazily read bytes (`blocksize` bytes per partition) and split - on `delimiter`). This method assumes line delimited JSON - without newline characters embedded in JSON values. - - Parameters - ---------- - urlpath : str | list[str] - The source of the JSON dataset. - blocksize : int | str, optional - If defined, each partition will be created from a block of - JSON bytes of this size. If `delimiter` is defined (not - ``None``) but this value remains ``None``, a default value of - ``128 MiB`` will be used. - delimiter : bytes, optional - If defined (not ``None``), this will be the byte(s) to split - on when reading `blocksizes`. If this is ``None`` but - `blocksize` is defined (not ``None``), the default byte - charater will be the newline (``b"\\n"``). - one_obj_per_file : bool - If ``True`` each file will be considered a single JSON object. - compression : str, optional - Compression of the files in the dataset. - meta : Any, optional - The metadata for the collection. If ``None`` (the default), - them metadata will be determined by scanning the beginning of - the dataset. - derive_meta_kwargs : dict[str, Any], optional - Dictionary of arguments to be passed to `derive_json_meta` for - determining the collection metadata if `meta` is ``None``. - storage_options : dict[str, Any], optional - Storage options passed to fsspec. - - Returns - ------- - Array - The resulting Dask Awkward Array collection. - - Examples - -------- - One partition per file: - - >>> import dask_awkard as dak - >>> a = dak.from_json("dataset*.json") - - One partition ber 200 MB of JSON data: - - >>> a = dak.from_json("dataset*.json", blocksize="200 MB") - - Same as previous call (explicit definition of the delimiter): - - >>> a = dak.from_json( - ... 
"dataset*.json", blocksize="200 MB", delimiter=b"\\n", - ... ) - - """ - if not highlevel: raise ValueError("dask-awkward only supports highlevel awkward Arrays.") + fs, token, paths = _fs_token_paths(source, storage_options=storage_options) + # allow either blocksize or delimieter being not-None to trigger # line deliminated JSON reading. if blocksize is not None and delimiter is None: @@ -394,31 +292,38 @@ def from_json( elif blocksize is None and delimiter == b"\n": blocksize = "128 MiB" - # if delimiter is None and blocksize is None we are expecting to - # read a single file or a list of files. The list of files are - # expected to be line delimited (one JSON object per line) - if delimiter is None and blocksize is None: + if blocksize is None and delimiter is None: return _from_json_files( - urlpath=urlpath, - meta=meta, + fs=fs, + token=token, + paths=paths, schema=schema, - one_obj_per_file=one_obj_per_file, compression=compression, + nan_string=nan_string, + posinf_string=posinf_string, + neginf_string=neginf_string, + complex_record_fields=complex_record_fields, + buffersize=buffersize, + initial=initial, + resize=resize, behavior=behavior, - derive_meta_kwargs=derive_meta_kwargs, - storage_options=storage_options, ) - - # if a `delimiter` and `blocksize` are defined we use Dask's + # if a `delimiter` and `blocksize` are defined we use Dask's # `read_bytes` function to get delayed chunks of bytes. elif delimiter is not None and blocksize is not None: return _from_json_bytes( - urlpath=urlpath, - meta=meta, + source=source, schema=schema, delimiter=delimiter, blocksize=blocksize, behavior=behavior, + nan_string=nan_string, + posinf_string=posinf_string, + neginf_string=neginf_string, + complex_record_fields=complex_record_fields, + buffersize=buffersize, + initial=initial, + resize=resize, storage_options=storage_options, ) @@ -427,7 +332,7 @@ def from_json( raise TypeError("Incompatible combination of arguments.") # pragma: no cover -class _ToJsonFn: +class ToJsonFn: def __init__( self, fs: AbstractFileSystem, @@ -575,7 +480,7 @@ def to_json( fs, _ = url_to_fs(path, **storage_options) nparts = array.npartitions map_res = map_partitions( - _ToJsonFn( + ToJsonFn( fs, path, npartitions=nparts, diff --git a/tests/test_io.py b/tests/test_io.py index 886dbe39..ca42db9b 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -84,7 +84,7 @@ def assert_2(arr): c = b.map_partitions(assert_1)[["b", "a"]].map_partitions(assert_2) # arbitrary order here - assert set(list(dak.lib.necessary_columns(c).values())[0]) == {"b", "a"} + assert set(list(dak.necessary_columns(c).values())[0]) == {"b", "a"} arr = c.compute() assert arr.fields == ["b", "a"] # output has required order diff --git a/tests/test_io_json.py b/tests/test_io_json.py index 4bf8a59a..0e56923a 100644 --- a/tests/test_io_json.py +++ b/tests/test_io_json.py @@ -108,33 +108,6 @@ def test_json_column_projection2(json_data_dir: Path) -> None: assert array.fields == ["goals"] -def test_json_force_by_lines_meta(ndjson_points_file: str) -> None: - daa1 = dak.from_json( - [ndjson_points_file] * 5, - derive_meta_kwargs={"force_by_lines": True}, - ) - daa2 = dak.from_json([ndjson_points_file] * 3) - assert daa1._meta is not None - assert daa2._meta is not None - f1 = daa1._meta.layout.form - f2 = daa2._meta.layout.form - assert f1 == f2 - - -def test_derive_json_meta_trigger_warning(ndjson_points_file: str) -> None: - with pytest.warns(UserWarning): - dak.from_json([ndjson_points_file], derive_meta_kwargs={"bytechunks": 64}) - - -def 
test_json_one_obj_per_file(single_record_file: str) -> None: - daa = dak.from_json( - [single_record_file] * 5, - one_obj_per_file=True, - ) - caa = ak.concatenate([ak.from_json(Path(single_record_file))] * 5) - assert_eq(daa, caa) - - def test_json_delim_defined(ndjson_points_file: str) -> None: source = [ndjson_points_file] * 6 daa = dak.from_json(source, delimiter=b"\n") @@ -151,24 +124,6 @@ def test_json_delim_defined(ndjson_points_file: str) -> None: ) -def test_json_sample_rows_true(ndjson_points_file: str) -> None: - source = [ndjson_points_file] * 5 - - daa = dak.from_json( - source, - derive_meta_kwargs={"force_by_lines": True, "sample_rows": 2}, - ) - - concretes = [] - for s in source: - with open(s) as f: - for line in f: - concretes.append(json.loads(line)) - caa = ak.from_iter(concretes) - - assert_eq(daa, caa) - - def test_json_bytes_no_delim_defined(ndjson_points_file: str) -> None: source = [ndjson_points_file] * 7 daa = dak.from_json(source, blocksize=650, delimiter=None) From 66cf018a3532a45e41c6c80e20a6bc12572d84b4 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Wed, 30 Aug 2023 16:05:39 -0500 Subject: [PATCH 15/21] single obj per file back --- src/dask_awkward/lib/io/json.py | 178 ++++++++++++++++++++++++++------ tests/conftest.py | 2 +- tests/test_io_json.py | 27 ++++- 3 files changed, 171 insertions(+), 36 deletions(-) diff --git a/src/dask_awkward/lib/io/json.py b/src/dask_awkward/lib/io/json.py index 467a9af3..27eb5da6 100644 --- a/src/dask_awkward/lib/io/json.py +++ b/src/dask_awkward/lib/io/json.py @@ -27,28 +27,13 @@ if TYPE_CHECKING: from awkward.contents.content import Content from fsspec.spec import AbstractFileSystem + from typing_extensions import Self from dask_awkward.lib.core import Array, Scalar __all__ = ("from_json", "to_json") -def _fs_token_paths( - source: Any, - *, - storage_options: dict[str, Any] | None = None, -) -> tuple[AbstractFileSystem, str, list[str]]: - fs, token, paths = get_fs_token_paths(source, storage_options=storage_options) - - # if paths is length 1, check to see if it's a directory and - # wildcard search for JSON files. Otherwise, just keep whatever - # has already been found. - if len(paths) == 1 and fs.isdir(paths[0]): - paths = list(filter(lambda s: ".json" in s, fs.ls(paths[0]))) - - return fs, token, paths - - class FromJsonFn: def __init__( self, @@ -72,6 +57,29 @@ def __init__( def __call__(self, source: Any) -> ak.Array: ... + def _default_project_columns( + self, + columns: Sequence[str], + original_form: Form | None = None, + ) -> Self: + if self.schema is not None: + return self + + if self.form is not None: + form = self.form.select_columns(columns) + schema = layout_to_jsonschema(form.length_zero_array(highlevel=False)) + + return type(self)( + schema=schema, + form=form, + storage=self.storage, + compression=self.compression, + original_form=original_form, + **self.kwargs, + ) + + return self + class FromJsonLineDelimitedFn(FromJsonFn): def __init__( @@ -105,26 +113,54 @@ def project_columns( self, columns: Sequence[str], original_form: Form | None = None, - ) -> FromJsonLineDelimitedFn: - # if a schema is already defined we are not going to project - # columns; we will keep the schema that already exists. 
- if self.schema is not None: - return self + ) -> Self: + return self._default_project_columns( + columns=columns, + original_form=original_form, + ) - if self.form is not None: - form = self.form.select_columns(columns) - schema = layout_to_jsonschema(form.length_zero_array(highlevel=False)) - return FromJsonLineDelimitedFn( - schema=schema, - form=form, - storage=self.storage, - compression=self.compression, - original_form=original_form, - **self.kwargs, +class FromJsonSingleObjPerFile(FromJsonFn): + def __init__( + self, + *args: Any, + storage: AbstractFileSystem, + compression: str | None = None, + schema: str | dict | list | None = None, + form: Form | None = None, + **kwargs: Any, + ) -> None: + super().__init__( + *args, + storage=storage, + compression=compression, + schema=schema, + form=form, + **kwargs, + ) + + def __call__(self, source: str) -> ak.Array: + with self.storage.open(source, mode="rt", compression=self.compression) as f: + return ak.Array( + [ + ak.from_json( + f.read(), + line_delimited=False, + schema=self.schema, + **self.kwargs, + ) + ] ) - return self + def project_columns( + self, + columns: Sequence[str], + original_form: Form | None = None, + ) -> Self: + return self._default_project_columns( + columns=columns, + original_form=original_form, + ) class FromJsonBytesFn: @@ -149,6 +185,17 @@ def __call__(self, source: bytes) -> ak.Array: ) +def meta_from_single_file( + fs: AbstractFileSystem, + paths: list[str], + compression: str | None, + **kwargs: Any, +) -> ak.Array: + with fs.open(paths[0], compression=compression) as f: + array = ak.Array([ak.from_json(f.read(), line_delimited=False, **kwargs)]) + return typetracer_array(array) + + def meta_from_bytechunks( fs: AbstractFileSystem, paths: list[str], @@ -188,7 +235,6 @@ def _from_json_files( *, schema: str | dict | list | None = None, compression: str | None = "infer", - behavior: dict | None = None, **kwargs: Any, ) -> Array: if compression == "infer": @@ -215,7 +261,38 @@ def _from_json_files( label="from-json", token=token, meta=meta, - behavior=behavior, + ) + + +def _from_json_sopf( + fs: AbstractFileSystem, + token: str, + paths: list[str], + *, + schema: str | dict | list | None = None, + compression: str | None = None, + **kwargs: Any, +) -> Array: + if compression == "infer": + compression = infer_compression(paths[0]) + + meta = meta_from_single_file(fs, paths, compression, **kwargs) + token = tokenize(token, compression, meta, kwargs) + + f = FromJsonSingleObjPerFile( + storage=fs, + compression=compression, + schema=schema, + form=meta.layout.form, + **kwargs, + ) + + return from_map( + f, + paths, + label="from-json", + token=token, + meta=meta, ) @@ -261,6 +338,22 @@ def _from_json_bytes( return new_array_object(hlg, name, meta=None, behavior=behavior, npartitions=n) +def _fs_token_paths( + source: Any, + *, + storage_options: dict[str, Any] | None = None, +) -> tuple[AbstractFileSystem, str, list[str]]: + fs, token, paths = get_fs_token_paths(source, storage_options=storage_options) + + # if paths is length 1, check to see if it's a directory and + # wildcard search for JSON files. Otherwise, just keep whatever + # has already been found. 
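+    # Note that fs.find() walks the directory tree recursively (unlike the
+    # fs.ls() call it replaces), and the ".json" substring test also keeps
+    # compressed files whose names still contain ".json" (e.g. *.json.gz).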
+ if len(paths) == 1 and fs.isdir(paths[0]): + paths = list(filter(lambda s: ".json" in s, fs.find(paths[0]))) + + return fs, token, paths + + def from_json( source: str | list[str], *, @@ -292,6 +385,23 @@ def from_json( elif blocksize is None and delimiter == b"\n": blocksize = "128 MiB" + if not line_delimited: + return _from_json_sopf( + fs=fs, + token=token, + paths=paths, + schema=schema, + compression=compression, + nan_string=nan_string, + posinf_string=posinf_string, + neginf_string=neginf_string, + complex_record_fields=complex_record_fields, + buffersize=buffersize, + initial=initial, + resize=resize, + behavior=behavior, + ) + if blocksize is None and delimiter is None: return _from_json_files( fs=fs, diff --git a/tests/conftest.py b/tests/conftest.py index 881f287d..d5e8a3c8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -16,7 +16,7 @@ @pytest.fixture(scope="session") def single_record_file(tmp_path_factory: pytest.TempPathFactory) -> str: fname = tmp_path_factory.mktemp("data") / "single_record.json" - record = [{"record": [1, 2, 3]}] + record = {"record": [1, 2, 3]} with fsspec.open(fname, "w") as f: print(json.dumps(record), file=f) return str(fname) diff --git a/tests/test_io_json.py b/tests/test_io_json.py index 0e56923a..075000cf 100644 --- a/tests/test_io_json.py +++ b/tests/test_io_json.py @@ -5,6 +5,7 @@ import awkward as ak import dask +import fsspec import pytest import dask_awkward as dak @@ -60,7 +61,7 @@ def concrete_data(json_data_dir: Path) -> ak.Array: def test_json_sanity(json_data_dir: Path, concrete_data: ak.Array) -> None: - source = os.path.join(str(json_data_dir), "*.json") + source = os.path.join(str(json_data_dir)) ds = dak.from_json(source) assert ds @@ -138,6 +139,30 @@ def test_json_bytes_no_delim_defined(ndjson_points_file: str) -> None: assert_eq(daa, caa) +@pytest.mark.parametrize("compression", [None, "xz"]) +def test_json_one_obj_per_file( + single_record_file: str, + tmp_path_factory: pytest.TempPathFactory, + compression: str | None, +) -> None: + if compression: + d = tmp_path_factory.mktemp("sopf") + p = str(d / f"file.json.{compression}") + with fsspec.open(single_record_file, "rt") as f: + text = f.read() + with fsspec.open(p, mode="w", compression=compression) as f: + print(text, file=f) + file_of_interest = p + else: + file_of_interest = single_record_file + + daa = dak.from_json([file_of_interest] * 5, line_delimited=False) + single_record = ak.from_json(Path(single_record_file), line_delimited=False) + single_entry_array = ak.Array([single_record]) + caa = ak.concatenate([single_entry_array] * 5) + assert_eq(daa, caa) + + @pytest.mark.parametrize("compression", ["xz", "gzip", "zip"]) def test_to_and_from_json( daa: Array, From cd6168f6bb180bf5677dbb964522d1b9e2fdf625 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 31 Aug 2023 15:28:40 -0500 Subject: [PATCH 16/21] json byte chunks project_columns compatible --- src/dask_awkward/lib/io/json.py | 278 +++++++++++++++++++++++--------- 1 file changed, 205 insertions(+), 73 deletions(-) diff --git a/src/dask_awkward/lib/io/json.py b/src/dask_awkward/lib/io/json.py index 27eb5da6..416bc316 100644 --- a/src/dask_awkward/lib/io/json.py +++ b/src/dask_awkward/lib/io/json.py @@ -9,19 +9,13 @@ from awkward.forms.form import Form from dask.base import tokenize from dask.blockwise import BlockIndex -from dask.bytes.core import read_bytes from dask.core import flatten from dask.highlevelgraph import HighLevelGraph -from dask.utils import parse_bytes -from fsspec.core import 
get_fs_token_paths, url_to_fs -from fsspec.utils import infer_compression - -from dask_awkward.lib.core import ( - map_partitions, - new_array_object, - new_scalar_object, - typetracer_array, -) +from dask.utils import is_integer, parse_bytes +from fsspec.core import OpenFile, get_fs_token_paths, url_to_fs +from fsspec.utils import infer_compression, read_block + +from dask_awkward.lib.core import map_partitions, new_scalar_object, typetracer_array from dask_awkward.lib.io.io import from_map if TYPE_CHECKING: @@ -37,7 +31,7 @@ class FromJsonFn: def __init__( self, - *args: Any, + *, storage: AbstractFileSystem, compression: str | None = None, schema: str | dict | list | None = None, @@ -48,7 +42,6 @@ def __init__( self.compression = compression self.storage = storage self.schema = schema - self.args = args self.kwargs = kwargs self.form = form self.original_form = original_form @@ -84,19 +77,20 @@ def _default_project_columns( class FromJsonLineDelimitedFn(FromJsonFn): def __init__( self, - *args: Any, + *, storage: AbstractFileSystem, compression: str | None = None, schema: str | dict | list | None = None, form: Form | None = None, + original_form: Form | None = None, **kwargs: Any, ) -> None: super().__init__( - *args, storage=storage, compression=compression, schema=schema, form=form, + original_form=original_form, **kwargs, ) @@ -123,19 +117,20 @@ def project_columns( class FromJsonSingleObjPerFile(FromJsonFn): def __init__( self, - *args: Any, + *, storage: AbstractFileSystem, compression: str | None = None, schema: str | dict | list | None = None, form: Form | None = None, + original_form: Form | None = None, **kwargs: Any, ) -> None: super().__init__( - *args, storage=storage, compression=compression, schema=schema, form=form, + original_form=original_form, **kwargs, ) @@ -163,27 +158,52 @@ def project_columns( ) -class FromJsonBytesFn: +class FromJsonBytesFn(FromJsonFn): def __init__( self, + *, + storage: AbstractFileSystem, + compression: str | None = None, schema: str | dict | list | None = None, form: Form | None = None, original_form: Form | None = None, **kwargs: Any, ) -> None: - self.schema = schema - self.form = form - self.original_form = original_form - self.kwargs = kwargs + super().__init__( + storage=storage, + compression=compression, + schema=schema, + form=form, + original_form=original_form, + **kwargs, + ) + + def __call__(self, ingrediants: tuple[Any, ...]) -> ak.Array: + (fs, path, compression, offsets, length, delimiter) = ingrediants + + with fs.open(path, compression=compression) as f: + if offsets == 0 and length is None: + bytestring = f.read() + else: + bytestring = read_block(f, offsets, length, delimiter) - def __call__(self, source: bytes) -> ak.Array: return ak.from_json( - source, + bytestring, line_delimited=True, schema=self.schema, **self.kwargs, ) + def project_columns( + self, + columns: Sequence[str], + original_form: Form | None = None, + ) -> Self: + return self._default_project_columns( + columns=columns, + original_form=original_form, + ) + def meta_from_single_file( fs: AbstractFileSystem, @@ -199,7 +219,7 @@ def meta_from_single_file( def meta_from_bytechunks( fs: AbstractFileSystem, paths: list[str], - bytechunks: str | int = "16 KiB", + bytechunks: str | int = "128KiB", **kwargs: Any, ): bytechunks = parse_bytes(bytechunks) @@ -296,49 +316,7 @@ def _from_json_sopf( ) -def _from_json_bytes( - *, - source: str | list[str], - schema: str | dict | list | None = None, - blocksize: str, - delimiter: Any, - behavior: dict | None, - 
storage_options: dict[str, Any] | None, - **kwargs: Any, -) -> Array: - token = tokenize(source, delimiter, blocksize) - name = f"from-json-{token}" - storage_options = storage_options or {} - _, bytechunks = read_bytes( - source, - delimiter=delimiter, - blocksize=blocksize, - sample="0", - **storage_options, - ) - flat_chunks = list(flatten(bytechunks)) - f = FromJsonBytesFn(schema=schema, **kwargs) - dsk = { - (name, i): (f, delayed_chunk.key) for i, delayed_chunk in enumerate(flat_chunks) - } - deps = flat_chunks - n = len(deps) - - # doesn't work because flat_chunks elements are remaining delayed objects. - # return from_map( - # _from_json_bytes, - # flat_chunks, - # label="from-json", - # token=token, - # produces_tasks=True, - # deps=flat_chunks, - # ) - - hlg = HighLevelGraph.from_collections(name, dsk, dependencies=deps) - return new_array_object(hlg, name, meta=None, behavior=behavior, npartitions=n) - - -def _fs_token_paths( +def json_fs_token_paths( source: Any, *, storage_options: dict[str, Any] | None = None, @@ -370,13 +348,17 @@ def from_json( behavior: dict | None = None, blocksize: str | None = None, delimiter: bytes | None = None, + sample: str | int = "128KiB", compression: str | None = "infer", storage_options: dict[str, Any] | None = None, ) -> Array: if not highlevel: raise ValueError("dask-awkward only supports highlevel awkward Arrays.") - fs, token, paths = _fs_token_paths(source, storage_options=storage_options) + fs, token, paths = json_fs_token_paths(source, storage_options=storage_options) + + if len(paths) == 0: + raise OSError("%s resolved to no files" % source) # allow either blocksize or delimieter being not-None to trigger # line deliminated JSON reading. @@ -385,6 +367,8 @@ def from_json( elif blocksize is None and delimiter == b"\n": blocksize = "128 MiB" + # if line delimited is False we use the single object per file + # implementation. if not line_delimited: return _from_json_sopf( fs=fs, @@ -402,6 +386,8 @@ def from_json( behavior=behavior, ) + # if we are not using blocksize and delimiter we are partitioning + # by file. if blocksize is None and delimiter is None: return _from_json_files( fs=fs, @@ -418,14 +404,18 @@ def from_json( resize=resize, behavior=behavior, ) - # if a `delimiter` and `blocksize` are defined we use Dask's - # `read_bytes` function to get delayed chunks of bytes. 
+ + # if a `delimiter` and `blocksize` are defined we use the byte + # reading implementation elif delimiter is not None and blocksize is not None: return _from_json_bytes( - source=source, + fs=fs, + token=token, + paths=paths, schema=schema, delimiter=delimiter, blocksize=blocksize, + sample=sample, behavior=behavior, nan_string=nan_string, posinf_string=posinf_string, @@ -434,7 +424,6 @@ def from_json( buffersize=buffersize, initial=initial, resize=resize, - storage_options=storage_options, ) # otherwise the arguments are bad @@ -668,6 +657,8 @@ def layout_to_jsonschema( existing_schema["type"] = "integer" elif layout.dtype.kind == "f": existing_schema["type"] = "number" + elif layout.dtype.kind == "b": + existing_schema["type"] = "boolean" elif layout.is_indexed: pass elif layout.is_unknown: @@ -707,3 +698,144 @@ def ak_schema_repr(arr): import yaml return yaml.dump(arr.layout.form) + + +def _from_json_bytes( + fs: AbstractFileSystem, + token: str, + paths: list[str], + *, + schema: str | dict | list | None = None, + compression: str | None = "infer", + delimiter: bytes = b"\n", + not_zero: bool = False, + blocksize: str | int = "128 MiB", + sample: str | int = "128 kiB", + **kwargs, +) -> Array: + if compression == "infer": + compression = infer_compression(paths[0]) + + token = tokenize( + fs, + token, + paths, + schema, + compression, + delimiter, + not_zero, + blocksize, + sample, + kwargs, + ) + + if blocksize is not None: + if isinstance(blocksize, str): + blocksize = parse_bytes(blocksize) + if not is_integer(blocksize): + raise TypeError("blocksize must be an integer") + blocksize = int(blocksize) + + if blocksize is None: + offsets = [[0]] * len(paths) + lengths = [[None]] * len(paths) + else: + offsets = [] + lengths = [] + for path in paths: + if compression == "infer": + comp = infer_compression(path) + else: + comp = compression + if comp is not None: + raise ValueError( + "Cannot do chunked reads on compressed files. " + "To read, set blocksize=None" + ) + size = fs.info(path)["size"] + if size is None: + raise ValueError( + "Backing filesystem couldn't determine file size, cannot " + "do chunked reads. To read, set blocksize=None." 
+ ) + + elif size == 0: + # skip empty + offsets.append([]) + lengths.append([]) + else: + # shrink blocksize to give same number of parts + if size % blocksize and size > blocksize: + blocksize1 = size / (size // blocksize) + else: + blocksize1 = blocksize + place = 0 + off = [0] + length = [] + + # figure out offsets, spreading around spare bytes + while size - place > (blocksize1 * 2) - 1: + place += blocksize1 + off.append(int(place)) + length.append(off[-1] - off[-2]) + length.append(size - off[-1]) + + if not_zero: + off[0] = 1 + length[0] -= 1 + offsets.append(off) + lengths.append(length) + + out = [] + for path, offset, length in zip(paths, offsets, lengths): + values = [ + ( + fs, + path, + compression, + offs, + leng, + delimiter, + ) + for offs, leng in zip(offset, length) + ] + out.append(values) + + if isinstance(sample, str): + sample = parse_bytes(sample) + with OpenFile(fs, paths[0], compression=compression) as f: + # read block without seek (because we start at zero) + if delimiter is None: + sample_bytes = f.read(sample) + else: + sample_buff = f.read(sample) + while True: + new = f.read(sample) + if not new: + break + if delimiter in new: + sample_buff = sample_buff + new.split(delimiter, 1)[0] + delimiter + break + sample_buff = sample_buff + new + sample_bytes = sample_buff + + rfind = sample_bytes.rfind(delimiter) + if rfind > 0: + sample_bytes = sample_bytes[:rfind] + meta = typetracer_array(ak.from_json(sample_bytes, line_delimited=True, **kwargs)) + + fn = FromJsonBytesFn( + storage=fs, + compression=compression, + schema=schema, + form=meta.layout.form, + **kwargs, + ) + + return from_map( + fn, + list(flatten(out)), + label="from-json", + token=token, + meta=meta, + ) From f9bbec1a8f9b0fa2a2aa09c8694a54882be5a4b7 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Fri, 1 Sep 2023 11:21:06 -0500 Subject: [PATCH 17/21] abstract out bytes reading ingredients --- src/dask_awkward/lib/io/io.py | 104 +++++++++++++++++++++++++++++- src/dask_awkward/lib/io/json.py | 109 ++++---------------------------- 2 files changed, 116 insertions(+), 97 deletions(-) diff --git a/src/dask_awkward/lib/io/io.py b/src/dask_awkward/lib/io/io.py index dde47983..ef184133 100644 --- a/src/dask_awkward/lib/io/io.py +++ b/src/dask_awkward/lib/io/io.py @@ -9,7 +9,7 @@ from awkward.types.numpytype import primitive_to_dtype from dask.base import flatten, tokenize from dask.highlevelgraph import HighLevelGraph -from dask.utils import funcname +from dask.utils import funcname, is_integer, parse_bytes from dask_awkward.layers import AwkwardBlockwiseLayer, AwkwardInputLayer from dask_awkward.layers.layers import AwkwardMaterializedLayer @@ -25,6 +25,7 @@ from dask.bag.core import Bag as DaskBag from dask.dataframe.core import DataFrame as DaskDataFrame from dask.delayed import Delayed + from fsspec.spec import AbstractFileSystem from dask_awkward.lib.core import Array @@ -572,3 +573,104 @@ def from_map( ) return result + + +def bytes_reading_ingredients( + fs: AbstractFileSystem, + paths: list[str], + compression: str | None, + delimiter: bytes | None, + not_zero: bool, + blocksize: str | int, + sample: str | int, +) -> tuple[list[tuple], bytes]: + if blocksize is not None: + if isinstance(blocksize, str): + blocksize = parse_bytes(blocksize) + if not is_integer(blocksize): + raise TypeError("blocksize must be an integer") + blocksize = int(blocksize) + + if blocksize is None: + offsets = [[0]] * len(paths) + lengths = [[None]] * len(paths) + else: + offsets = [] + lengths = [] + for path in paths: + if 
compression is not None: + raise ValueError( + "Cannot do chunked reads on compressed files. " + "To read, set blocksize=None" + ) + size = fs.info(path)["size"] + if size is None: + raise ValueError( + "Backing filesystem couldn't determine file size, cannot " + "do chunked reads. To read, set blocksize=None." + ) + + elif size == 0: + # skip empty + offsets.append([]) + lengths.append([]) + else: + # shrink blocksize to give same number of parts + if size % blocksize and size > blocksize: + blocksize1 = size / (size // blocksize) + else: + blocksize1 = blocksize + place = 0 + off = [0] + length = [] + + # figure out offsets, spreading around spare bytes + while size - place > (blocksize1 * 2) - 1: + place += blocksize1 + off.append(int(place)) + length.append(off[-1] - off[-2]) + length.append(size - off[-1]) + + if not_zero: + off[0] = 1 + length[0] -= 1 + offsets.append(off) + lengths.append(length) + + out = [] + for path, offset, length in zip(paths, offsets, lengths): + values = [ + ( + fs, + path, + compression, + offs, + leng, + delimiter, + ) + for offs, leng in zip(offset, length) + ] + out.append(values) + + sample_size = parse_bytes(sample) if isinstance(sample, str) else sample + with fs.open(paths[0], compression=compression) as f: + # read block without seek (because we start at zero) + if delimiter is None: + sample_bytes = f.read(sample_size) + else: + sample_buff = f.read(sample_size) + while True: + new = f.read(sample_size) + if not new: + break + if delimiter in new: + sample_buff = sample_buff + new.split(delimiter, 1)[0] + delimiter + break + sample_buff = sample_buff + new + sample_bytes = sample_buff + + rfind = sample_bytes.rfind(delimiter) + if rfind > 0: + sample_bytes = sample_bytes[:rfind] + + return out, sample_bytes diff --git a/src/dask_awkward/lib/io/json.py b/src/dask_awkward/lib/io/json.py index 416bc316..b38e4470 100644 --- a/src/dask_awkward/lib/io/json.py +++ b/src/dask_awkward/lib/io/json.py @@ -11,12 +11,12 @@ from dask.blockwise import BlockIndex from dask.core import flatten from dask.highlevelgraph import HighLevelGraph -from dask.utils import is_integer, parse_bytes -from fsspec.core import OpenFile, get_fs_token_paths, url_to_fs +from dask.utils import parse_bytes +from fsspec.core import get_fs_token_paths, url_to_fs from fsspec.utils import infer_compression, read_block from dask_awkward.lib.core import map_partitions, new_scalar_object, typetracer_array -from dask_awkward.lib.io.io import from_map +from dask_awkward.lib.io.io import bytes_reading_ingredients, from_map if TYPE_CHECKING: from awkward.contents.content import Content @@ -729,99 +729,16 @@ def _from_json_bytes( kwargs, ) - if blocksize is not None: - if isinstance(blocksize, str): - blocksize = parse_bytes(blocksize) - if not is_integer(blocksize): - raise TypeError("blocksize must be an integer") - blocksize = int(blocksize) + bytes_ingredients, sample_bytes = bytes_reading_ingredients( + fs, + paths, + compression, + delimiter, + not_zero, + blocksize, + sample, + ) - if blocksize is None: - offsets = [[0]] * len(paths) - lengths = [[None]] * len(paths) - else: - offsets = [] - lengths = [] - for path in paths: - if compression == "infer": - comp = infer_compression(path) - else: - comp = compression - if comp is not None: - raise ValueError( - "Cannot do chunked reads on compressed files. " - "To read, set blocksize=None" - ) - size = fs.info(path)["size"] - if size is None: - raise ValueError( - "Backing filesystem couldn't determine file size, cannot " - "do chunked reads. 
To read, set blocksize=None." - ) - - elif size == 0: - # skip empty - offsets.append([]) - lengths.append([]) - else: - # shrink blocksize to give same number of parts - if size % blocksize and size > blocksize: - blocksize1 = size / (size // blocksize) - else: - blocksize1 = blocksize - place = 0 - off = [0] - length = [] - - # figure out offsets, spreading around spare bytes - while size - place > (blocksize1 * 2) - 1: - place += blocksize1 - off.append(int(place)) - length.append(off[-1] - off[-2]) - length.append(size - off[-1]) - - if not_zero: - off[0] = 1 - length[0] -= 1 - offsets.append(off) - lengths.append(length) - - out = [] - for path, offset, length in zip(paths, offsets, lengths): - values = [ - ( - fs, - path, - compression, - offs, - leng, - delimiter, - ) - for offs, leng in zip(offset, length) - ] - out.append(values) - - if isinstance(sample, str): - sample = parse_bytes(sample) - with OpenFile(fs, paths[0], compression=compression) as f: - # read block without seek (because we start at zero) - if delimiter is None: - sample_bytes = f.read(sample) - else: - sample_buff = f.read(sample) - while True: - new = f.read(sample) - if not new: - break - if delimiter in new: - sample_buff = sample_buff + new.split(delimiter, 1)[0] + delimiter - break - sample_buff = sample_buff + new - sample_bytes = sample_buff - - rfind = sample_bytes.rfind(delimiter) - if rfind > 0: - sample_bytes = sample_bytes[:rfind] meta = typetracer_array(ak.from_json(sample_bytes, line_delimited=True, **kwargs)) fn = FromJsonBytesFn( @@ -834,7 +751,7 @@ def _from_json_bytes( return from_map( fn, - list(flatten(out)), + list(flatten(bytes_ingredients)), label="from-json", token=token, meta=meta, From 4dc383274a895db77708af3140cd8513ac8d2fa8 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Fri, 1 Sep 2023 11:54:13 -0500 Subject: [PATCH 18/21] rough outline for read_text API --- src/dask_awkward/__init__.py | 1 + src/dask_awkward/lib/__init__.py | 1 + src/dask_awkward/lib/io/io.py | 47 ++++++++++++++++++++++++++++++++ 3 files changed, 49 insertions(+) diff --git a/src/dask_awkward/__init__.py b/src/dask_awkward/__init__.py index 794c4ff1..af92f0be 100644 --- a/src/dask_awkward/__init__.py +++ b/src/dask_awkward/__init__.py @@ -24,6 +24,7 @@ from_delayed, from_lists, from_map, + from_text, to_dask_array, to_dask_bag, to_dataframe, diff --git a/src/dask_awkward/lib/__init__.py b/src/dask_awkward/lib/__init__.py index 197d141c..a9b0887a 100644 --- a/src/dask_awkward/lib/__init__.py +++ b/src/dask_awkward/lib/__init__.py @@ -15,6 +15,7 @@ from_delayed, from_lists, from_map, + from_text, to_dask_array, to_dask_bag, to_dataframe, diff --git a/src/dask_awkward/lib/io/io.py b/src/dask_awkward/lib/io/io.py index ef184133..947d89b9 100644 --- a/src/dask_awkward/lib/io/io.py +++ b/src/dask_awkward/lib/io/io.py @@ -10,6 +10,7 @@ from dask.base import flatten, tokenize from dask.highlevelgraph import HighLevelGraph from dask.utils import funcname, is_integer, parse_bytes +from fsspec.utils import read_block from dask_awkward.layers import AwkwardBlockwiseLayer, AwkwardInputLayer from dask_awkward.layers.layers import AwkwardMaterializedLayer @@ -674,3 +675,49 @@ def bytes_reading_ingredients( sample_bytes = sample_bytes[:rfind] return out, sample_bytes + + +class FromTextFn: + def __init__(self): + pass + + def __call__(self, ingredients: tuple) -> ak.Array: + (fs, path, compression, offsets, length, delimiter) = ingredients + + with fs.open(path, compression=compression) as f: + if offsets == 0 and length is None: + 
bytestring = f.read() + else: + bytestring = read_block(f, offsets, length, delimiter) + + buffer = np.frombuffer(bytestring, dtype=np.uint8) + array = ak.from_numpy(buffer) + array = ak.unflatten(array, len(array)) + array = ak.enforce_type(array, "string") + array_split = ak.str.split_pattern(array, "\n") + lines = array_split[0] + return lines + + +def from_text(source, blocksize, delimiter, storage_options: dict | None = None): + from fsspec.core import get_fs_token_paths + + fs, token, paths = get_fs_token_paths(source, storage_options=storage_options or {}) + + bytes_ingredients, sample_bytes = bytes_reading_ingredients( + fs, + paths, + None, + delimiter, + False, + blocksize, + "128 KiB", + ) + + return from_map( + FromTextFn(), + list(flatten(bytes_ingredients)), + label="from-text", + token=token, + meta=None, + ) From e3bff179747f64b16ed04c931424ed06e8181a7b Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Fri, 1 Sep 2023 12:36:43 -0500 Subject: [PATCH 19/21] typing, some sensible defaults --- src/dask_awkward/lib/io/io.py | 37 +++++++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/src/dask_awkward/lib/io/io.py b/src/dask_awkward/lib/io/io.py index 947d89b9..b96905ec 100644 --- a/src/dask_awkward/lib/io/io.py +++ b/src/dask_awkward/lib/io/io.py @@ -677,6 +677,16 @@ def bytes_reading_ingredients( return out, sample_bytes +def _string_array_from_bytestring(bytestring): + buffer = np.frombuffer(bytestring, dtype=np.uint8) + array = ak.from_numpy(buffer) + array = ak.unflatten(array, len(array)) + array = ak.enforce_type(array, "string") + array_split = ak.str.split_pattern(array, "\n") + lines = array_split[0] + return lines + + class FromTextFn: def __init__(self): pass @@ -690,20 +700,22 @@ def __call__(self, ingredients: tuple) -> ak.Array: else: bytestring = read_block(f, offsets, length, delimiter) - buffer = np.frombuffer(bytestring, dtype=np.uint8) - array = ak.from_numpy(buffer) - array = ak.unflatten(array, len(array)) - array = ak.enforce_type(array, "string") - array_split = ak.str.split_pattern(array, "\n") - lines = array_split[0] - return lines + return _string_array_from_bytestring(bytestring) -def from_text(source, blocksize, delimiter, storage_options: dict | None = None): +def from_text( + source: str | list[str], + blocksize: str | int = "128 MiB", + delimiter: bytes = b"\n", + sample_size: str | int = "128 KiB", + storage_options: dict | None = None, +) -> Array: from fsspec.core import get_fs_token_paths fs, token, paths = get_fs_token_paths(source, storage_options=storage_options or {}) + token = tokenize(source, token, blocksize, delimiter, sample_size) + bytes_ingredients, sample_bytes = bytes_reading_ingredients( fs, paths, @@ -711,13 +723,18 @@ def from_text(source, blocksize, delimiter, storage_options: dict | None = None) delimiter, False, blocksize, - "128 KiB", + sample_size, ) + rfind = sample_bytes.rfind(delimiter) + if rfind > 0: + sample_bytes = sample_bytes[:rfind] + meta = typetracer_array(_string_array_from_bytestring(sample_bytes)) + return from_map( FromTextFn(), list(flatten(bytes_ingredients)), label="from-text", token=token, - meta=None, + meta=meta, ) From 85316c3800242d1f5961a12f37fb48054deffba9 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Fri, 1 Sep 2023 12:48:07 -0500 Subject: [PATCH 20/21] delimiter use --- src/dask_awkward/lib/io/io.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/dask_awkward/lib/io/io.py b/src/dask_awkward/lib/io/io.py index 
b96905ec..9d7dac0a 100644 --- a/src/dask_awkward/lib/io/io.py +++ b/src/dask_awkward/lib/io/io.py @@ -677,12 +677,14 @@ def bytes_reading_ingredients( return out, sample_bytes -def _string_array_from_bytestring(bytestring): +def _string_array_from_bytestring(bytestring, delimiter): buffer = np.frombuffer(bytestring, dtype=np.uint8) array = ak.from_numpy(buffer) array = ak.unflatten(array, len(array)) array = ak.enforce_type(array, "string") - array_split = ak.str.split_pattern(array, "\n") + if isinstance(delimiter, bytes): + delimiter = delimiter.decode() + array_split = ak.str.split_pattern(array, delimiter) lines = array_split[0] return lines @@ -700,7 +702,7 @@ def __call__(self, ingredients: tuple) -> ak.Array: else: bytestring = read_block(f, offsets, length, delimiter) - return _string_array_from_bytestring(bytestring) + return _string_array_from_bytestring(bytestring, delimiter) def from_text( @@ -729,7 +731,7 @@ def from_text( rfind = sample_bytes.rfind(delimiter) if rfind > 0: sample_bytes = sample_bytes[:rfind] - meta = typetracer_array(_string_array_from_bytestring(sample_bytes)) + meta = typetracer_array(_string_array_from_bytestring(sample_bytes, delimiter)) return from_map( FromTextFn(), From 94de75d7ee029ae6a1bce0f6c5fb63ec160f228d Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Fri, 1 Sep 2023 14:18:41 -0500 Subject: [PATCH 21/21] support compressed files; drop trailing newline --- src/dask_awkward/lib/io/io.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/dask_awkward/lib/io/io.py b/src/dask_awkward/lib/io/io.py index 9d7dac0a..bdde080e 100644 --- a/src/dask_awkward/lib/io/io.py +++ b/src/dask_awkward/lib/io/io.py @@ -10,7 +10,8 @@ from dask.base import flatten, tokenize from dask.highlevelgraph import HighLevelGraph from dask.utils import funcname, is_integer, parse_bytes -from fsspec.utils import read_block +from fsspec.core import get_fs_token_paths +from fsspec.utils import infer_compression, read_block from dask_awkward.layers import AwkwardBlockwiseLayer, AwkwardInputLayer from dask_awkward.layers.layers import AwkwardMaterializedLayer @@ -682,10 +683,12 @@ def _string_array_from_bytestring(bytestring, delimiter): array = ak.from_numpy(buffer) array = ak.unflatten(array, len(array)) array = ak.enforce_type(array, "string") - if isinstance(delimiter, bytes): - delimiter = delimiter.decode() array_split = ak.str.split_pattern(array, delimiter) lines = array_split[0] + if len(lines) == 0: + return lines + if lines[-1] == "": + lines = lines[:-1] return lines @@ -710,18 +713,20 @@ def from_text( blocksize: str | int = "128 MiB", delimiter: bytes = b"\n", sample_size: str | int = "128 KiB", + compression: str | None = "infer", storage_options: dict | None = None, ) -> Array: - from fsspec.core import get_fs_token_paths - fs, token, paths = get_fs_token_paths(source, storage_options=storage_options or {}) token = tokenize(source, token, blocksize, delimiter, sample_size) + if compression == "infer": + compression = infer_compression(paths[0]) + bytes_ingredients, sample_bytes = bytes_reading_ingredients( fs, paths, - None, + compression, delimiter, False, blocksize,