Chunkinfo #234

Merged
merged 32 commits on Jul 12, 2023
Commits
8163b4c
add hdf5 to Docker image
jreadey May 24, 2023
019c5fc
added IndexIterator
jreadey May 24, 2023
1c13a7b
added chunklocator.py
jreadey May 25, 2023
49d4969
update SingleObject design
jreadey May 25, 2023
5347d0e
add test case for chunk init
jreadey May 25, 2023
834a0e8
chunkinitializer support
jreadey May 31, 2023
e3247b5
fix flake8 error
jreadey May 31, 2023
76b9ae6
filter out stray log messages from chunk initializer
jreadey Jun 1, 2023
75960d2
updates based on pr review
jreadey Jun 2, 2023
c83e15c
added arange chunk initializer
jreadey Jun 6, 2023
26c7238
fix flake8 errors
jreadey Jun 6, 2023
5c7d884
added intelligent range get support, remove range_proxy
jreadey Jun 19, 2023
9ede8e3
reduced debug log verbosity
jreadey Jun 19, 2023
ccdda94
use hyper_dims creation property
jreadey Jun 19, 2023
79e1988
added munger for rangeget requests
jreadey Jun 20, 2023
07dafad
flake8 cleanup
jreadey Jun 21, 2023
ddca08d
updates per code review
jreadey Jun 21, 2023
e582e9b
fix comment
jreadey Jul 3, 2023
87ed37a
fix numpy warning
jreadey Jul 3, 2023
84c0649
update aiobotocore to 2.5.0
jreadey Jul 3, 2023
39391c8
change urllib requirement
jreadey Jul 3, 2023
b996c8b
match requirements.txt with setup.py
jreadey Jul 3, 2023
d949a1a
remove version restriction on urllib3
jreadey Jul 6, 2023
9b0fad6
remove version restriction on botocore3
jreadey Jul 6, 2023
663627e
remove urllib3 from dependencies
jreadey Jul 6, 2023
36829ff
fix flake8 errors
jreadey Jul 6, 2023
115f088
fix for numpy deprecation of truth value of empty array
jreadey Jul 10, 2023
e8d32ed
fix flake8 errors
jreadey Jul 10, 2023
b61bd8f
more flake8 errors
jreadey Jul 10, 2023
073c9ec
merge changes from master
jreadey Jul 10, 2023
80e74e2
optimize ndarray_compare for non-vlen arrays
jreadey Jul 12, 2023
4b29aa6
fix flake8 error
jreadey Jul 12, 2023
14 changes: 7 additions & 7 deletions admin/config/config.yml
Expand Up @@ -24,8 +24,6 @@ dn_port: 6101 # Start dn ports at 6101
dn_ram: 3g # memory for DN container (per container)
sn_port: 5101 # Start sn ports at 5101
sn_ram: 3g # memory for SN container
rangeget_port: 6900 # singleton proxy at port 6900
rangeget_ram: 2g # memory for RANGEGET container
target_sn_count: 0 # desired number of SN containers
target_dn_count: 0 # desire number of DN containers
log_level: INFO # log level. One of ERROR, WARNING, INFO, DEBUG
Expand Down Expand Up @@ -54,11 +52,6 @@ metadata_mem_cache_size: 128m # 128 MB - metadata cache size per DN node
metadata_mem_cache_expire: 3600 # expire cache items after one hour
chunk_mem_cache_size: 128m # 128 MB - chunk cache size per DN node
chunk_mem_cache_expire: 3600 # expire cache items after one hour
data_cache_size: 128m # cache for rangegets
data_cache_max_req_size: 128k # max size for rangeget fetches
data_cache_expire_time: 3600 # expire cache items after one hour
data_cache_page_size: 4m # page size for range get cache, set to zero to disable proxy
data_cache_max_concurrent_read: 16 # maximum number of inflight storage read requests
timeout: 30 # http timeout - 30 sec
password_file: /config/passwd.txt # filepath to a text file of username/passwords. set to '' for no-auth access
groups_file: /config/groups.txt # filepath to text file defining user groups
Expand Down Expand Up @@ -97,3 +90,10 @@ aws_lambda_gateway: null # use lambda endpoint for region HSDS is running in
k8s_app_label: null # The app label for k8s deployments (use k8s_dn_label_selector instead)
write_zero_chunks: False # write chunk to storage even when it's all zeros (or in general equal to the fill value)
max_chunks_per_request: 1000 # maximum number of chunks to be serviced by one request
rangeget_port: 6900 # singleton proxy at port 6900
rangeget_ram: 2g # memory for RANGEGET container
data_cache_size: 128m # cache for rangegets
data_cache_max_req_size: 128k # max size for rangeget fetches
data_cache_expire_time: 3600 # expire cache items after one hour
data_cache_page_size: 4m # page size for range get cache, set to zero to disable proxy
data_cache_max_concurrent_read: 16 # maximum number of inflight storage read requests
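The config.yml hunks above relocate the rangeget and data_cache settings to the end of the file rather than dropping them (7 additions, 7 deletions), so deployments that tune those keys keep working. Below is a minimal sketch, not part of this PR, of how one might confirm the relocated keys still resolve after the merge; it assumes PyYAML is installed and that the script runs from the repository root.

# Sketch: verify the relocated keys are still present in admin/config/config.yml.
# Assumes PyYAML and the default repo layout; not HSDS code.
import yaml

RELOCATED_KEYS = (
    "rangeget_port",
    "rangeget_ram",
    "data_cache_size",
    "data_cache_max_req_size",
    "data_cache_expire_time",
    "data_cache_page_size",
    "data_cache_max_concurrent_read",
)

with open("admin/config/config.yml") as f:
    cfg = yaml.safe_load(f)

missing = [key for key in RELOCATED_KEYS if key not in cfg]
if missing:
    raise SystemExit(f"config.yml is missing relocated keys: {missing}")
print("data_cache_page_size =", cfg["data_cache_page_size"])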
20 changes: 0 additions & 20 deletions admin/docker/docker-compose.aws.yml
Expand Up @@ -39,7 +39,6 @@ services:
- ${PWD}/admin/config/:/config/
links:
- head
- rangeget

sn:
image: hdfgroup/hsds
Expand All @@ -64,22 +63,3 @@ services:
links:
- head

rangeget:
image: hdfgroup/hsds
restart: ${RESTART_POLICY}
mem_limit: ${RANGEGET_RAM}
environment:
- NODE_TYPE=rangeget
- AWS_S3_GATEWAY=${AWS_S3_GATEWAY}
- AWS_REGION=${AWS_REGION}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- LOG_LEVEL=${LOG_LEVEL}
ports:
- ${RANGEGET_PORT}
depends_on:
- head
volumes:
- ${PWD}/admin/config/:/config/
links:
- head
20 changes: 1 addition & 19 deletions admin/docker/docker-compose.azure.yml
Expand Up @@ -33,7 +33,6 @@ services:
- ${PWD}/admin/config/:/config/
links:
- head
- rangeget

sn:
image: hdfgroup/hsds
Expand All @@ -53,21 +52,4 @@ services:
volumes:
- ${PWD}/admin/config/:/config/
links:
- head

rangeget:
image: hdfgroup/hsds
restart: ${RESTART_POLICY}
mem_limit: ${RANGEGET_RAM}
environment:
- NODE_TYPE=rangeget
- AZURE_CONNECTION_STRING=${AZURE_CONNECTION_STRING}
- LOG_LEVEL=${LOG_LEVEL}
ports:
- ${RANGEGET_PORT}
depends_on:
- head
volumes:
- ${PWD}/admin/config/:/config/
links:
- head
- head
19 changes: 0 additions & 19 deletions admin/docker/docker-compose.posix.yml
Expand Up @@ -33,7 +33,6 @@ services:
- ${PWD}/admin/config/:/config/
links:
- head
- rangeget
sn:
image: hdfgroup/hsds
restart: ${RESTART_POLICY}
Expand All @@ -53,21 +52,3 @@ services:
- ${PWD}/admin/config/:/config/
links:
- head

rangeget:
image: hdfgroup/hsds
restart: ${RESTART_POLICY}
mem_limit: ${RANGEGET_RAM}
environment:
- NODE_TYPE=rangeget
- ROOT_DIR=/data
- LOG_LEVEL=${LOG_LEVEL}
ports:
- ${RANGEGET_PORT}
depends_on:
- head
volumes:
- ${ROOT_DIR}:/data
- ${PWD}/admin/config/:/config/
links:
- head
20 changes: 0 additions & 20 deletions admin/docker/docker-compose.yml
Expand Up @@ -39,7 +39,6 @@ services:
- ${PWD}/admin/config/:/config/
links:
- head
- rangeget

sn:
image: hdfgroup/hsds
Expand All @@ -64,22 +63,3 @@ services:
links:
- head

rangeget:
image: hdfgroup/hsds
restart: ${RESTART_POLICY}
mem_limit: ${RANGEGET_RAM}
environment:
- NODE_TYPE=rangeget
- AWS_S3_GATEWAY=${AWS_S3_GATEWAY}
- AWS_REGION=${AWS_REGION}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- LOG_LEVEL=${LOG_LEVEL}
ports:
- ${RANGEGET_PORT}
depends_on:
- head
volumes:
- ${PWD}/admin/config/:/config/
links:
- head
4 changes: 0 additions & 4 deletions hsds/basenode.py
Expand Up @@ -628,10 +628,6 @@ def baseInit(node_type):

app["dn_urls"] = dn_urls
app["dn_ids"] = dn_ids
rangeget_url = config.getCmdLineArg("rangeget_url")
if rangeget_url:
log.debug(f"store rangeget_url: {rangeget_url}")
app["rangeget_url"] = rangeget_url

# check to see if we are running in a DCOS cluster
elif "IS_DOCKER" in os.environ:
30 changes: 22 additions & 8 deletions hsds/chunk_crawl.py
Expand Up @@ -202,10 +202,27 @@ async def read_chunk_hyperslab(
# params["select"] = select
if "s3path" in chunk_info:
params["s3path"] = chunk_info["s3path"]

if "s3offset" in chunk_info:
params["s3offset"] = chunk_info["s3offset"]
s3offset = chunk_info["s3offset"]
if isinstance(s3offset, list):
# convert to a colon separated string
s3offset = ":".join(map(str, s3offset))
params["s3offset"] = s3offset

if "s3size" in chunk_info:
params["s3size"] = chunk_info["s3size"]
s3size = chunk_info["s3size"]
if isinstance(s3size, list):
# convert to a colon separated string
s3size = ":".join(map(str, s3size))
params["s3size"] = s3size

if "hyper_dims" in chunk_info:
hyper_dims = chunk_info["hyper_dims"]
if isinstance(hyper_dims, list):
# convert to colon separated string
hyper_dims = ":".join(map(str, hyper_dims))
params["hyper_dims"] = hyper_dims

# set query-based params
if query is not None:
Expand Down Expand Up @@ -633,16 +650,13 @@ async def work(self):
start = time.time()
chunk_id = await self._q.get()
if self._limit > 0 and self._hits >= self._limit:
log.debug(
"ChunkCrawler - max hits exceeded, skipping fetch for chunk: {chunk_id}"
)
msg = f"ChunkCrawler - maxhits exceeded, skipping fetch for chunk: {chunk_id}"
log.debug(msg)
else:
dn_url = getDataNodeUrl(self._app, chunk_id)
if isUnixDomainUrl(dn_url):
# need a client per url for unix sockets
client = get_http_client(
self._app, url=dn_url, cache_client=True
)
client = get_http_client(self._app, url=dn_url, cache_client=True)
else:
# create a pool of clients and store the handles in the app dict
if client_name not in self._clients:
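The read_chunk_hyperslab changes above serialize list-valued s3offset, s3size, and hyper_dims into colon-separated query parameters, and the GET_Chunk handler in hsds/chunk_dn.py (next file) splits them back into integer lists. A small sketch of that round trip, using hypothetical helper names (encode_param and decode_int_param are illustrations, not HSDS functions) and made-up byte offsets:

def encode_param(value):
    # lists become colon-separated strings, scalars pass through as str
    if isinstance(value, list):
        return ":".join(map(str, value))
    return str(value)

def decode_int_param(text):
    # inverse of encode_param for integer-valued params
    if ":" in text:
        return list(map(int, text.split(":")))
    return int(text)

s3offset = [1024, 8192, 65536]      # hypothetical offsets of three chunk pieces
param = encode_param(s3offset)      # "1024:8192:65536"
assert decode_int_param(param) == s3offset

Colon-separated strings keep these values URL-safe without JSON quoting, which is presumably why the PR uses them for the new multi-valued parameters.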
58 changes: 53 additions & 5 deletions hsds/chunk_dn.py
Expand Up @@ -270,6 +270,8 @@ async def GET_Chunk(request):
s3path = None
s3offset = None
s3size = None
hyper_dims = None
dims = None
query = None
limit = 0

Expand Down Expand Up @@ -306,25 +308,69 @@ async def GET_Chunk(request):
log.debug(f"GET_Chunk - using bucket: {bucket}")

if "s3offset" in params:
param_s3offset = params["s3offset"]
log.debug(f"s3offset param: {param_s3offset}")
try:
s3offset = int(params["s3offset"])
if param_s3offset.find(":") > 0:
# colon separated index values, convert to list
s3offset = list(map(int, param_s3offset.split(":")))
else:
s3offset = int(param_s3offset)
except ValueError:
log.error(f"invalid s3offset params: {params['s3offset']}")
log.error(f"invalid s3offset params: {param_s3offset}")
raise HTTPBadRequest()
log.debug(f"s3offset: {s3offset}")

if "s3size" in params:
param_s3size = params["s3size"]
log.debug(f"s3size param: {param_s3size}")
try:
s3size = int(params["s3size"])
if param_s3size.find(":") > 0:
s3size = list(map(int, param_s3size.split(":")))
else:
s3size = int(param_s3size)
except ValueError:
log.error(f"invalid s3size params: {params['s3size']}")
log.error(f"invalid s3size params: {param_s3size}")
raise HTTPBadRequest()
log.debug(f"s3size: {s3size}")

if "hyper_dims" in params:
param_hyper_dims = params["hyper_dims"]
try:
if param_hyper_dims.find(":") > 0:
hyper_dims = list(map(int, param_hyper_dims.split(":")))
else:
hyper_dims = [int(param_hyper_dims), ]
except ValueError:
log.error(f"invalid hyper_dims params: {param_hyper_dims}")
raise HTTPBadRequest()
log.debug(f"hyper_dims: {hyper_dims}")

if "query" in params:
query = params["query"]
log.debug(f"got query: {query}")

if "Limit" in params:
limit = int(params["Limit"])
param_limit = params["Limit"]
log.debug(f"limit: {limit}")
try:
limit = int(param_limit)
except ValueError:
log.error(f"invalid Limit param: {param_limit}")
raise HTTPBadRequest()

if s3path:
# calculate how many chunk bytes we'll read
num_bytes = 0
if isinstance(s3size, int):
num_bytes = s3size
else:
# list
num_bytes = np.prod(s3size)
log.debug(f"reading {num_bytes} from {s3path}")
if num_bytes == 0:
log.warn(f"GET_Chunk for s3path: {s3path} with empty byte range")
raise HTTPNotFound()

dset_id = getDatasetId(chunk_id)

Expand Down Expand Up @@ -356,6 +402,8 @@ async def GET_Chunk(request):
kwargs["s3path"] = s3path
kwargs["s3offset"] = s3offset
kwargs["s3size"] = s3size
if hyper_dims:
kwargs["hyper_dims"] = hyper_dims
else:
kwargs["bucket"] = bucket

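When an s3path is supplied, GET_Chunk above now accepts s3size either as a single integer or as a colon-separated list, computes the total byte count with np.prod for the list form, and rejects empty ranges with HTTPNotFound. A short sketch of that size calculation with hypothetical values (the helper below mirrors the num_bytes logic but is not the handler itself):

import numpy as np

def total_bytes(s3size):
    # scalar sizes are used directly; list sizes are reduced with np.prod, as above
    if isinstance(s3size, int):
        return s3size
    return int(np.prod(s3size))

assert total_bytes(4096) == 4096        # single byte range
assert total_bytes([64, 32]) == 2048    # list form
assert total_bytes([64, 0]) == 0        # empty range: handler would raise HTTPNotFound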