prototype dak.from_text #7

Closed · wants to merge 38 commits
Changes from 34 commits

Commits (38):
7b00694
schema <--> form/layout
douglasdavis Oct 31, 2022
df3db18
json IO dev
douglasdavis Mar 8, 2023
d2b7a67
lint
douglasdavis Mar 8, 2023
30e7748
Merge remote-tracking branch 'upstream/main' into more-json-dev
douglasdavis Jun 5, 2023
8826cf8
Merge remote-tracking branch 'upstream/main' into more-json-dev
douglasdavis Jun 14, 2023
5aa5d01
Merge remote-tracking branch 'upstream/main' into more-json-dev
douglasdavis Jul 13, 2023
4e83fd2
update w.r.t. upstream awkward changes
douglasdavis Jul 13, 2023
4e5e883
Merge remote-tracking branch 'upstream/main' into more-json-dev
douglasdavis Jul 17, 2023
bc686d2
unpolished column projection works
douglasdavis Jul 17, 2023
a235067
Merge remote-tracking branch 'upstream/main' into more-json-dev
douglasdavis Jul 17, 2023
31802a9
passing tests
douglasdavis Jul 19, 2023
8471d93
Merge branch 'main' into more-json-dev
douglasdavis Jul 25, 2023
c030a9b
Merge remote-tracking branch 'upstream/main' into more-json-dev
douglasdavis Jul 25, 2023
4ed1aff
Merge branch 'main' into more-json-dev
douglasdavis Jul 27, 2023
b197665
Merge remote-tracking branch 'upstream/main' into more-json-dev
douglasdavis Jul 28, 2023
7a97c02
to_json correct function args; some typing
douglasdavis Aug 1, 2023
a49c1cd
Merge remote-tracking branch 'upstream/main' into more-json-dev
douglasdavis Aug 1, 2023
ad97c61
Merge branch 'main' into more-json-dev
douglasdavis Aug 9, 2023
c8bc1ec
more layout type supported
douglasdavis Aug 10, 2023
d1dcc72
handle minItems maxItems for awkward's regular arrays
douglasdavis Aug 15, 2023
84a0dee
Merge branch 'main' into more-json-dev
douglasdavis Aug 15, 2023
d7957be
Merge branch 'main' into more-json-dev
douglasdavis Aug 16, 2023
f2cb69c
add tests (and move json specific tests)
douglasdavis Aug 17, 2023
bd65e00
generic paths
douglasdavis Aug 17, 2023
5a07892
typing
douglasdavis Aug 17, 2023
8764ec6
handle unknown type; add layout_to_jsonschema to top level API
douglasdavis Aug 17, 2023
1345d27
Merge remote-tracking branch 'upstream/main' into more-json-dev
douglasdavis Aug 25, 2023
ad3cb35
Merge branch 'main' into more-json-dev
douglasdavis Aug 25, 2023
8ec7acb
rework meta determination; remove single obj per file support
douglasdavis Aug 29, 2023
74c0d79
Merge remote-tracking branch 'upstream/main' into more-json-dev
douglasdavis Aug 29, 2023
66cf018
single obj per file back
douglasdavis Aug 30, 2023
cd6168f
json byte chunks project_columns compatible
douglasdavis Aug 31, 2023
f9bbec1
abstract out bytes reading ingredients
douglasdavis Sep 1, 2023
4dc3832
rough outline for read_text API
douglasdavis Sep 1, 2023
e3bff17
typing, some sensible defaults
douglasdavis Sep 1, 2023
85316c3
delimiter use
douglasdavis Sep 1, 2023
94de75d
support compressed files; drop trailing newline
douglasdavis Sep 1, 2023
50d4aaa
Merge remote-tracking branch 'upstream/main' into read-text
douglasdavis Sep 5, 2023
3 changes: 2 additions & 1 deletion src/dask_awkward/__init__.py
@@ -24,12 +24,13 @@
     from_delayed,
     from_lists,
     from_map,
+    from_text,
     to_dask_array,
     to_dask_bag,
     to_dataframe,
     to_delayed,
 )
-from dask_awkward.lib.io.json import from_json, to_json
+from dask_awkward.lib.io.json import from_json, layout_to_jsonschema, to_json
 from dask_awkward.lib.io.parquet import from_parquet, to_parquet
 from dask_awkward.lib.operations import concatenate
 from dask_awkward.lib.reducers import (
1 change: 1 addition & 0 deletions src/dask_awkward/lib/__init__.py
@@ -15,6 +15,7 @@
     from_delayed,
     from_lists,
     from_map,
+    from_text,
     to_dask_array,
     to_dask_bag,
     to_dataframe,
4 changes: 4 additions & 0 deletions src/dask_awkward/lib/_utils.py
@@ -42,6 +42,10 @@ def set_form_keys(form: Form, *, key: str) -> Form:
     elif form.is_numpy:
         form.form_key = key
 
+    elif form.is_union:
+        for entry in form.contents:
+            set_form_keys(entry, key=key)
+
     # Anything else grab the content and keep recursing
     else:
         set_form_keys(form.content, key=key)
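As a sanity check, a minimal sketch (not part of the diff; mixing types in one array is just one way to obtain a union layout) of the kind of form the new branch recurses into:

import awkward as ak

# mixed types in one array produce a union layout, whose form has
# one entry in .contents per union branch
form = ak.Array([1, "two"]).layout.form
print(form.is_union)       # True
print(len(form.contents))  # 2 -- set_form_keys now recurses into each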
12 changes: 11 additions & 1 deletion src/dask_awkward/lib/core.py
@@ -1089,7 +1089,17 @@ def _getitem_single(self, where: Any) -> Array:
 
         raise DaskAwkwardNotImplemented(f"__getitem__ doesn't support where={where}.")
 
-    def __getitem__(self, where: Any) -> AwkwardDaskCollection:
+    @overload
+    def __getitem__(self, where: Array | str | Sequence[str] | slice) -> Array:
+        ...
+
+    @overload
+    def __getitem__(self, where: int) -> Scalar:
+        ...
+
+    def __getitem__(
+        self, where: Array | str | Sequence[str] | int | slice
+    ) -> Array | Scalar:
         """Select items from the collection.
 
         Heavily under construction.
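A small hypothetical usage sketch (data made up) of the contract the new overloads encode: string, sequence-of-string, slice, and Array selections stay lazy Arrays, while a bare integer index is typed as a Scalar:

import dask_awkward as dak

arr = dak.from_lists([[1, 2, 3], [4, 5]])  # two partitions

arr[1:]  # slice      -> dask_awkward.Array
arr[0]   # single int -> dask_awkward.Scalar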
151 changes: 150 additions & 1 deletion src/dask_awkward/lib/io/io.py
@@ -9,7 +9,8 @@
 from awkward.types.numpytype import primitive_to_dtype
 from dask.base import flatten, tokenize
 from dask.highlevelgraph import HighLevelGraph
-from dask.utils import funcname
+from dask.utils import funcname, is_integer, parse_bytes
+from fsspec.utils import read_block
 
 from dask_awkward.layers import AwkwardBlockwiseLayer, AwkwardInputLayer
 from dask_awkward.layers.layers import AwkwardMaterializedLayer
@@ -25,6 +26,7 @@
     from dask.bag.core import Bag as DaskBag
     from dask.dataframe.core import DataFrame as DaskDataFrame
     from dask.delayed import Delayed
+    from fsspec.spec import AbstractFileSystem
 
     from dask_awkward.lib.core import Array
 
@@ -572,3 +574,150 @@ def from_map(
    )

    return result


def bytes_reading_ingredients(
    fs: AbstractFileSystem,
    paths: list[str],
    compression: str | None,
    delimiter: bytes | None,
    not_zero: bool,
    blocksize: str | int,
    sample: str | int,
) -> tuple[list[tuple], bytes]:
    if blocksize is not None:
        if isinstance(blocksize, str):
            blocksize = parse_bytes(blocksize)
        if not is_integer(blocksize):
            raise TypeError("blocksize must be an integer")
        blocksize = int(blocksize)

    if blocksize is None:
        offsets = [[0]] * len(paths)
        lengths = [[None]] * len(paths)
    else:
        offsets = []
        lengths = []
        for path in paths:
            if compression is not None:
                raise ValueError(
                    "Cannot do chunked reads on compressed files. "
                    "To read, set blocksize=None"
                )
            size = fs.info(path)["size"]
            if size is None:
                raise ValueError(
                    "Backing filesystem couldn't determine file size, cannot "
                    "do chunked reads. To read, set blocksize=None."
                )
            elif size == 0:
                # skip empty
                offsets.append([])
                lengths.append([])
            else:
                # shrink blocksize to give same number of parts
                if size % blocksize and size > blocksize:
                    blocksize1 = size / (size // blocksize)
                else:
                    blocksize1 = blocksize
                place = 0
                off = [0]
                length = []

                # figure out offsets, spreading around spare bytes
                while size - place > (blocksize1 * 2) - 1:
                    place += blocksize1
                    off.append(int(place))
                    length.append(off[-1] - off[-2])
                length.append(size - off[-1])

                if not_zero:
                    off[0] = 1
                    length[0] -= 1
                offsets.append(off)
                lengths.append(length)

    out = []
    for path, offset, length in zip(paths, offsets, lengths):
        values = [
            (
                fs,
                path,
                compression,
                offs,
                leng,
                delimiter,
            )
            for offs, leng in zip(offset, length)
        ]
        out.append(values)

    sample_size = parse_bytes(sample) if isinstance(sample, str) else sample
    with fs.open(paths[0], compression=compression) as f:
        # read block without seek (because we start at zero)
        if delimiter is None:
            sample_bytes = f.read(sample_size)
        else:
            sample_buff = f.read(sample_size)
            while True:
                new = f.read(sample_size)
                if not new:
                    break
                if delimiter in new:
                    sample_buff = sample_buff + new.split(delimiter, 1)[0] + delimiter
                    break
                sample_buff = sample_buff + new
            sample_bytes = sample_buff

            # trim the sample to the last complete delimiter
            # (kept inside this branch so delimiter is never None here)
            rfind = sample_bytes.rfind(delimiter)
            if rfind > 0:
                sample_bytes = sample_bytes[:rfind]

    return out, sample_bytes
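To make the offset/length bookkeeping concrete, a standalone sketch (not part of the diff) for a hypothetical 100-byte file read with blocksize=30; the loop stretches the block size to size / (size // blocksize) so spare bytes are spread across partitions rather than left in a runt block:

size, blocksize = 100, 30  # hypothetical file size and requested block size

# stretch blocksize so the same number of parts shares the spare bytes
if size % blocksize and size > blocksize:
    blocksize1 = size / (size // blocksize)  # 33.33...
else:
    blocksize1 = blocksize

place, off, length = 0, [0], []
while size - place > (blocksize1 * 2) - 1:
    place += blocksize1
    off.append(int(place))
    length.append(off[-1] - off[-2])
length.append(size - off[-1])

print(off, length)  # [0, 33, 66] [33, 33, 34] -- three near-even blocks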


class FromTextFn:
    def __init__(self):
        pass

    def __call__(self, ingredients: tuple) -> ak.Array:
        (fs, path, compression, offsets, length, delimiter) = ingredients

        with fs.open(path, compression=compression) as f:
            if offsets == 0 and length is None:
                bytestring = f.read()
            else:
                bytestring = read_block(f, offsets, length, delimiter)

Review comment: This is the upstream dask function which will find the next delimiter after the start and stop of the block, right?

        buffer = np.frombuffer(bytestring, dtype=np.uint8)
        array = ak.from_numpy(buffer)
        array = ak.unflatten(array, len(array))
        array = ak.enforce_type(array, "string")

Review comment: To ask our friends: it feels like we should be able to pass the buffer/bytestring directly to awkward and declare it a string rather than take four lines to do it. OTOH, I don't suppose these lines cost anything.

        array_split = ak.str.split_pattern(array, "\n")
        lines = array_split[0]

Review comment: -> list[string]? but what was array_split, then, why the [0]?

Author reply: array_split ends up being of type 1 * var * string; so by grabbing [0] we get an array of strings, N * string (used the awkward docs https://awkward-array.org/doc/main/user-guide/how-to-strings-read-binary.html)

        return lines
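To pin down the type question above, a standalone sketch (not part of the diff) running the same conversion steps on an in-memory chunk; it assumes awkward 2.x with pyarrow available for ak.str:

import awkward as ak
import numpy as np

chunk = b"one\ntwo\nthree"  # a block already cut on the b"\n" delimiter

buffer = np.frombuffer(chunk, dtype=np.uint8)
array = ak.from_numpy(buffer)             # N * uint8
array = ak.unflatten(array, len(array))   # 1 * var * uint8
array = ak.enforce_type(array, "string")  # 1 * string (one big string)

array_split = ak.str.split_pattern(array, "\n")  # 1 * var * string
lines = array_split[0]                           # var * string

print(lines.tolist())  # ['one', 'two', 'three']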


def from_text(source, blocksize, delimiter, storage_options: dict | None = None):
    from fsspec.core import get_fs_token_paths

    fs, token, paths = get_fs_token_paths(source, storage_options=storage_options or {})

    bytes_ingredients, sample_bytes = bytes_reading_ingredients(
        fs,
        paths,
        None,
        delimiter,
        False,
        blocksize,
        "128 KiB",

Review comment: = sample size? It's a reasonable value, but the user might need to change it. We don't actually need it at all, since we know the form of the output is array-of-string (if delimiter is None) or array-of-list-of-string (otherwise). Do we allow for delimiter=None?

Author reply: Yea, it's the sample size; definitely planning on making it user definable! On delimiter: right now I'm strictly passing in b"\n" (temporary, just to get the ball rolling); how to handle delimiters (a sensible default and what to support in general) was something I had in mind to discuss.

Review comment: Actually we make no use of it, so the value should be None/False (whatever it takes not to read it).

Review comment: delimiter=b"\n" is a reasonable default.

Author reply: Something I'm perhaps overcomplicating, but I'm getting caught up wrapping my head around the difference between the bytes-reading delimiter and the "awkward-level" delimiter. At the bytes-reading level we use b"\n" as a default, but at the awkward level I currently have ak.str.split_pattern(array, "\n") hardcoded. Is this something we should make user configurable?

Review comment: The two should be the same: a chunk should end on an element-ending delimiter.

Author reply: OK, great, that was my feeling, but I was worried I wasn't accounting for some extra case.

    )

    return from_map(
        FromTextFn(),
        list(flatten(bytes_ingredients)),
        label="from-text",
        token=token,
        meta=None,
    )
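Finally, a hypothetical end-to-end usage sketch of the prototype as written (the signature is still under discussion above, and the glob path is made up):

import dask_awkward as dak

# one partition per ~2 MiB block; blocks are cut on the same b"\n"
# that ak.str.split_pattern later uses, so partitions hold whole lines
lines = dak.from_text("data/*.txt", blocksize="2 MiB", delimiter=b"\n")
result = lines.compute()  # N * string, one entry per line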