Chunkinfo #234

Merged
32 commits merged on Jul 12, 2023

Commits (changes shown below are from 2 of the 32 commits)
8163b4c
add hdf5 to Docker image
jreadey May 24, 2023
019c5fc
added IndexIterator
jreadey May 24, 2023
1c13a7b
added chunklocator.py
jreadey May 25, 2023
49d4969
update SingleObject design
jreadey May 25, 2023
5347d0e
add test case for chunk init
jreadey May 25, 2023
834a0e8
chunkinitializer support
jreadey May 31, 2023
e3247b5
fix flake8 error
jreadey May 31, 2023
76b9ae6
filter out stray log messages from chunk initializer
jreadey Jun 1, 2023
75960d2
updates based on pr review
jreadey Jun 2, 2023
c83e15c
added arange chunk initializer
jreadey Jun 6, 2023
26c7238
fix flake8 errors
jreadey Jun 6, 2023
5c7d884
added intelligent range get support, remove range_proxy
jreadey Jun 19, 2023
9ede8e3
reduced debug log verbosity
jreadey Jun 19, 2023
ccdda94
use hyper_dims creation property
jreadey Jun 19, 2023
79e1988
added munger for rangeget requests
jreadey Jun 20, 2023
07dafad
flake8 cleanup
jreadey Jun 21, 2023
ddca08d
updates per code review
jreadey Jun 21, 2023
e582e9b
fix comment
jreadey Jul 3, 2023
87ed37a
fix numpy warning
jreadey Jul 3, 2023
84c0649
update aiobotocore to 2.5.0
jreadey Jul 3, 2023
39391c8
change urllib requirement
jreadey Jul 3, 2023
b996c8b
match requirements.txt with setup.py
jreadey Jul 3, 2023
d949a1a
remove version restriction on urllib3
jreadey Jul 6, 2023
9b0fad6
remove version restriction on botocore
jreadey Jul 6, 2023
663627e
remove urllib3 from dependencies
jreadey Jul 6, 2023
36829ff
fix flake8 errors
jreadey Jul 6, 2023
115f088
fix for numpy deprecation of truth value of empty array
jreadey Jul 10, 2023
e8d32ed
fix flake8 errors
jreadey Jul 10, 2023
b61bd8f
more flake8 errors
jreadey Jul 10, 2023
073c9ec
merge changes from master
jreadey Jul 10, 2023
80e74e2
optimize ndarray_compare for non-vlen arrays
jreadey Jul 12, 2023
4b29aa6
fix flake8 error
jreadey Jul 12, 2023
4 changes: 2 additions & 2 deletions hsds/chunk_dn.py
@@ -366,8 +366,8 @@ async def GET_Chunk(request):
         num_bytes = s3size
     else:
         # list
-        num_bytes = np.prod(s3size)
-        log.debug(f"reading {num_bytes} from {s3path}")
+        num_bytes = np.sum(s3size)
Contributor: Why the sum instead of product? If s3size is a list, isn't each entry in the list a dimension describing the size of the data in S3?

Member (author): No, it should be sum. In this case s3size is a colon-separated list of range lengths, so the sum is the total number of bytes to read.

log.debug(f"reading {num_bytes} bytes from {s3path}")
if num_bytes == 0:
log.warn(f"GET_Chunk for s3path: {s3path} with empty byte range")
raise HTTPNotFound()
Expand Down
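As a minimal illustration of the point settled in this thread (values invented), when s3size holds per-range byte lengths rather than array dimensions, the sum gives the total read size while the product would not:

    import numpy as np

    s3size = [4096, 8192, 2048]   # three byte ranges within the HDF5 file
    print(np.sum(s3size))         # 14336 -- total bytes to fetch
    print(np.prod(s3size))        # 68719476736 -- wrong: treats the lengths as a shape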
16 changes: 9 additions & 7 deletions hsds/chunk_sn.py
@@ -279,19 +279,21 @@ def getChunkItem(chunkid):
     default_chunktable_dims = get_chunktable_dims(dims, chunk_dims)
     log.debug(f"default_chunktable_dims: {default_chunktable_dims}")
     table_factors = []
-    hyper_dims = []
+    if "hyper_dims" in layout:
+        hyper_dims = layout["hyper_dims"]
+    else:
+        # assume 1 to 1 matching
+        hyper_dims = chunk_dims
     ref_num_chunks = num_chunks
     for dim in range(rank):
-        if chunktable_dims[dim] % default_chunktable_dims[dim] != 0:
-            msg = f"expected chunktable shape[{dim}] to be a factor"
-            msg += f" of {default_chunktable_dims[dim]}"
+        if chunk_dims[dim] % hyper_dims[dim] != 0:
+            msg = f"expected hyper_dims [{hyper_dims[dim]}] to be a factor"
+            msg += f" of {chunk_dims[dim]}"
             log.warn(msg)
             raise HTTPBadRequest(reason=msg)
-        factor = chunktable_dims[dim] // default_chunktable_dims[dim]
+        factor = chunk_dims[dim] // hyper_dims[dim]
         table_factors.append(factor)
         ref_num_chunks *= factor
-        hyper_dim = chunk_dims[dim] // factor
-        hyper_dims.append(hyper_dim)
     log.debug(f"table_factors: {table_factors}")
     log.debug(f"ref_num_chunks: {ref_num_chunks}")
     log.debug(f"hyper_dims: {hyper_dims}")
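A standalone sketch of the factor computation in this hunk, using invented shapes (an 8000-element HSDS chunk tiled by 2000-element HDF5 chunks):

    chunk_dims = [8000]       # HSDS chunk shape
    hyper_dims = [2000]       # HDF5 ("hyper") chunk shape from the layout
    num_chunks = 10           # HSDS chunks in the dataset

    ref_num_chunks = num_chunks
    table_factors = []
    for dim in range(len(chunk_dims)):
        if chunk_dims[dim] % hyper_dims[dim] != 0:
            raise ValueError(f"hyper_dims[{dim}] must evenly divide chunk_dims[{dim}]")
        factor = chunk_dims[dim] // hyper_dims[dim]   # 4 hyper chunks per HSDS chunk
        table_factors.append(factor)
        ref_num_chunks *= factor                      # 40 chunk table entries in all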
18 changes: 11 additions & 7 deletions hsds/datanode_lib.py
@@ -853,7 +853,9 @@ async def get_chunk_bytes(
     # create a buffer for the hsds chunk and arrange h5 chunks within it
     chunk_size = np.prod(chunk_dims) * item_size
     # number of bytes in the hd5 chunk
     # hyper_dims = [4000,] # test
+    h5_size = np.prod(hyper_dims) * item_size
+    log.debug(f"h5 chunk size: {h5_size}")
     chunk_bytes = bytearray(chunk_size)
     if num_chunks > chunk_size // h5_size:
         # shouldn't have more than this many hyperchunks
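Quick arithmetic for the bound checked here, with assumed values (float64 data, an 8000-element HSDS chunk, 2000-element h5 chunks):

    import numpy as np

    item_size = 8                                  # bytes per element (float64)
    chunk_dims = [8000]                            # HSDS chunk shape
    hyper_dims = [2000]                            # h5 chunk shape
    chunk_size = np.prod(chunk_dims) * item_size   # 64000-byte HSDS chunk buffer
    h5_size = np.prod(hyper_dims) * item_size      # 16000 bytes per h5 chunk
    assert chunk_size // h5_size == 4              # at most 4 h5 chunks can fit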
@@ -874,27 +876,29 @@
             "length": length[i],
             "bucket": bucket
         }
-        log.debug(f"get_chunk_bytes - h5_chunk[{i}, offset: {offset[i]}, length: {length[i]}")
+        log.debug(f"get_chunk_bytes - h5_chunk[{i}], offset: {offset[i]}, length: {length[i]}")
         tasks.append(getStorBytes(app, s3key, **kwargs))
 
     log.debug(f"running asyncio.gather on {len(tasks)} tasks")
     results = await asyncio.gather(*tasks)
-    log.debug(f"asyncio.gather got results: {results}")
+    log.debug(f"asyncio.gather got {len(results)} results")
     if len(results) != num_chunks:
         log.error("unexpected number of gather results")
         raise HTTPInternalServerError()
     for i in range(num_chunks):
-        h5_chunk = results[i]
-        if h5_chunk is None:
+        h5_chunk_bytes = results[i]
+        if h5_chunk_bytes is None:
             log.warning(f"get_chunk_bytes - None returned for h5_chunk[{i}]")
             continue
-        if len(h5_chunk) != h5_size:
-            msg = f"get_chunk_bytes - got {len(h5_chunk)} bytes for h5_chunk[{i}], "
+
+        if len(h5_chunk_bytes) != h5_size:
+            msg = f"get_chunk_bytes - got {len(h5_chunk_bytes)} bytes for h5_chunk[{i}], "
             msg += f"expected: {h5_size}"
             log.error(msg)
             continue
         pos = h5_size * i
-        chunk_bytes[pos:(pos + h5_size)] = h5_chunk
+        chunk_bytes[pos:(pos + h5_size)] = h5_chunk_bytes
         log.debug(f"setting chunk_bytes[{pos}:{(pos+h5_size)}]")
 
     """
     # serial version
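The fan-out/assemble pattern above can be sketched in isolation (the fetcher below is a stand-in, not the HSDS getStorBytes API): fetch every h5 chunk's byte range concurrently, then copy each result into its fixed slot in the HSDS chunk buffer.

    import asyncio

    async def fetch_range(offset: int, length: int) -> bytes:
        await asyncio.sleep(0)      # stand-in for a storage read at `offset`
        return bytes(length)        # zero-filled placeholder data

    async def assemble(offsets, lengths, h5_size):
        tasks = [fetch_range(o, n) for o, n in zip(offsets, lengths)]
        results = await asyncio.gather(*tasks)    # preserves task order
        chunk_bytes = bytearray(h5_size * len(results))
        for i, data in enumerate(results):
            pos = h5_size * i                     # slot for the i-th h5 chunk
            chunk_bytes[pos:pos + h5_size] = data
        return chunk_bytes

    print(len(asyncio.run(assemble([0, 4096], [4096, 4096], 4096))))  # 8192

Since gather preserves input order, results[i] always corresponds to the i-th byte range, which is what lets the loop place each h5 chunk by index.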
32 changes: 32 additions & 0 deletions hsds/util/chunkUtil.py
@@ -1170,3 +1170,35 @@ def chunkQuery(
     log.debug(f"  {i}: {rsp_arr[i]}")
 
     return rsp_arr
+
+def _find_min_pair(h5chunks, max_gap=None):
+    """ given a dict of chunk_map entries, return the two
+    chunks nearest to each other in the file.
Contributor: return the two chunks -> return the indices of the two chunks

+    If max_gap is set, chunks must be within max_gap bytes """
+    if len(h5chunks) < 2:
+        return None
+    chunk_indices = list(h5chunks.keys())
+    min_pair = None
+    min_gap = None
+    for index_left in chunk_indices:
+        for index_right in chunk_indices:
+            if index_left == index_right:
+                continue
+            chunk_left = h5chunks[index_left]
+            chunk_right = h5chunks[index_right]
+            if chunk_left["offset"] > chunk_right["offset"]:
+                continue
+            gap = chunk_right["offset"] - (chunk_left["offset"] + chunk_left["length"])
+            if gap == 0:
+                # these two are contiguous
+                return (index_left, index_right)
+            if max_gap is not None and gap > max_gap:
+                # too far apart
+                continue
+            if min_gap is None or gap < min_gap:
+                min_gap = gap
+                min_pair = (index_left, index_right)
+    return min_pair
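A quick usage sketch with an invented chunk map in the {index: {"offset": ..., "length": ...}} shape the function expects:

    h5chunks = {
        "0_0": {"offset": 0, "length": 1024},
        "0_1": {"offset": 1024, "length": 1024},   # contiguous with "0_0"
        "1_0": {"offset": 8192, "length": 1024},
    }
    print(_find_min_pair(h5chunks))                # ('0_0', '0_1'), gap of 0
    print(_find_min_pair(h5chunks, max_gap=100))   # same pair; a zero gap returns early

Note the nested scan is O(n^2) in the number of chunks, which presumably stays cheap for the per-chunk hyperchunk counts involved here.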



1 change: 0 additions & 1 deletion tests/integ/link_test.py
@@ -370,7 +370,6 @@ def testGetLinks(self):
         #
 
         self.assertEqual(ret_names, expected_names)
-        print(expected_names)
 
         # get links with a result limit of 4
         limit = 4
3 changes: 2 additions & 1 deletion tests/integ/value_test.py
@@ -2998,7 +2998,8 @@ def testIntelligentRangeGet(self):
             "class": "H5D_CHUNKED_REF_INDIRECT",
             "file_uri": file_uri,
             "dims": chunk_dims,
-            "chunk_table": chunkinfo_uuid,
+            "hyper_dims": [chunk_extent,],
+            "chunk_table": chunkinfo_uuid
         }
         # the linked dataset uses gzip, so set it here
         gzip_filter = {
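For context, a rough sketch (all values invented) of the creation-property layout this test exercises; hyper_dims describes how the HDF5 file's smaller chunks tile each HSDS chunk, which is what enables the intelligent range GET:

    chunk_extent = 1000
    layout = {
        "class": "H5D_CHUNKED_REF_INDIRECT",
        "file_uri": "s3://some-bucket/some-file.h5",  # hypothetical source file
        "dims": [4000],                               # HSDS chunk shape
        "hyper_dims": [chunk_extent],                 # HDF5 chunk shape; 4 per HSDS chunk
        "chunk_table": "d-1234abcd",                  # hypothetical chunk-index dataset id
    }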