-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
prototype dak.from_text
#7
Changes from 34 commits
7b00694
df3db18
d2b7a67
30e7748
8826cf8
5aa5d01
4e83fd2
4e5e883
bc686d2
a235067
31802a9
8471d93
c030a9b
4ed1aff
b197665
7a97c02
a49c1cd
ad97c61
c8bc1ec
d1dcc72
84a0dee
d7957be
f2cb69c
bd65e00
5a07892
8764ec6
1345d27
ad3cb35
8ec7acb
74c0d79
66cf018
cd6168f
f9bbec1
4dc3832
e3bff17
85316c3
94de75d
50d4aaa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ | |
from_delayed, | ||
from_lists, | ||
from_map, | ||
from_text, | ||
to_dask_array, | ||
to_dask_bag, | ||
to_dataframe, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,7 +9,8 @@ | |
from awkward.types.numpytype import primitive_to_dtype | ||
from dask.base import flatten, tokenize | ||
from dask.highlevelgraph import HighLevelGraph | ||
from dask.utils import funcname | ||
from dask.utils import funcname, is_integer, parse_bytes | ||
from fsspec.utils import read_block | ||
|
||
from dask_awkward.layers import AwkwardBlockwiseLayer, AwkwardInputLayer | ||
from dask_awkward.layers.layers import AwkwardMaterializedLayer | ||
|
@@ -25,6 +26,7 @@ | |
from dask.bag.core import Bag as DaskBag | ||
from dask.dataframe.core import DataFrame as DaskDataFrame | ||
from dask.delayed import Delayed | ||
from fsspec.spec import AbstractFileSystem | ||
|
||
from dask_awkward.lib.core import Array | ||
|
||
|
@@ -572,3 +574,150 @@ def from_map( | |
) | ||
|
||
return result | ||
|
||
|
||
def bytes_reading_ingredients(
    fs: AbstractFileSystem,
    paths: list[str],
    compression: str | None,
    delimiter: bytes | None,
    not_zero: bool,
    blocksize: str | int | None,
    sample: str | int,
) -> tuple[list[list[tuple]], bytes]:
    """Plan byte-range reads over *paths* and grab a leading sample.

    Parameters
    ----------
    fs:
        Filesystem used for ``info`` and ``open`` calls.
    paths:
        Files to be read; must be non-empty (the sample is taken from
        ``paths[0]``).
    compression:
        Compression codec name; chunked reads require ``None``.
    delimiter:
        Record-terminating byte string used to align the sample on a
        record boundary; ``None`` disables alignment.
    not_zero:
        If True, start the first block of each file at offset 1 instead
        of 0 (skip a known leading byte).
    blocksize:
        Target chunk size — a byte-count string (e.g. ``"128 MiB"``),
        an integer, or ``None`` for one chunk per file.
    sample:
        Number of bytes (or byte-count string) to read from the head of
        the first file.

    Returns
    -------
    tuple
        ``(read_specs, sample_bytes)`` where ``read_specs`` holds one
        list per path of ``(fs, path, compression, offset, length,
        delimiter)`` tuples.

    Raises
    ------
    TypeError
        If ``blocksize`` does not parse to an integer.
    ValueError
        If chunked reads are requested on compressed files, or the
        filesystem cannot report a file size.
    """
    if blocksize is not None:
        if isinstance(blocksize, str):
            blocksize = parse_bytes(blocksize)
        if not is_integer(blocksize):
            raise TypeError("blocksize must be an integer")
        blocksize = int(blocksize)

    if blocksize is None:
        # One chunk per file: read everything starting at offset 0.
        offsets = [[0]] * len(paths)
        lengths = [[None]] * len(paths)
    else:
        # Loop-invariant check hoisted out of the per-path loop: chunked
        # reads are incompatible with compressed input, so fail fast.
        if compression is not None:
            raise ValueError(
                "Cannot do chunked reads on compressed files. "
                "To read, set blocksize=None"
            )
        offsets = []
        lengths = []
        for path in paths:
            size = fs.info(path)["size"]
            if size is None:
                raise ValueError(
                    "Backing filesystem couldn't determine file size, cannot "
                    "do chunked reads. To read, set blocksize=None."
                )
            elif size == 0:
                # skip empty
                offsets.append([])
                lengths.append([])
            else:
                # shrink blocksize to give same number of parts
                if size % blocksize and size > blocksize:
                    blocksize1 = size / (size // blocksize)
                else:
                    blocksize1 = blocksize
                place = 0
                off = [0]
                length = []

                # figure out offsets, spreading around spare bytes
                while size - place > (blocksize1 * 2) - 1:
                    place += blocksize1
                    off.append(int(place))
                    length.append(off[-1] - off[-2])
                length.append(size - off[-1])

                if not_zero:
                    off[0] = 1
                    length[0] -= 1
                offsets.append(off)
                lengths.append(length)

    out = []
    for path, offset, length in zip(paths, offsets, lengths):
        values = [
            (fs, path, compression, offs, leng, delimiter)
            for offs, leng in zip(offset, length)
        ]
        out.append(values)

    sample_size = parse_bytes(sample) if isinstance(sample, str) else sample
    with fs.open(paths[0], compression=compression) as f:
        # read block without seek (because we start at zero)
        if delimiter is None:
            sample_bytes = f.read(sample_size)
        else:
            sample_buff = f.read(sample_size)
            while True:
                new = f.read(sample_size)
                if not new:
                    break
                if delimiter in new:
                    sample_buff = sample_buff + new.split(delimiter, 1)[0] + delimiter
                    break
                sample_buff = sample_buff + new
            sample_bytes = sample_buff

            # Trim the trailing partial record so the sample ends on a
            # delimiter boundary; guarded inside this branch because
            # bytes.rfind(None) would raise TypeError.
            rfind = sample_bytes.rfind(delimiter)
            if rfind > 0:
                sample_bytes = sample_bytes[:rfind]

    return out, sample_bytes
|
||
|
||
class FromTextFn:
    """Per-partition callable handed to ``from_map``: turn one byte-range
    read spec into an awkward array of text lines."""

    def __init__(self):
        pass

    def __call__(self, ingredients: tuple) -> ak.Array:
        """Read the described byte range and split it into lines.

        ``ingredients`` is a ``(fs, path, compression, offset, length,
        delimiter)`` tuple as produced by ``bytes_reading_ingredients``.
        """
        fs, path, compression, offset, length, delim = ingredients

        with fs.open(path, compression=compression) as fh:
            reads_whole_file = offset == 0 and length is None
            if reads_whole_file:
                raw = fh.read()
            else:
                raw = read_block(fh, offset, length, delim)

        # Wrap the raw bytes as a single awkward string value ...
        as_uint8 = np.frombuffer(raw, dtype=np.uint8)
        chars = ak.from_numpy(as_uint8)
        one_string = ak.enforce_type(ak.unflatten(chars, len(chars)), "string")

        # ... then split on newlines; the trailing [0] unwraps the single
        # outer element introduced by ``unflatten`` above.
        # NOTE(review): the split pattern is hard-coded to "\n" and does
        # not use ``delim`` — confirm this is intentional.
        return ak.str.split_pattern(one_string, "\n")[0]
|
||
|
||
def from_text(source, blocksize, delimiter, storage_options: dict | None = None):
    """Create a lazy dask-awkward collection of text lines from file(s).

    Parameters
    ----------
    source:
        Path, glob pattern, or list of paths understood by fsspec.
    blocksize:
        Chunk size for partitioned reads (byte-count string or int), or
        ``None`` for one partition per file.
    delimiter:
        Byte string terminating a record; partition boundaries are
        aligned to it.
    storage_options:
        Extra keyword arguments for the backing filesystem.

    Returns
    -------
    Array
        Collection with one partition per planned byte range.
    """
    from fsspec.core import get_fs_token_paths

    fs, token, paths = get_fs_token_paths(source, storage_options=storage_options or {})

    # The sample bytes are currently unused (the output form is already
    # known to be array-of-string), so they are discarded here; the
    # "128 KiB" sample size is a placeholder pending a user option.
    bytes_ingredients, _ = bytes_reading_ingredients(
        fs,
        paths,
        None,  # compression: chunked reads require uncompressed input
        delimiter,
        False,  # not_zero: keep offset 0 of each file
        blocksize,
        "128 KiB",  # sample size
    )

    return from_map(
        FromTextFn(),
        list(flatten(bytes_ingredients)),
        label="from-text",
        token=token,
        meta=None,
    )
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the upstream dask function which will find the next delimiter after the start and stop of the block, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea these lines of code are from upstream dask (https://github.com/dask/dask/blob/4178feb58a7e708345ce4e41e018a746b0d1fd06/dask/bytes/core.py#L190)