Fetch usgs without cache or error and grdc return dataset (#429)
* Fetch usgs without cache or error

Fixes #414
Fixes #240
Fixes #209

* Make get_grdc_data return an xarray dataset

xarray/NetCDF has official support for attrs, while pandas attrs are experimental.

Fix #253

* Fix test

* Lower case

* Make get_grdc_data return a dataset in the same shape as the NetCDF you can download from https://portal.grdc.bafg.de/

* Make consistent

* Use datetime + massage time coordinate into datetime64[ns] + Added test for usgs xarray conversion

* Update example

* Replace try with if + test else
sverhoeven committed Jul 5, 2024
1 parent 98f3162 commit 01564ee
Showing 8 changed files with 710 additions and 263 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -10,6 +10,15 @@ Formatted as described on [https://keepachangelog.com](https://keepachangelog.co
- `ewatercycle.esmvaltool.search.search_esgf` can now be used to find climate model ensembles on ESGF that have the required input variables for generating forcing data ([#422](https://github.com/eWaterCycle/ewatercycle/pull/422)).
- `ewatercycle.observation.caravan.get_caravan_data()` ([#432](https://github.com/eWaterCycle/ewatercycle/issues/432))

### Fixed

- `get_usgs_data()` throws error ([#414](https://github.com/eWaterCycle/ewatercycle/issues/414))
- `get_usgs_data()` and `get_grdc_data()` both return xarray.Dataset ([#253](https://github.com/eWaterCycle/ewatercycle/issues/253))

### Removed

- Caching mechanism from `get_usgs_data()` ([#240](https://github.com/eWaterCycle/ewatercycle/issues/240))

## [2.1.1] (2024-06-03)

### Added
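Taken together, the `Fixed` entries above mean both observation readers now hand back an `xarray.Dataset` instead of a dataframe/metadata pair. A minimal sketch of the new call pattern for `get_grdc_data`, using the station and period from the docstring example later in this commit (the exact keywords for `get_usgs_data` are not shown in this diff, so only the GRDC reader is sketched):

```python
import ewatercycle.observation.grdc

# Returns an xarray.Dataset shaped like the GRDC-Daily.nc download:
# discharge as a time series plus station metadata as scalar variables and attrs.
ds = ewatercycle.observation.grdc.get_grdc_data(
    station_id="6435060",
    start_time="2000-01-01T00:00Z",
    end_time="2001-01-01T00:00Z",
)
print(ds["streamflow"])  # daily mean discharge in m3/s (default column name)
print(ds.attrs)          # title, Conventions, references, institution, history, ...
```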
4 changes: 2 additions & 2 deletions README.md
@@ -93,12 +93,12 @@ cfg_file, cfg_dir = model.setup(

model.initialize(cfg_file)

observations_df, station_info = ewatercycle.observation.grdc.get_grdc_data(
observations_df = ewatercycle.observation.grdc.get_grdc_data(
station_id=4147380,
start_time=model.start_time_as_isostr,
end_time=model.end_time_as_isostr,
column='observation',
)
).observation.to_dataframe()

simulated_discharge = []
timestamps = []
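In the updated README call, the `column='observation'` keyword sets the name of the discharge variable in the returned Dataset, which is why the chained accessor is `.observation`. A small equivalent spelling (illustrative only, reusing the variables from the README snippet):

```python
ds = ewatercycle.observation.grdc.get_grdc_data(
    station_id=4147380,
    start_time=model.start_time_as_isostr,
    end_time=model.end_time_as_isostr,
    column="observation",
)
# ds.observation and ds["observation"] refer to the same variable
observations_df = ds["observation"].to_dataframe()
```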
2 changes: 1 addition & 1 deletion docs/observations.rst
@@ -6,7 +6,7 @@ The eWaterCycle platform supports observations relevant for calibrating and vali
USGS
----

The `U.S. Geological Survey Water Services <https://waterservices.usgs.gov/>`_ provides public discharge data for a large number of US based stations. In eWaterCycle we make use of the `USGS web service <https://waterservices.usgs.gov/test-tools/?service=iv>`_ to automatically retrieve this data.
The `U.S. Geological Survey Water Services <https://waterservices.usgs.gov/>`_ provides public discharge data for a large number of US based stations. In eWaterCycle (:py:func:`ewatercycle.observation.usgs.get_usgs_data`) we make use of the `USGS web service <https://waterservices.usgs.gov/test-tools/?service=iv>`_ to automatically retrieve this data.
The Discharge timestamp is corrected to the UTC timezone. Units are converted from cubic feet per second to cubic meter per second.

GRDC
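The unit conversion mentioned in the USGS paragraph (cubic feet per second to cubic metres per second) is a fixed factor, so results are easy to spot-check against the raw USGS feed. A small sketch; the constant and function names here are ours, not identifiers from the package:

```python
# 1 ft = 0.3048 m exactly, so 1 ft3/s = 0.3048**3 m3/s, roughly 0.0283168 m3/s
CFS_TO_CMS = 0.3048 ** 3

def cfs_to_cms(discharge_cfs: float) -> float:
    """Convert discharge from cubic feet per second to cubic metres per second."""
    return discharge_cfs * CFS_TO_CMS

print(cfs_to_cms(1000.0))  # roughly 28.3 m3/s
```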
6 changes: 3 additions & 3 deletions docs/user_guide/03_models_obs_analysis.ipynb
@@ -605,14 +605,14 @@
"source": [
"grdc_station_id = \"6335020\"\n",
"\n",
"observations, metadata = ewatercycle.observation.grdc.get_grdc_data(\n",
"observations = ewatercycle.observation.grdc.get_grdc_data(\n",
" station_id=grdc_station_id,\n",
" start_time=\"1990-01-01T00:00:00Z\", # or: model_instance.start_time_as_isostr\n",
" end_time=\"1990-12-15T00:00:00Z\",\n",
" column=\"GRDC\",\n",
")\n",
"\n",
"observations.head()"
"observations.GRDC.to_dataframe().head()"
]
},
{
@@ -639,7 +639,7 @@
}
],
"source": [
"print(metadata)"
"print(observations.attrs)"
]
},
{
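The notebook cells above show the new access pattern: the time series via `.to_dataframe()` and the GRDC header information via `.attrs`. For code written against the old two-value return, a rough adaptation looks like this (variable names are illustrative; per-station fields such as `river_name` now live in scalar data variables rather than attrs):

```python
import ewatercycle.observation.grdc

observations = ewatercycle.observation.grdc.get_grdc_data(
    station_id="6335020",
    start_time="1990-01-01T00:00:00Z",
    end_time="1990-12-15T00:00:00Z",
    column="GRDC",
)

# Roughly the old (dataframe, metadata) pair
observations_df = observations["GRDC"].to_dataframe()
metadata = dict(observations.attrs)
river = str(observations["river_name"].values)  # station fields are data variables
```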
261 changes: 163 additions & 98 deletions src/ewatercycle/observation/grdc.py
@@ -1,9 +1,12 @@
"""Global Runoff Data Centre module."""

import logging
import os
from typing import Dict, Optional, Tuple, Union
from typing import Dict, Optional, Union

import pandas as pd
import xarray as xr
from numpy import nan

from ewatercycle import CFG
from ewatercycle.util import get_time, to_absolute_path
@@ -17,15 +20,18 @@ def get_grdc_data(
station_id: str,
start_time: str,
end_time: str,
parameter: str = "Q",
data_home: Optional[str] = None,
column: str = "streamflow",
) -> Tuple[pd.core.frame.DataFrame, MetaDataType]:
) -> xr.Dataset:
"""Get river discharge data from Global Runoff Data Centre (GRDC).
Requires the GRDC daily data files in a local directory. The GRDC daily data
files can be ordered at
https://www.bafg.de/GRDC/EN/02_srvcs/21_tmsrs/riverdischarge_node.html
NetCDF file can be downloaded at
https://www.bafg.de/GRDC/EN/02_srvcs/21_tmsrs/riverdischarge_node.html .
The downloaded zip file contains a file named GRDC-Daily.nc.
This function will first try to read data from the GRDC-Daily.nc file in the ``data_home`` directory.
If that fails it will look for the GRDC Export (ASCII text) formatted file for example ``6435060_Q_Day.Cmd.txt``.
Args:
station_id: The station id to get. The station id can be found in the
@@ -35,55 +41,50 @@
'YYYY-MM-DDTHH:MM:SSZ'.
end_time: End time of model in UTC and ISO format string e.g.
'YYYY-MM-DDTHH:MM:SSZ'.
parameter: optional. The parameter code to get, e.g. ('Q') discharge,
cubic meters per second.
data_home : optional. The directory where the daily grdc data is
located. If left out will use the grdc_location in the eWaterCycle
configuration file.
column: optional. Name of column in dataframe. Default: "streamflow".
Returns:
grdc data in a dataframe and metadata.
grdc data in an xarray dataset. Shaped like a filtered version of the GRDC daily NetCDF file.
Raises:
ValueError: If no data for the requested station id and period could be found.
Examples:
.. code-block:: python
from ewatercycle.observation.grdc import get_grdc_data
df, meta = get_grdc_data('6335020',
'2000-01-01T00:00Z',
'2001-01-01T00:00Z')
df.describe()
streamflow
count 4382.000000
mean 2328.992469
std 1190.181058
min 881.000000
25% 1550.000000
50% 2000.000000
75% 2730.000000
max 11300.000000
meta
{'grdc_file_name': '/home/myusername/git/eWaterCycle/ewatercycle/6335020_Q_Day.Cmd.txt',
'id_from_grdc': 6335020,
'file_generation_date': '2019-03-27',
'river_name': 'RHINE RIVER',
'station_name': 'REES',
'country_code': 'DE',
'grdc_latitude_in_arc_degree': 51.756918,
'grdc_longitude_in_arc_degree': 6.395395,
'grdc_catchment_area_in_km2': 159300.0,
'altitude_masl': 8.0,
'dataSetContent': 'MEAN DAILY DISCHARGE (Q)',
'units': 'm³/s',
'time_series': '1814-11 - 2016-12',
'no_of_years': 203,
'last_update': '2018-05-24',
'nrMeasurements': 'NA',
'UserStartTime': '2000-01-01T00:00Z',
'UserEndTime': '2001-01-01T00:00Z',
'nrMissingData': 0}
ds = get_grdc_data('6435060',
'2000-01-01T00:00Z',
'2001-01-01T00:00Z')
ds
<xarray.Dataset> Size: 5kB
Dimensions: (time: 367)
Coordinates:
* time (time) datetime64[ns] 3kB 2000-01-01 ... 2001-01-01
id int32 4B 6435060
Data variables:
streamflow (time) float32 1kB ...
area float32 4B ...
country <U2 8B ...
geo_x float32 4B ...
geo_y float32 4B ...
geo_z float32 4B ...
owneroforiginaldata <U38 152B ...
river_name <U11 44B 'RHINE RIVER'
station_name <U6 24B 'LOBITH'
timezone float32 4B ...
Attributes:
title: Mean daily discharge (Q)
Conventions: CF-1.7
references: grdc.bafg.de
institution: GRDC
history: Download from GRDC Database, 21/06/2024
missing_value: -999.000
""" # noqa: E501
if data_home:
data_path = to_absolute_path(data_home)
@@ -98,30 +99,133 @@
if not data_path.exists():
raise ValueError(f"The grdc directory {data_path} does not exist!")

# Read the raw data
raw_file = data_path / f"{station_id}_{parameter}_Day.Cmd.txt"
# Read the NetCDF file
nc_file = data_path / "GRDC-Daily.nc"
station_not_in_nc = False
if nc_file.exists():
ds = xr.open_dataset(nc_file)
if int(station_id) in ds["id"]:
ds = ds.sel(
id=int(station_id),
time=slice(get_time(start_time).date(), get_time(end_time).date()),
)
return ds.rename({"runoff_mean": column})
station_not_in_nc = True

# Read the text data
raw_file = data_path / f"{station_id}_Q_Day.Cmd.txt"
if not raw_file.exists():
raise ValueError(f"The grdc file {raw_file} does not exist!")
if nc_file.exists() and station_not_in_nc:
raise ValueError(
f"The grdc station {station_id} is not in the {nc_file} file and {raw_file} does not exist!" # noqa: E501
)
else:
raise ValueError(f"The grdc file {raw_file} does not exist!")

# Convert the raw data to an xarray
# Convert the raw data to a dataframe
metadata, df = _grdc_read(
raw_file,
start=get_time(start_time).date(),
end=get_time(end_time).date(),
column=column,
)

# Add start/end_time to metadata
metadata["UserStartTime"] = start_time
metadata["UserEndTime"] = end_time

# Add number of missing data to metadata
metadata["nrMissingData"] = _count_missing_data(df, column)

# Shpw info about data
_log_metadata(metadata)
ds = xr.Dataset.from_dict(
{
"coords": {
"time": {
"dims": ("time",),
"attrs": {"long_name": "time"},
"data": df.index.values,
},
"id": {
"dims": (),
"attrs": {"long_name": "grdc number"},
"data": int(station_id),
},
},
"dims": {
"time": len(df.index),
},
"attrs": {
"title": metadata["dataSetContent"],
"Conventions": "CF-1.7",
"references": "grdc.bafg.de",
"institution": "GRDC",
"history": f"Converted from {raw_file.name} of {metadata['file_generation_date']} to netcdf by eWaterCycle Python package",
"missing_value": "-999.000",
},
"data_vars": {
column: {
"dims": ("time",),
"attrs": {"units": "m3/s", "long_name": "Mean daily discharge (Q)"},
"data": df[column].values,
},
"area": {
"dims": (),
"attrs": {"units": "km2", "long_name": "catchment area"},
"data": metadata["grdc_catchment_area_in_km2"],
},
"country": {
"dims": (),
"attrs": {
"long_name": "country name",
"iso2": "ISO 3166-1 alpha-2 - two-letter country code",
},
"data": metadata["country_code"],
},
"geo_x": {
"dims": (),
"attrs": {
"units": "degree_east",
"long_name": "station longitude (WGS84)",
},
"data": metadata["grdc_longitude_in_arc_degree"],
},
"geo_y": {
"dims": (),
"attrs": {
"units": "degree_north",
"long_name": "station latitude (WGS84)",
},
"data": metadata["grdc_latitude_in_arc_degree"],
},
"geo_z": {
"dims": (),
"attrs": {
"units": "m",
"long_name": "station altitude (m above sea level)",
},
"data": metadata["altitude_masl"],
},
"owneroforiginaldata": {
"dims": (),
"attrs": {"long_name": "Owner of original data"},
"data": metadata["Owner of original data"],
},
"river_name": {
"dims": (),
"attrs": {"long_name": "river name"},
"data": metadata["river_name"],
},
"station_name": {
"dims": (),
"attrs": {"long_name": "station name"},
"data": metadata["station_name"],
},
"timezone": {
"dims": (),
"attrs": {
"units": "00:00",
"long_name": "utc offset, in relation to the national capital",
},
"data": nan,
},
},
}
)

return df, metadata
return ds


def _grdc_read(grdc_station_path, start, end, column):
@@ -166,7 +270,6 @@ def _grdc_metadata_reader(grdc_station_path, all_lines):
# that function was based on earlier work by Edwin Sutanudjaja
# from Utrecht University.
# https://github.com/edwinkost/discharge_analysis_IWMI
# Modified by Susan Branchett

# initiating a dictionary that will contain all GRDC attributes:
attribute_grdc = {}
@@ -257,48 +360,10 @@ def _grdc_metadata_reader(grdc_station_path, all_lines):
attribute_grdc["units"] = "NA"

try:
attribute_grdc["time_series"] = str(all_lines[23].split(":")[1].strip())
except (IndexError, ValueError):
attribute_grdc["time_series"] = "NA"

try:
attribute_grdc["no_of_years"] = int(all_lines[24].split(":")[1].strip())
except (IndexError, ValueError):
attribute_grdc["no_of_years"] = "NA"

try:
attribute_grdc["last_update"] = str(all_lines[25].split(":")[1].strip())
except (IndexError, ValueError):
attribute_grdc["last_update"] = "NA"

try:
attribute_grdc["nrMeasurements"] = int(
str(all_lines[33].split(":")[1].strip())
attribute_grdc["Owner of original data"] = (
all_lines[18].split(":")[1].strip()
)
except (IndexError, ValueError):
attribute_grdc["nrMeasurements"] = "NA"
attribute_grdc["Owner of original data"] = "Unknown"

return attribute_grdc


def _count_missing_data(df, column):
"""Return number of missing data."""
return df[column].isna().sum()


def _log_metadata(metadata):
"""Print some information about data."""
coords = (
metadata["grdc_latitude_in_arc_degree"],
metadata["grdc_longitude_in_arc_degree"],
)
message = (
f"GRDC station {metadata['id_from_grdc']} is selected. "
f"The river name is: {metadata['river_name']}."
f"The coordinates are: {coords}."
f"The catchment area in km2 is: {metadata['grdc_catchment_area_in_km2']}. "
f"There are {metadata['nrMissingData']} missing values during "
f"{metadata['UserStartTime']}_{metadata['UserEndTime']} at this station. "
f"See the metadata for more information."
)
logger.info("%s", message)
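The helpers `_count_missing_data` and `_log_metadata` are dropped in this diff; with a Dataset return value the same bookkeeping is a one-liner in user code. A sketch assuming the default `column="streamflow"`:

```python
from ewatercycle.observation.grdc import get_grdc_data

ds = get_grdc_data("6435060", "2000-01-01T00:00Z", "2001-01-01T00:00Z")

# Same count the removed _count_missing_data helper produced
n_missing = int(ds["streamflow"].isnull().sum())
print(f"{n_missing} missing values out of {ds.sizes['time']} days")
```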
