Skip to content

Commit

Permalink
refactor out weather model weighting into its own functions
Browse files Browse the repository at this point in the history
  • Loading branch information
jlmaurer committed Sep 22, 2023
1 parent a346dcb commit 004f3b0
Show file tree
Hide file tree
Showing 2 changed files with 206 additions and 122 deletions.
325 changes: 204 additions & 121 deletions tools/RAiDER/cli/raider.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,11 @@ def calcDelays(iargs=None):
logger.warning('interp_method is not specified, defaulting to \'none\', i.e. nearest datetime for delay '
'calculation')

# Grab the closest two times unless the user specifies 'nearest' via 'none' or None.
# If the model time_delta is not specified then use 6
# The two datetimes will be combined to a single file and processed
# TODO: make more transparent control flow for GUNW and non-GUNW workflow
if (interp_method in ['none', 'center_time']):
times = get_nearest_wmtimes(t, [model.dtime() if \
model.dtime() is not None else 6][0]) if interp_method == 'center_time' else [t]
if (interp_method is not 'azimuth_time_grid'):
times = get_nearest_wmtimes(
t, [model.dtime() if model.dtime() is not None else 6][0]
) if interp_method == 'center_time' else [t]

elif interp_method == 'azimuth_time_grid':
step = model.dtime()
time_step_hours = step if step is not None else 6
Expand All @@ -278,7 +276,7 @@ def calcDelays(iargs=None):
wfiles.append(wfile)

# catch when requested datetime fails
except RuntimeError as re:
except RuntimeError:
continue

# catch when something else within weather model class fails
Expand All @@ -295,119 +293,8 @@ def calcDelays(iargs=None):
if dl_only:
continue


############################################################################
# Time interpolation
#
# Need to handle various cases, including if the exact weather model time is
# requested, or if one or more datetimes are not available from the weather
# model data provider
############################################################################

# Case 1 - No datetimes were retrieved
if len(wfiles) == 0:
logger.error('No weather model data was successfully processed.')
if len(params['date_list']) == 1:
raise RuntimeError
# skip date and continue processing if multiple dates are requested
else:
continue

# Case 2 - nearest weather model time is requested and retrieved
elif (interp_method == 'none') and (len(wfiles) == 1) and (len(times) == 1):
weather_model_file = wfiles[0]

# Case 3 - two times were requested (e.g., for interpolation) but only one was
# retrieved. In this case the code will complete but a warning is issued
# that only one time was available
elif len(wfiles)==1 and len(times)==2:
logger.warning('Time interpolation did not succeed, defaulting to nearest available date')
weather_model_file = wfiles[0]

# Case 4 - Time interpolation is requested, but if the requested time exactly matching
# the available (within _THRESHOLD_SECONDS), only a single time is returned and no
# interpolation is required
elif (interp_method == 'center_time') and len(times)==1:
logger.info('Requested time is provided exactly, will use only one weather model datetime')
weather_model_file = wfiles[0]

# Case 5 - two times requested for interpolation and two retrieved
elif (interp_method == 'center_time') and (len(wfiles) == 2):
ds1 = xr.open_dataset(wfiles[0])
ds2 = xr.open_dataset(wfiles[1])

# calculate relative weights of each dataset
date1 = datetime.datetime.strptime(ds1.attrs['datetime'], '%Y_%m_%dT%H_%M_%S')
date2 = datetime.datetime.strptime(ds2.attrs['datetime'], '%Y_%m_%dT%H_%M_%S')
wgts = [ 1 - get_dt(t, date1) / get_dt(date2, date1), 1 - get_dt(date2, t) / get_dt(date2, date1)]
try:
assert np.isclose(np.sum(wgts), 1)
except AssertionError:
logger.error('Time interpolation weights do not sum to one; something is off with query datetime: %s', t)
continue

# combine datasets
ds = ds1
for var in ['wet', 'hydro', 'wet_total', 'hydro_total']:
ds[var] = (wgts[0] * ds1[var]) + (wgts[1] * ds2[var])
ds.attrs['Date1'] = 0
ds.attrs['Date2'] = 0
weather_model_file = os.path.join(
os.path.dirname(wfiles[0]),
os.path.basename(wfiles[0]).split('_')[0] + '_' + t.strftime('%Y_%m_%dT%H_%M_%S') + '_timeInterp_' + '_'.join(wfiles[0].split('_')[-4:]),
)
ds.to_netcdf(weather_model_file)

