Skip to content

Commit

Permalink
get stage data from cnrfc deterministic
Browse files Browse the repository at this point in the history
get stage data from cnrfc deterministic

add keep_stage option for deterministic forecasts:

fix up deterministic

remove other changes

remove extra change

remove print statements

fix default string

fix up stage data pull

rename argument to multiindex and add assertion
  • Loading branch information
klggill authored and CAbrahamMBK committed Nov 28, 2023
1 parent b185b1a commit 408f61e
Showing 1 changed file with 60 additions and 16 deletions.
76 changes: 60 additions & 16 deletions collect/cnrfc/cnrfc.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def get_water_year_trend_tabular(cnrfc_id, water_year):
'downloaded': dt.datetime.now().strftime('%Y-%m-%d %H:%M')}}


def get_deterministic_forecast(cnrfc_id, truncate_historical=False, release=False):
def get_deterministic_forecast(cnrfc_id, truncate_historical=False, release=False, stage=False):
"""
Adapted from SAFCA portal project
---
Expand All @@ -173,6 +173,9 @@ def get_deterministic_forecast(cnrfc_id, truncate_historical=False, release=Fals
forecast_type = 'Release' if release else 'RVF'
flow_prefix = 'Release ' if release else ''

variable = f'{flow_prefix}Stage (Feet)' if stage else f'{flow_prefix}Flow (CFS)'
units = 'ft' if stage else 'cfs'

# default deterministic URL and index name
url = 'https://www.cnrfc.noaa.gov/graphical{0}_csv.php?id={1}'.format(forecast_type, cnrfc_id)
date_column_header = 'Valid Date/Time (Pacific)'
Expand Down Expand Up @@ -209,10 +212,9 @@ def get_deterministic_forecast(cnrfc_id, truncate_historical=False, release=Fals
df.index.name = 'PDT/PST'

# Trend value is null for first historical and first forecast entry; select forecast entry
first_ordinate = df.where(df['Trend'].isnull()).dropna(subset=[f'{flow_prefix}Flow (CFS)']).last_valid_index()
first_ordinate = df.where(df['Trend'].isnull()).dropna(subset=[variable]).last_valid_index()

# deterministic forecast inflow series
df['forecast'] = df.loc[(df.index >= first_ordinate), f'{flow_prefix}Flow (CFS)']
df['forecast'] = df.loc[(df.index >= first_ordinate), variable]

# optional limit for start of historical data (2 days before start of forecast)
if truncate_historical:
Expand All @@ -221,8 +223,8 @@ def get_deterministic_forecast(cnrfc_id, truncate_historical=False, release=Fals
else:
mask = True

# historical inflow series
df['historical'] = df.loc[(df['forecast'].isnull()) & mask][f'{flow_prefix}Flow (CFS)']
# historical series
df['historical'] = df.loc[(df['forecast'].isnull()) & mask][variable]

# additional issuance, plot-type information
time_issued, next_issue_time, title, plot_type = get_forecast_meta_deterministic(cnrfc_id)
Expand All @@ -234,11 +236,11 @@ def get_deterministic_forecast(cnrfc_id, truncate_historical=False, release=Fals
'first_ordinate': first_ordinate.strftime('%Y-%m-%d %H:%M'),
'issue_time': time_issued.strftime('%Y-%m-%d %H:%M'),
'next_issue': next_issue_time.strftime('%Y-%m-%d %H:%M'),
'units': 'cfs',
'units': units,
'downloaded': dt.datetime.now().strftime('%Y-%m-%d %H:%M')}}


def get_deterministic_forecast_watershed(watershed, date_string, acre_feet=False, pdt_convert=False, as_pdt=False, cnrfc_id=None):
def get_deterministic_forecast_watershed(watershed, date_string, acre_feet=False, pdt_convert=False, as_pdt=False, cnrfc_id=None, multiindex=False):
"""
from: https://www.cnrfc.noaa.gov/deterministicHourlyProductCSV.php
https://www.cnrfc.noaa.gov/csv/2019040318_american_csv_export.zip
Expand All @@ -250,6 +252,7 @@ def get_deterministic_forecast_watershed(watershed, date_string, acre_feet=False
pdt_convert (bool):
as_pdt (bool):
cnrfc_id (str):
multiindex (bool): return df with MultiIndex (column headers with 2 rows)
Returns:
(dict):
"""
Expand Down Expand Up @@ -286,24 +289,25 @@ def get_deterministic_forecast_watershed(watershed, date_string, acre_feet=False
date_string = stamp.strftime('%Y%m%d%H')
csvdata = _get_forecast_csv(url)

# parse forecast data from CSV
# Read in as MultiIndex for Stage Data
df = pd.read_csv(csvdata,
header=0,
skiprows=[1,],
header=[0, 1],
parse_dates=True,
index_col=0,
float_precision='high',
dtype={'GMT': str})

# filter watershed for single forecast point ensemble
if cnrfc_id is not None:
columns = [x for x in df.columns if cnrfc_id in x]
else:
columns = df.columns
df = df[[x for x in df.columns if cnrfc_id in x]]

# convert kcfs to cfs; optional timezone conversions and optional conversion to acre-feet
df, units = _apply_conversions(df, 'hourly', acre_feet, pdt_convert, as_pdt)

# Drop the second column-header level unless multiindex is requested (to preserve backwards compatibility)
if not multiindex:
df.columns = df.columns.droplevel(level=1)

# clean up
csvdata.close()

Expand All @@ -317,6 +321,44 @@ def get_deterministic_forecast_watershed(watershed, date_string, acre_feet=False
'units': units,
'downloaded': dt.datetime.now().strftime('%Y-%m-%d %H:%M')}}

def get_deterministic_forecast_watershed_stage(watershed, date_string, pdt_convert=False, as_pdt=False, cnrfc_id=None, multiindex=False):
    """
    Return the stage columns of a watershed deterministic forecast.

    Wraps get_deterministic_forecast_watershed (always called with
    acre_feet=False and multiindex=True) and keeps only the columns whose
    label matches 'SSTG'.

    from: https://www.cnrfc.noaa.gov/deterministicHourlyProductCSV.php
    https://www.cnrfc.noaa.gov/csv/2019040318_american_csv_export.zip
    Arguments:
        watershed (str): passed through to get_deterministic_forecast_watershed
        date_string (str): passed through to get_deterministic_forecast_watershed
        pdt_convert (bool): passed through to get_deterministic_forecast_watershed
        as_pdt (bool): passed through to get_deterministic_forecast_watershed
        cnrfc_id (str): passed through to get_deterministic_forecast_watershed
        multiindex (bool): return df with MultiIndex (column headers with 2 rows)
    Returns:
        (dict): 'data' holds the filtered dataframe; 'info' is the info dict
            from get_deterministic_forecast_watershed with 'units' set to 'ft'
    Raises:
        AssertionError: if no column label matches 'SSTG'
    """
    # always request the two-row header here: the 'SSTG' marker used to pick
    # out stage columns is presumably carried in the second header row, which
    # the non-multiindex result drops -- TODO confirm against the CSV layout
    results = get_deterministic_forecast_watershed(watershed,
                                                   date_string,
                                                   acre_feet=False,
                                                   pdt_convert=pdt_convert,
                                                   as_pdt=as_pdt,
                                                   cnrfc_id=cnrfc_id,
                                                   multiindex=True)

    df = results['data'].filter(regex='SSTG', axis=1)

    # explicit raise (rather than an assert statement) so the check still runs
    # when Python is started with -O; exception type and message are unchanged
    if df.empty:
        raise AssertionError('no stage deterministic forecast')

    # collapse to a single header row only after filtering on the stage marker
    if not multiindex:
        df.columns = df.columns.droplevel(level=1)

    info = results['info']
    info.update({'units': 'ft'})

    return {'data': df, 'info': info}

def get_forecast_meta_deterministic(cnrfc_id, first_ordinate=False, release=False):
"""
Expand Down Expand Up @@ -770,8 +812,10 @@ def esp_trace_analysis_wrapper():

def _apply_conversions(df, duration, acre_feet, pdt_convert, as_pdt):

# convert kcfs/day to cfs/day
df = df * 1000.0
# convert kcfs/day to cfs/day (unless stage data)
filter_columns = [x for x in df.columns if x[1] != 'SSTG']
df[filter_columns] = df[filter_columns] * 1000.0

units = 'cfs'

if acre_feet:
Expand Down

0 comments on commit 408f61e

Please sign in to comment.