Commit 3f056c6

combined era downloader classes
bnb32 committed Aug 18, 2023
1 parent 4c0d4a0 commit 3f056c6
Showing 3 changed files with 154 additions and 165 deletions.
96 changes: 77 additions & 19 deletions sup3r/postprocessing/collection.py
@@ -132,6 +132,22 @@ def get_slices(
raise RuntimeError(msg)

row_slice = slice(np.min(row_loc), np.max(row_loc) + 1)
col_slice = slice(np.min(col_loc), np.max(col_loc) + 1)

msg = (
f'row_slice={row_slice} conflicts with row_indices={row_loc}. '
'Indices do not seem to be increasing and/or contiguous.'
)
assert (row_slice.stop - row_slice.start) == len(row_loc), msg

msg = (
f'col_slice={col_slice} conflicts with col_indices={col_loc}. '
'Indices do not seem to be increasing and/or contiguous.'
)
check = (col_slice.stop - col_slice.start) == len(col_loc)
if not check:
logger.warning(msg)
warn(msg)

return row_slice, col_loc
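
The new guard hinges on a simple invariant: a slice from min(loc) to max(loc) + 1 covers exactly len(loc) positions only when the indices form an increasing, contiguous run. A minimal standalone sketch of the check (the index arrays here are made up):

import numpy as np

for loc in (np.array([4, 5, 6, 7]),   # contiguous run: check passes
            np.array([4, 5, 8, 9])):  # gap in indices: check fails
    s = slice(np.min(loc), np.max(loc) + 1)
    # The slice spans (stop - start) positions, which matches len(loc)
    # for an increasing, contiguous run; gaps make the span larger
    # than len(loc).
    print(loc, (s.stop - s.start) == len(loc))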

@@ -466,6 +482,61 @@ def get_collection_attrs(

return time_index, target_final_meta, masked_meta, shape, global_attrs

def _write_flist_data(
self,
out_file,
feature,
time_index,
subset_masked_meta,
target_masked_meta,
):
"""Write spatiotemporal file list data to output file for given
feature

Parameters
----------
out_file : str
Name of output file
feature : str
Name of feature for output chunk
time_index : pd.DatetimeIndex
Time index for corresponding file list data
subset_masked_meta : pd.DataFrame
Meta for corresponding file list data
target_masked_meta : pd.DataFrame
Meta for full output file
"""
with RexOutputs(out_file, mode='r') as f:
target_ti = f.time_index
y_write_slice, x_write_slice = Collector.get_slices(
target_ti,
target_masked_meta,
time_index,
subset_masked_meta,
)
Collector._ensure_dset_in_output(out_file, feature)

with RexOutputs(out_file, mode='a') as f:
try:
f[feature, y_write_slice, x_write_slice] = self.data
except Exception as e:
msg = (
f'Problem with writing data to {out_file} with '
f't_slice={y_write_slice}, '
f's_slice={x_write_slice}. {e}'
)
logger.error(msg)
raise OSError(msg) from e

logger.debug(
'Finished writing "{}" for row {} and col {} to: {}'.format(
feature,
y_write_slice,
x_write_slice,
os.path.basename(out_file),
)
)

def _collect_flist(
self,
feature,
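
The new _write_flist_data helper consolidates the read-slices-write sequence that _collect_flist previously inlined (the block marked '-' in the next hunk). A rough, self-contained analogue of the write-with-error-wrapping pattern, using h5py as a stand-in for RexOutputs and made-up names throughout:

import logging
import os

import h5py  # stand-in for RexOutputs in this sketch

logger = logging.getLogger(__name__)


def write_chunk(out_file, dset, y_slice, x_slice, data):
    """Write one chunk into an existing dataset, wrapping any failure
    in a descriptive OSError, as the helper above does."""
    with h5py.File(out_file, 'a') as f:
        try:
            f[dset][y_slice, x_slice] = data
        except Exception as e:
            msg = (f'Problem with writing data to {out_file} with '
                   f't_slice={y_slice}, s_slice={x_slice}. {e}')
            logger.error(msg)
            raise OSError(msg) from e
    logger.debug('Finished writing "%s" to: %s', dset,
                 os.path.basename(out_file))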
@@ -587,25 +658,12 @@ def _collect_flist(
msg += f'{futures[future]}'
logger.exception(msg)
raise RuntimeError(msg) from e
- with RexOutputs(out_file, mode='r') as f:
-     target_ti = f.time_index
-     y_write_slice, x_write_slice = Collector.get_slices(
-         target_ti,
-         target_masked_meta,
-         time_index,
-         subset_masked_meta,
-     )
- Collector._ensure_dset_in_output(out_file, feature)
- with RexOutputs(out_file, mode='a') as f:
-     f[feature, y_write_slice, x_write_slice] = self.data
-
- logger.debug(
-     'Finished writing "{}" for row {} and col {} to: {}'.format(
-         feature,
-         y_write_slice,
-         x_write_slice,
-         os.path.basename(out_file),
-     )
+ self._write_flist_data(
+     out_file,
+     feature,
+     time_index,
+     subset_masked_meta,
+     target_masked_meta,
+ )
else:
msg = (
9 changes: 5 additions & 4 deletions sup3r/preprocessing/data_handling.py
@@ -12,6 +12,7 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime as dt
from fnmatch import fnmatch
from typing import ClassVar

import numpy as np
import pandas as pd
@@ -2114,7 +2115,7 @@ def lin_bc(self, bc_files, threshold=0.1):
class DataHandlerNC(DataHandler):
"""Data Handler for NETCDF data"""

- CHUNKS = {
+ CHUNKS: ClassVar[dict] = {
'XTIME': 100,
'XLAT': 150,
'XLON': 150,
@@ -2649,7 +2650,7 @@ def get_raster_index(self):
class DataHandlerNCforCC(DataHandlerNC):
"""Data Handler for NETCDF climate change data"""

- CHUNKS = {'time': 5, 'lat': 20, 'lon': 20}
+ CHUNKS: ClassVar[dict] = {'time': 5, 'lat': 20, 'lon': 20}
"""CHUNKS sets the chunk sizes to extract from the data in each dimension.
Chunk sizes that approximately match the data volume being extracted
typically result in the most efficient IO."""
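
Annotating these dictionaries as ClassVar documents that CHUNKS is class-level configuration shared across instances rather than per-instance state, and it satisfies linters (e.g. ruff's RUF012 check on mutable class attributes). A minimal sketch of the pattern with made-up handler names:

from typing import ClassVar


class BaseHandler:
    # Class-level default chunking shared by every instance.
    CHUNKS: ClassVar[dict] = {'time': 5, 'lat': 20, 'lon': 20}


class WrfHandler(BaseHandler):
    # Subclasses override the class attribute; instances are untouched.
    CHUNKS: ClassVar[dict] = {'XTIME': 100, 'XLAT': 150, 'XLON': 150}


print(WrfHandler.CHUNKS['XTIME'])   # 100
print(BaseHandler.CHUNKS['time'])   # 5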
@@ -3327,8 +3328,8 @@ def __init__(self, *args, **kwargs):
'Cannot initialize DataHandlerH5SolarCC without required '
'features {}. All three are necessary to get the daily '
'average clearsky ratio (ghi sum / clearsky ghi sum), even '
- 'though only the clearsky ratio will be passed to the GAN.'
- .format(required)
+ 'though only the clearsky ratio will be passed to the '
+ 'GAN.'.format(required)
)
logger.error(msg)
raise KeyError(msg)
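
The message re-wrap above is behavior-neutral: adjacent string literals in Python are concatenated at compile time, so both versions build the same text. A quick check:

old = ('though only the clearsky ratio will be passed to the GAN.')
new = ('though only the clearsky ratio will be passed to the '
       'GAN.')
assert old == new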
(Diff for the third changed file not loaded in this view.)