# Case 6 - Azimuth grid time, should end up with three times in total
elif (interp_method == 'azimuth_time_grid') and (len(wfiles) == 3):
datasets = [xr.open_dataset(f) for f in wfiles]

# Each model will require some inspection here
# the subsequent s1 azimuth time grid requires dimension
# inputs to all have same dimensions and either be
# 1d or 3d.
if model._dataset in ['hrrr', 'hrrrak']:
lat_2d = datasets[0].latitude.data
lon_2d = datasets[0].longitude.data
z_1d = datasets[0].z.data
m, n, p = z_1d.shape[0], lat_2d.shape[0], lat_2d.shape[1]

lat = np.broadcast_to(lat_2d, (m, n, p))
lon = np.broadcast_to(lon_2d, (m, n, p))
hgt = np.broadcast_to(z_1d[:, None, None], (m, n, p))

else:
raise NotImplementedError('Azimuth Time is currently only implemented for HRRR')

time_grid = RAiDER.s1_azimuth_timing.get_s1_azimuth_time_grid(lon,
lat,
hgt,
# This is the acq time from loop
t)

if np.any(np.isnan(time_grid)):
raise ValueError('The azimuth time grid return nans meaning no orbit was downloaded.')
wgts = get_inverse_weights_for_dates(time_grid,
times,
temporal_window_hours=model._time_res)
# combine datasets
ds_out = datasets[0]
for var in ['wet', 'hydro', 'wet_total', 'hydro_total']:
ds_out[var] = sum([wgt * ds[var] for (wgt, ds) in zip(wgts, datasets)])
ds_out.attrs['Date1'] = 0
ds_out.attrs['Date2'] = 0
weather_model_file = os.path.join(
os.path.dirname(wfiles[0]),
# TODO: clean up
os.path.basename(wfiles[0]).split('_')[0] + '_' + t.strftime('%Y_%m_%dT%H_%M_%S') + '_timeInterpAziGrid_' + '_'.join(wfiles[0].split('_')[-4:]),
)
ds_out.to_netcdf(weather_model_file)

# Case 7 - Anything else errors out
else:
n = len(wfiles)
raise NotImplementedError(f'The {interp_method} with {n} retrieved weather model files was not well posed '
'for the the delay workflow.')
# Get the weather model file
weather_model_file = getWeatherFile(wfiles, params['date_list'], times, t, model._dataset, interp_method)

# Now process the delays
try:
Expand Down Expand Up @@ -696,3 +583,199 @@ def combineZTDFiles():
outName=args.out_name,
localTime=args.local_time
)


def getWeatherFile(wfiles, times, t, interp_method='none'):
    '''
    Time interpolation: select or combine the downloaded weather model files
    for the requested datetime.

    Need to handle various cases, including if the exact weather model time is
    requested, or if one or more datetimes are not available from the weather
    model data provider.

    Args:
        wfiles        - list of paths to successfully downloaded weather model files
        times         - list of datetimes that were requested for download
        t             - the target datetime for the delay calculation
        interp_method - one of 'none', 'center_time', 'azimuth_time_grid'

    Returns:
        Path to the (possibly newly combined) weather model file, or None if
        no weather model data was successfully processed.

    Raises:
        ValueError   - if interp_method is not a known method
        RuntimeError - if the number of downloaded files cannot be reconciled
                       with the requested interpolation method

    NOTE(review): the call site in calcDelays invokes this as
    getWeatherFile(wfiles, params['date_list'], times, t, model._dataset,
    interp_method) — six positional arguments against this four-parameter
    signature. Confirm and reconcile the signature with the caller.
    '''
    # time interpolation method: number of expected files
    ALLOWED_METHODS = {'none': 1, 'center_time': 2, 'azimuth_time_grid': 3}

    Nfiles = len(wfiles)
    Ntimes = len(times)

    try:
        Nfiles_expected = ALLOWED_METHODS[interp_method]
    except KeyError:
        # Unknown methods error out here, so every branch below sees a valid method
        raise ValueError('getWeatherFile: interp_method {} is not known'.format(interp_method))

    Nmatch = (Nfiles_expected == Nfiles)  # got the number of files the method needs
    Tmatch = (Nfiles == Ntimes)           # one file per requested time

    # Case 1: no files downloaded
    if Nfiles == 0:
        logger.error('No weather model data was successfully processed.')
        return None

    # Case 2 - nearest weather model time is requested and retrieved
    if interp_method == 'none':
        weather_model_file = wfiles[0]

    elif interp_method == 'center_time':

        if Nmatch:  # Case 3: two weather files downloaded; interpolate between them
            weather_model_file = combine_weather_files(
                wfiles,
                t,
                interp_method='center_time'
            )
        elif Tmatch:  # Case 4: Exact time is available without interpolation
            logger.warning('Time interpolation is not needed as exact time is available')
            weather_model_file = wfiles[0]
        elif Nfiles == 1:  # Case 5: one file does not download for some reason
            logger.warning('getWeatherFile: One datetime is not available to download, defaulting to nearest available date')
            weather_model_file = wfiles[0]
        else:
            raise RuntimeError('getWeatherFile: the number of files downloaded does not match the requested')

    else:  # interp_method == 'azimuth_time_grid' — the only remaining allowed method

        if Nmatch:  # Case 6: all three files downloaded
            weather_model_file = combine_weather_files(
                wfiles,
                t,
                interp_method='azimuth_time_grid'
            )
        else:
            raise RuntimeError(
                'getWeatherFile: the number of files downloaded ({})'
                ' does not match the requested ({}) using azimuth '
                'time grid'.format(Nfiles, Nfiles_expected)
            )

    return weather_model_file


