Listkeys #333
Merged: 5 commits, Mar 29, 2024
3 changes: 3 additions & 0 deletions admin/config/config.yml
@@ -33,6 +33,9 @@ max_tcp_connections: 100 # max number of inflight tcp connections
head_sleep_time: 10 # max sleep time between health checks for head node
node_sleep_time: 10 # max sleep time between health checks for SN/DN nodes
async_sleep_time: 1 # max sleep time between async task runs
scan_sleep_time: 10 # max sleep time between scanning runs
scan_wait_time: 10 # min time to wait after a domain update before starting a scan
gc_sleep_time: 10 # max time between runs to delete unused objects
s3_sync_interval: 1 # time to wait between s3_sync checks (in sec)
s3_age_time: 1 # time to wait since last update to write an object to S3
s3_sync_task_timeout: 10 # time to cancel write task if no response
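The three new settings above give the bucket scan and garbage-collection tasks their own intervals instead of sharing async_sleep_time. As a rough sketch of how such per-task settings can be read with fallbacks (the load_config helper and file path are illustrative, not HSDS's actual config module):

```python
# Sketch: read per-task sleep settings from a YAML config with defaults.
# load_config and the path are illustrative; HSDS has its own config module.
import yaml

DEFAULTS = {
    "scan_sleep_time": 10,  # max sleep time between scanning runs
    "scan_wait_time": 60,   # min wait after a domain update before scanning
    "gc_sleep_time": 10,    # max sleep time between GC runs
}

def load_config(path="admin/config/config.yml"):
    with open(path) as f:
        settings = yaml.safe_load(f) or {}
    # fall back to a default for any key the file does not define
    return {key: settings.get(key, default) for key, default in DEFAULTS.items()}
```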
6 changes: 3 additions & 3 deletions hsds/basenode.py
@@ -224,13 +224,13 @@ async def docker_update_dn_info(app):
log.warn("head_url is not set, can not register yet")
return
req_reg = head_url + "/register"
log.info(f"register: {req_reg}")
log.debug(f"register: {req_reg}")

body = {"id": app["id"], "port": app["node_port"], "node_type": app["node_type"]}
rsp_json = None

try:
log.info(f"register req: {req_reg} body: {body}")
log.debug(f"register req: {req_reg} body: {body}")
rsp_json = await http_post(app, req_reg, data=body)
except HTTPInternalServerError:
log.error("HEAD node seems to be down.")
@@ -242,7 +242,7 @@ async def docker_update_dn_info(app):
app["dn_ids"] = []

if rsp_json is not None:
log.info(f"register response: {rsp_json}")
log.debug(f"register response: {rsp_json}")
app["dn_urls"] = rsp_json["dn_urls"]
app["dn_ids"] = rsp_json["dn_ids"]

22 changes: 11 additions & 11 deletions hsds/datanode.py
@@ -95,9 +95,9 @@ async def bucketScan(app):
"""Scan v2 keys and update .info.json"""
log.info("bucketScan start")

async_sleep_time = int(config.get("async_sleep_time"))
short_sleep_time = float(async_sleep_time) / 10.0
scan_wait_time = async_sleep_time # default to ~1min
scan_sleep_time = int(config.get("scan_sleep_time", default=10.0))
short_sleep_time = float(scan_sleep_time) / 10.0
scan_wait_time = int(config.get("scan_wait_time", default=60.0)) # default to ~1min
log.info(f"scan_wait_time: {scan_wait_time}")
last_action = time.time() # keep track of the last time any work was done

@@ -106,7 +106,7 @@
while True:
if app["node_state"] != "READY":
log.info("bucketScan waiting for Node state to be READY")
await asyncio.sleep(async_sleep_time)
await asyncio.sleep(scan_sleep_time)
continue # wait for READY state

root_scan_ids = app["root_scan_ids"]
@@ -168,8 +168,8 @@
last_action = time.time()

now = time.time()
if (now - last_action) > async_sleep_time:
sleep_time = async_sleep_time # long nap
if (now - last_action) > scan_sleep_time:
sleep_time = scan_sleep_time # long nap
else:
sleep_time = short_sleep_time # short nap

@@ -193,15 +193,15 @@ def get_gc_count(app):

async def bucketGC(app):
"""remove objects from db for any deleted root groups or datasets"""
async_sleep_time = int(config.get("async_sleep_time"))
log.info(f"bucketGC start - async_sleep_time: {async_sleep_time}")
gc_sleep_time = int(config.get("gc_sleep_time", default=10))
log.info(f"bucketGC start - gc_sleep_time: {gc_sleep_time}")

# update/initialize root object before starting GC

while True:
if app["node_state"] not in ("READY", "TERMINATING"):
log.info("bucketGC - waiting for Node state to be READY")
await asyncio.sleep(async_sleep_time)
await asyncio.sleep(gc_sleep_time)
continue # wait for READY state

gc_buckets = app["gc_buckets"]
@@ -231,8 +231,8 @@
else:
log.error(f"bucketGC - unexpected obj_id class: {bucket}/{obj_id}")

log.info(f"bucketGC - sleep: {async_sleep_time}")
await asyncio.sleep(async_sleep_time)
log.info(f"bucketGC - sleep: {gc_sleep_time}")
await asyncio.sleep(gc_sleep_time)

# shouldn't ever get here
log.error("bucketGC terminating unexpectedly")
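bucketScan and bucketGC now sleep on scan_sleep_time and gc_sleep_time rather than the shared async_sleep_time, and bucketScan takes short naps while work keeps arriving but a long nap once it has been idle for a while. A standalone sketch of that adaptive polling pattern, assuming placeholder work_available() and do_work() callables (not HSDS functions):

```python
# Adaptive polling sketch: short naps while busy, long naps once idle.
# work_available() and do_work() are placeholders, not HSDS code.
import asyncio
import time

async def poll_loop(work_available, do_work, scan_sleep_time=10.0):
    short_sleep_time = scan_sleep_time / 10.0
    last_action = time.time()  # last time any work was done
    while True:
        if work_available():
            await do_work()
            last_action = time.time()
        if time.time() - last_action > scan_sleep_time:
            sleep_time = scan_sleep_time   # idle, take a long nap
        else:
            sleep_time = short_sleep_time  # busy, check again soon
        await asyncio.sleep(sleep_time)
```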
4 changes: 2 additions & 2 deletions hsds/datanode_lib.py
@@ -1202,7 +1202,7 @@ async def s3sync(app, s3_age_time=0):

dirty_count = len(dirty_ids)
if not dirty_count:
log.info("s3sync nothing to update")
log.debug("s3sync nothing to update")
return 0
msg = f"s3sync update - dirtyid count: {dirty_count}, "
msg += f"active write tasks: {len(pending_s3_write_tasks)}/"
@@ -1378,5 +1378,5 @@ async def s3syncCheck(app):
sleep_time = last_update_delta
msg = "s3syncCheck no objects to write, "
msg += f"sleeping for {sleep_time:.2f}"
log.info(msg)
log.debug(msg)
await asyncio.sleep(sleep_time)
4 changes: 1 addition & 3 deletions hsds/dset_sn.py
@@ -906,9 +906,7 @@ async def POST_Dataset(request):
msg += f"{min_chunk_size}, expanding"
log.debug(msg)
kwargs = {"chunk_min": min_chunk_size, "layout_class": layout_class}
adjusted_chunk_dims = expandChunk(
chunk_dims, item_size, shape_json, **kwargs
)
adjusted_chunk_dims = expandChunk(chunk_dims, item_size, shape_json, **kwargs)
elif chunk_size > max_chunk_size:
msg = f"chunk size: {chunk_size} greater than max size: "
msg += f"{max_chunk_size}, shrinking"
5 changes: 0 additions & 5 deletions hsds/util/chunkUtil.py
@@ -786,7 +786,6 @@ def chunkReadSelection(chunk_arr, slices=None, select_dt=None):
if len(fields) > 1:
for field in fields:
arr[field] = output_arr[field]
log.debug(f"arr: {arr}")
else:
arr[...] = output_arr[fields[0]]
output_arr = arr # return this
@@ -801,7 +800,6 @@ def chunkWriteSelection(chunk_arr=None, slices=None, data=None):

log.debug(f"chunkWriteSelection for slices: {slices}")
dims = chunk_arr.shape
log.debug(f"data: {data}")

rank = len(dims)

@@ -846,7 +844,6 @@ def chunkWriteSelection(chunk_arr=None, slices=None, data=None):
else:
# check if the new data modifies the array or not
# TBD - is this worth the cost of comparing two arrays element by element?
log.debug(f"ndcompare: {chunk_arr[slices]} to {data}")
if not ndarray_compare(chunk_arr[slices], data):
# update chunk array
chunk_arr[slices] = data
@@ -858,7 +855,6 @@
raise

log.debug(f"ChunkWriteSelection - chunk updated: {updated}")
log.debug(f"chunk_arr: {chunk_arr}")

return updated

@@ -954,7 +950,6 @@ def chunkWritePoints(chunk_id=None,
if select_dt is None:
select_dt = dset_dtype # no field selection
log.debug(f"dtype: {dset_dtype}")
log.debug(f"point_arr: {point_arr}")

# point_arr should have the following type:
# (coord1, coord2, ...) | select_dtype
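The deleted log.debug calls above printed whole chunk and point arrays, which is costly and floods the log for large selections. If some visibility is still wanted, one option is to log only array metadata; the describe() helper below is an illustrative sketch, not part of chunkUtil:

```python
# Sketch: log array metadata instead of full contents.
# describe() is an illustrative helper, not part of chunkUtil.
import numpy as np

def describe(arr: np.ndarray) -> str:
    return f"ndarray shape={arr.shape} dtype={arr.dtype} nbytes={arr.nbytes}"

# usage in place of dumping the array:
#   log.debug(f"chunk_arr: {describe(chunk_arr)}")
```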
5 changes: 4 additions & 1 deletion hsds/util/fileClient.py
@@ -327,7 +327,6 @@ async def list_keys(

filesep = pp.normpath("/") # '/' on linux, '\\' on windows

await asyncio.sleep(0) # for async compat
basedir = pp.join(self._root_dir, bucket)
if prefix:
basedir = pp.join(basedir, prefix)
@@ -354,6 +353,8 @@
files.append(filename)
if limit and len(files) >= limit:
break
if len(files) % 1000 == 0:
await asyncio.sleep(0)
break # don't recurse into subdirs

else:
@@ -366,6 +367,8 @@
files.append(filepath)
if limit and len(files) >= limit:
break
if len(files) % 1000 == 0:
await asyncio.sleep(0)

# use a dictionary to hold return values if stats are needed
key_names = {} if include_stats else []
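list_keys previously yielded to the event loop only once at the start; it now yields every 1000 files while walking the directory tree, so a large listing cannot starve other coroutines. A self-contained sketch of the same pattern (the paths and batch size are illustrative):

```python
# Sketch: cooperative directory listing that yields to the event loop
# every `batch` entries so other tasks keep running during a long walk.
import asyncio
import os

async def list_files(basedir, batch=1000):
    files = []
    for root, _dirs, names in os.walk(basedir):
        for name in names:
            files.append(os.path.join(root, name))
            if len(files) % batch == 0:
                await asyncio.sleep(0)  # give other coroutines a turn
    return files

# example: asyncio.run(list_files("/tmp"))
```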
35 changes: 32 additions & 3 deletions hsds/util/httpUtil.py
@@ -13,7 +13,7 @@
# httpUtil:
# http-related helper functions
#
from asyncio import CancelledError
from asyncio import CancelledError, TimeoutError
import os
import socket
import numpy as np
@@ -327,6 +327,12 @@ async def http_get(app, url, params=None, client=None):
except CancelledError as cle:
log.warn(f"CancelledError for http_get({url}): {cle}")
raise HTTPInternalServerError()
except ConnectionResetError as cre:
log.warn(f"ConnectionResetError for http_get({url}): {cre}")
raise HTTPInternalServerError()
except TimeoutError as toe:
log.warn(f"TimeoutError for http_get({url}: {toe})")
raise HTTPServiceUnavailable()

return retval

@@ -335,10 +341,18 @@ async def http_post(app, url, data=None, params=None, client=None):
"""
Helper function - async HTTP POST
"""
if not url:
log.error("http_post with no url")
return
if url.startswith("http://head"):
# just use debug for health check traffic
logmsg = log.debug
else:
logmsg = log.info
msg = f"http_post('{url}'"
if isinstance(data, bytes):
msg += f" {len(data)} bytes"
log.info(msg)
logmsg(msg)
if client is None:
client = get_http_client(app, url=url)
url = get_http_std_url(url)
@@ -355,7 +369,7 @@

try:
async with client.post(url, **kwargs) as rsp:
log.info(f"http_post status: {rsp.status}")
logmsg(f"http_post status: {rsp.status}")
if rsp.status == 200:
pass # ok
elif rsp.status == 201:
@@ -394,6 +408,12 @@
except CancelledError as cle:
log.warn(f"CancelledError for http_post({url}): {cle}")
raise HTTPInternalServerError()
except ConnectionResetError as cre:
log.warn(f"ConnectionResetError for http_post({url}): {cre}")
raise HTTPInternalServerError()
except TimeoutError as toe:
log.warn(f"TimeoutError for http_post({url}: {toe})")
raise HTTPServiceUnavailable()

return retval

@@ -451,6 +471,12 @@ async def http_put(app, url, data=None, params=None, client=None):
except CancelledError as cle:
log.warn(f"CancelledError for http_put({url}): {cle}")
raise HTTPInternalServerError()
except ConnectionResetError as cre:
log.warn(f"ConnectionResetError for http_put({url}): {cre}")
raise HTTPInternalServerError()
except TimeoutError as toe:
log.warn(f"TimeoutError for http_put({url}: {toe})")
raise HTTPServiceUnavailable()
return retval


@@ -499,6 +525,9 @@ async def http_delete(app, url, data=None, params=None, client=None):
except ConnectionResetError as cre:
log.warn(f"ConnectionResetError for http_delete({url}): {cre}")
raise HTTPInternalServerError()
except TimeoutError as toe:
log.warn(f"TimeoutError for http_delete({url}: {toe})")
raise HTTPServiceUnavailable()

return rsp_json

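The helpers above now catch ConnectionResetError and asyncio's TimeoutError alongside CancelledError, turning reset connections into a 500 and timeouts into a 503 so callers get an HTTP error rather than an unhandled exception. A minimal sketch of the same mapping around an aiohttp GET; the session handling is simplified compared to HSDS's pooled clients, and the status handling is reduced to a single case:

```python
# Sketch: translate low-level client failures into aiohttp web errors.
# Session handling and status mapping are simplified relative to HSDS.
from asyncio import CancelledError, TimeoutError

import aiohttp
from aiohttp.web import HTTPInternalServerError, HTTPServiceUnavailable

async def http_get_json(url, params=None):
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, params=params) as rsp:
                if rsp.status != 200:
                    # real code maps each status to a specific web error
                    raise HTTPInternalServerError()
                return await rsp.json()
    except CancelledError:
        raise HTTPInternalServerError()
    except ConnectionResetError:
        raise HTTPInternalServerError()
    except TimeoutError:
        # a timeout is treated as transient, so report service unavailable
        raise HTTPServiceUnavailable()
```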
5 changes: 1 addition & 4 deletions tests/integ/dataset_test.py
@@ -161,7 +161,6 @@ def testScalarDataset(self):
# try to do a GET with a different domain (should fail)
another_domain = self.base_domain + "/testScalarDataset2.h5"
helper.setupDomain(another_domain)
print("testScalarDataset2", another_domain)
headers = helper.getRequestHeaders(domain=another_domain)
rsp = self.session.get(req, headers=headers)
self.assertEqual(rsp.status_code, 400)
@@ -246,9 +245,7 @@ def testGet(self):
req = helper.getEndpoint() + "/"
rsp = self.session.get(req, headers=headers)
if rsp.status_code != 200:
print(
"WARNING: Failed to get domain: {}. Is test data setup?".format(domain)
)
print(f"WARNING: Failed to get domain: {domain}. Is test data setup?")
return # abort rest of test
domainJson = json.loads(rsp.text)
root_uuid = domainJson["root"]
10 changes: 10 additions & 0 deletions tests/unit/chunk_util_test.py
@@ -182,6 +182,16 @@ def testExpandChunk(self):
self.assertTrue(num_bytes > CHUNK_MIN)
self.assertTrue(num_bytes < CHUNK_MAX)

shape = {"class": "H5S_SIMPLE", "dims": [1000,]}
layout = (10,)
num_bytes = getChunkSize(layout, "H5T_VARIABLE")
self.assertTrue(num_bytes < CHUNK_MIN)
expanded = expandChunk(layout, "H5T_VARIABLE", shape, chunk_min=CHUNK_MIN)
print("expanded:", expanded)
num_bytes = getChunkSize(expanded, "H5T_VARIABLE")
self.assertTrue(num_bytes > CHUNK_MIN)
self.assertTrue(num_bytes < CHUNK_MAX)

shape = {
"class": "H5S_SIMPLE",
"dims": [1000, 10, 1000],
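The new test case covers expanding a chunk layout for a variable-length type, where the per-item size has to be estimated rather than read from a fixed dtype. Below is a simplified sketch of the expansion idea under that assumption; the 512-byte estimate and the doubling rule are illustrative, not the exact HSDS algorithm:

```python
# Simplified sketch: grow a chunk layout until its estimated byte count
# reaches chunk_min, without exceeding the dataspace dims.
# The 512-byte guess for variable-length items is illustrative only.
CHUNK_MIN = 1024 * 1024  # 1 MiB, for the sake of the example

def chunk_size(layout, item_size):
    if item_size == "H5T_VARIABLE":
        item_size = 512  # rough per-item estimate for variable-length data
    nbytes = item_size
    for extent in layout:
        nbytes *= extent
    return nbytes

def expand_chunk(layout, item_size, dims, chunk_min=CHUNK_MIN):
    layout = list(layout)
    while chunk_size(layout, item_size) < chunk_min:
        grew = False
        for i, extent in enumerate(layout):
            if extent < dims[i]:
                layout[i] = min(extent * 2, dims[i])
                grew = True
        if not grew:
            break  # layout already spans the whole dataspace
    return tuple(layout)

print(expand_chunk((10,), "H5T_VARIABLE", (1000,)))  # -> (1000,)
```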