def combine_weather_files(wfiles, t, interp_method='center_time'):
    '''
    Interpolate downloaded weather files and save to a single file.

    Opens each file in wfiles, computes per-file weights for the target
    datetime t using the given interpolation method, forms the weighted
    combination of the delay variables, and writes the result next to the
    first input file.

    Args:
        wfiles        - list of paths to weather model files to combine
        t             - the target datetime to interpolate to
        interp_method - 'center_time' or 'azimuth_time_grid'

    Returns:
        Path of the newly written, combined weather model file.

    Raises:
        ValueError - if interp_method is not a supported method
    '''
    # filename tag for each supported interpolation method
    STYLE = {'center_time': '_timeInterp_', 'azimuth_time_grid': '_timeInterpAziGrid_'}

    # Validate up front; previously an unknown method left `wgts` unbound and
    # failed later with a confusing NameError/KeyError
    if interp_method not in STYLE:
        raise ValueError('combine_weather_files: interp_method {} is not known'.format(interp_method))

    # read the individual datetime datasets
    datasets = [xr.open_dataset(f) for f in wfiles]

    # Pull the datetimes from the datasets
    times = [
        datetime.datetime.strptime(ds.attrs['datetime'], '%Y_%m_%dT%H_%M_%S')
        for ds in datasets
    ]

    model = datasets[0].attrs['model_name']

    # calculate relative weights of each dataset
    if interp_method == 'center_time':
        wgts = get_weights_time_interp(times, t)
    else:
        time_grid = get_time_grid_for_aztime_interp(datasets, t, model)
        # NOTE(review): the pre-refactor code passed
        # temporal_window_hours=model._time_res here — confirm the default
        # window is equivalent
        wgts = get_inverse_weights_for_dates(time_grid, times)

    # combine datasets
    ds_out = datasets[0]
    for var in ['wet', 'hydro', 'wet_total', 'hydro_total']:
        ds_out[var] = sum([wgt * ds[var] for (wgt, ds) in zip(wgts, datasets)])
    ds_out.attrs['Date1'] = 0
    ds_out.attrs['Date2'] = 0

    # Give the weighted combination a new file name
    weather_model_file = os.path.join(
        os.path.dirname(wfiles[0]),
        os.path.basename(wfiles[0]).split('_')[0] + '_' +
        t.strftime('%Y_%m_%dT%H_%M_%S') + STYLE[interp_method] +
        '_'.join(wfiles[0].split('_')[-4:]),
    )

    # write the combined results to disk
    ds_out.to_netcdf(weather_model_file)

    return weather_model_file


def combine_files_using_azimuth_time(wfiles, t, times):
    '''
    Combine files using azimuth time interpolation.

    Args:
        wfiles - list of paths to weather model files to combine
        t      - the target datetime to interpolate to
        times  - unused; the datetimes are re-read from the file attributes
                 below (kept for interface compatibility)

    Returns:
        Path of the newly written, combined weather model file.

    NOTE(review): this duplicates combine_weather_files(..., interp_method=
    'azimuth_time_grid'); confirm whether it is still called anywhere.
    '''
    # read the individual datetime datasets
    datasets = [xr.open_dataset(f) for f in wfiles]

    # Pull the datetimes from the datasets (the `times` argument is ignored)
    times = []
    for ds in datasets:
        times.append(datetime.datetime.strptime(ds.attrs['datetime'], '%Y_%m_%dT%H_%M_%S'))

    model = datasets[0].attrs['model_name']

    # Bug fix: get_time_grid_for_aztime_interp takes (datasets, t, model);
    # the extra `times` argument previously passed here raised a TypeError
    time_grid = get_time_grid_for_aztime_interp(datasets, t, model)

    wgts = get_inverse_weights_for_dates(time_grid, times)

    # combine datasets
    ds_out = datasets[0]
    for var in ['wet', 'hydro', 'wet_total', 'hydro_total']:
        ds_out[var] = sum([wgt * ds[var] for (wgt, ds) in zip(wgts, datasets)])
    ds_out.attrs['Date1'] = 0
    ds_out.attrs['Date2'] = 0

    # Give the weighted combination a new file name
    weather_model_file = os.path.join(
        os.path.dirname(wfiles[0]),
        os.path.basename(wfiles[0]).split('_')[0] + '_' + t.strftime('%Y_%m_%dT%H_%M_%S') + '_timeInterpAziGrid_' + '_'.join(wfiles[0].split('_')[-4:]),
    )

    # write the combined results to disk
    ds_out.to_netcdf(weather_model_file)

    return weather_model_file


def get_weights_time_interp(times, t):
    '''
    Calculate weights for time interpolation using simple inverse linear weighting.

    Args:
        times - two datetimes bracketing the target time
        t     - the target datetime

    Returns:
        A two-element list of weights summing to one, or None if the weights
        do not sum to one (indicating a bad query datetime).
    '''
    date1, date2 = times
    wgts = [1 - get_dt(t, date1) / get_dt(date2, date1), 1 - get_dt(date2, t) / get_dt(date2, date1)]

    # Bug fix: this check was an `assert` inside try/except, which is stripped
    # under `python -O`; an explicit test keeps the validation in all modes
    if not np.isclose(np.sum(wgts), 1):
        logger.error('Time interpolation weights do not sum to one; something is off with query datetime: %s', t)
        return None

    return wgts


def get_time_grid_for_aztime_interp(datasets, t, model):
    '''Calculate the time-varying grid for use with azimuth time interpolation'''

    # Each model will require some inspection here: the subsequent s1 azimuth
    # time grid requires its dimension inputs to all share the same dimensions
    # and to be either 1d or 3d.
    AZ_TIME_ALLOWED_MODELS = ['hrrr', 'hrrrak']

    if model.lower() not in AZ_TIME_ALLOWED_MODELS:
        raise NotImplementedError('Azimuth Time is currently only implemented for HRRR')

    ref = datasets[0]
    lats_2d = ref.latitude.data
    lons_2d = ref.longitude.data
    levels_1d = ref.z.data
    grid_shape = (levels_1d.shape[0], lats_2d.shape[0], lats_2d.shape[1])

    # Expand everything to the common (level, y, x) shape
    lat = np.broadcast_to(lats_2d, grid_shape)
    lon = np.broadcast_to(lons_2d, grid_shape)
    hgt = np.broadcast_to(levels_1d[:, None, None], grid_shape)

    # t is the acquisition time from the calling loop
    time_grid = get_s1_azimuth_time_grid(lon, lat, hgt, t)
    if np.any(np.isnan(time_grid)):
        raise ValueError('The Time Grid return nans meaning no orbit was downloaded.')

    return time_grid
3 changes: 2 additions & 1 deletion tools/RAiDER/models/weatherModel.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def __init__(self):

self._classname = None
self._dataset = None
self._name = None
self._Name = None
self._wmLoc = None

self._model_level_type = 'ml'
Expand Down Expand Up @@ -748,6 +748,7 @@ def write(self):
"datetime": datetime.datetime.strftime(self._time, "%Y_%m_%dT%H_%M_%S"),
'date_created': datetime.datetime.now().strftime("%Y_%m_%dT%H_%M_%S"),
'title': 'Weather model data and delay calculations',
'model_name': self._Name

}

Expand Down

0 comments on commit 004f3b0

Please sign in to comment.