Skip to content

Commit

Permalink
IOC: Add API support
Browse files Browse the repository at this point in the history
  • Loading branch information
tomsail committed Aug 4, 2023
1 parent 07ab9ed commit 903f3be
Show file tree
Hide file tree
Showing 2 changed files with 273 additions and 0 deletions.
269 changes: 269 additions & 0 deletions searvey/ioc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
For the IOC stations we parse their website_.
:: _website: http://www.ioc-sealevelmonitoring.org/list.php?showall=all
:: _api: http://www.ioc-sealevelmonitoring.org/service.php?query=stationlist
This page contains 3 tables:
Expand All @@ -14,6 +15,7 @@
from __future__ import annotations

import functools
import json
import logging
import warnings
from typing import Optional
Expand All @@ -38,6 +40,7 @@
from .utils import get_region
from .utils import merge_datasets
from .utils import NOW
from .utils import resolve_start_date
from .utils import resolve_timestamp


Expand All @@ -47,6 +50,8 @@
IOC_RATE_LIMIT = limits.parse("5/second")
IOC_MAX_DAYS_PER_REQUEST = 30
IOC_BASE_URL = "http://www.ioc-sealevelmonitoring.org/bgraph.php?code={ioc_code}&output=tab&period={period}&endtime={endtime}"
IOC_BASE_URL_API = "https://www.ioc-sealevelmonitoring.org/service.php?query=data&format=json&code={ioc_code}&timestart={starttime}&timestop={endtime}"

IOC_STATIONS_KWARGS = [
{"output": "general", "skip_table_rows": 4},
{"output": "contacts", "skip_table_rows": 4},
Expand Down Expand Up @@ -97,6 +102,43 @@
"view",
],
}
IOC_STATIONS_COLUMN_NAMES_API = {
"general": [
"ioc_code",
"GlossID",
"country",
"Location",
"connect",
"DCP_ID",
"last_observation_level",
"last_observation_time",
"delay",
"interval",
"view",
],
"contacts": [
"ioc_code",
"GlossID",
"lat",
"lon",
"country",
"Location",
"connect",
"contacts",
],
"performance": [
"ioc_code",
"GlossID",
"country",
"Location",
"connect",
"added_to_system",
"observations",
"sample_interval",
"average_delay_per_day",
"transmit_interval",
],
}
IOC_STATION_DATA_COLUMNS_TO_DROP = [
"bat",
"sw1",
Expand Down Expand Up @@ -147,6 +189,17 @@ def get_ioc_stations_by_output(output: str, skip_table_rows: int) -> pd.DataFram
return df


def get_ioc_stations_api_request() -> pd.DataFrame:
url = "http://www.ioc-sealevelmonitoring.org/service.php?query=stationlist"
logger.debug("Downloading: %s", url)
response = requests.get(url)
assert response.ok, f"failed to download: {url}"
logger.debug("Downloaded: %s", url)
di = json.loads(response.content)
df = pd.DataFrame(di).rename(columns={"Code": "ioc_code"})
return df


def normalize_ioc_stations(df: pd.DataFrame) -> gpd.GeoDataFrame:
df = df.assign(
# fmt: off
Expand All @@ -163,6 +216,20 @@ def normalize_ioc_stations(df: pd.DataFrame) -> gpd.GeoDataFrame:
return gdf


def normalize_ioc_stations_api(df: pd.DataFrame) -> gpd.GeoDataFrame:
df = df.assign(
# fmt: off
GlossID=df.GlossID.astype(pd.Int64Dtype()),
observations_ratio_per_month=df.observations,
# fmt: on
)
gdf = gpd.GeoDataFrame(
data=df,
geometry=gpd.points_from_xy(df.lon, df.lat, crs="EPSG:4326"),
)
return gdf


@functools.lru_cache
def _get_ioc_stations() -> gpd.GeoDataFrame:
"""
Expand All @@ -183,6 +250,18 @@ def _get_ioc_stations() -> gpd.GeoDataFrame:
return ioc_stations


@functools.lru_cache
def _get_ioc_stations_api() -> gpd.GeoDataFrame:
"""
Return IOC station metadata from: http://www.ioc-sealevelmonitoring.org/service.php?query=stationlist
:return: ``pandas.DataFrame`` with the station metadata
"""
ioc_stations = get_ioc_stations_api_request()
ioc_stations = normalize_ioc_stations_api(ioc_stations)
return ioc_stations


def get_ioc_stations(
region: Optional[Union[Polygon, MultiPolygon]] = None,
lon_min: Optional[float] = None,
Expand Down Expand Up @@ -222,6 +301,45 @@ def get_ioc_stations(
return ioc_stations


def get_ioc_stations_api(
region: Optional[Union[Polygon, MultiPolygon]] = None,
lon_min: Optional[float] = None,
lon_max: Optional[float] = None,
lat_min: Optional[float] = None,
lat_max: Optional[float] = None,
) -> gpd.GeoDataFrame:
"""
Return IOC station metadata from: http://www.ioc-sealevelmonitoring.org/service.php?query=stationlist
If `region` is defined then the stations that are outside of the region are
filtered out.. If the coordinates of the Bounding Box are defined then
stations outside of the BBox are filtered out. If both ``region`` and the
Bounding Box are defined, then an exception is raised.
Note: The longitudes of the IOC stations are in the [-180, 180] range.
:param region: ``Polygon`` or ``MultiPolygon`` denoting region of interest
:param lon_min: The minimum Longitude of the Bounding Box.
:param lon_max: The maximum Longitude of the Bounding Box.
:param lat_min: The minimum Latitude of the Bounding Box.
:param lat_max: The maximum Latitude of the Bounding Box.
:return: ``pandas.DataFrame`` with the station metadata
"""
region = get_region(
region=region,
lon_min=lon_min,
lon_max=lon_max,
lat_min=lat_min,
lat_max=lat_max,
symmetric=True,
)

ioc_stations = _get_ioc_stations_api()
if region:
ioc_stations = ioc_stations[ioc_stations.within(region)]
return ioc_stations


def normalize_ioc_station_data(ioc_code: str, df: pd.DataFrame, truncate_seconds: bool) -> pd.DataFrame:
# Each station may have more than one sensors.
# Some of the sensors have nothing to do with sea level height. We drop these sensors
Expand Down Expand Up @@ -249,6 +367,33 @@ def normalize_ioc_station_data(ioc_code: str, df: pd.DataFrame, truncate_seconds
return df


def normalize_ioc_station_data_api(ioc_code: str, df: pd.DataFrame, truncate_seconds: bool) -> pd.DataFrame:
# Each station may have more than one sensors.
# Some of the sensors have nothing to do with sea level height. We drop these sensors
df = df.rename(columns=IOC_STATION_DATA_COLUMNS)
logger.debug("%s: df contains the following columns: %s", ioc_code, df.columns)
df = df.drop(columns=IOC_STATION_DATA_COLUMNS_TO_DROP, errors="ignore")
if len(df.columns) == 1:
# all the data columns have been dropped!
msg = f"{ioc_code}: The table does not contain any sensor data!"
logger.info(msg)
raise ValueError(msg)
df = df.assign(
ioc_code=ioc_code,
time=pd.to_datetime(df.time),
)
if truncate_seconds:
# Truncate seconds from timestamps: https://stackoverflow.com/a/28783971/592289
# WARNING: This can potentially lead to duplicates!
df = df.assign(time=df.time.dt.floor("min"))
if df.time.duplicated().any():
# There are duplicates. Keep the first datapoint per minute.
msg = f"{ioc_code}: Duplicate timestamps have been detected after the truncation of seconds. Keeping the first datapoint per minute"
warnings.warn(msg)
df = df.iloc[df.time.drop_duplicates().index].reset_index(drop=True)
return df


def get_ioc_station_data(
ioc_code: str,
endtime: DateTimeLike = NOW,
Expand Down Expand Up @@ -283,6 +428,41 @@ def get_ioc_station_data(
return df


def get_ioc_station_data_api(
ioc_code: str,
endtime: DateTimeLike = NOW,
period: float = IOC_MAX_DAYS_PER_REQUEST,
truncate_seconds: bool = True,
rate_limit: Optional[RateLimit] = None,
) -> pd.DataFrame:
"""Retrieve the TimeSeries of a single IOC station."""

if rate_limit:
while rate_limit.reached(identifier="IOC"):
wait()

endtime = resolve_timestamp(endtime)
starttime = resolve_start_date(endtime, period)
url = IOC_BASE_URL_API.format(
ioc_code=ioc_code, starttime=starttime.isoformat(), endtime=endtime.isoformat()
)
logger.info("%s: Retrieving data from: %s", ioc_code, url)
try:
response = requests.get(url)
assert response.ok, f"failed to download: {url}"
logger.debug("Downloaded: %s", url)
di = json.loads(response.content)
df = pd.DataFrame(di).rename(columns={"stime": "time", "Code": "ioc_code"})
except ValueError as exc:
if str(exc) == "No tables found":
logger.info("%s: No data", ioc_code)
else:
logger.exception("%s: Something went wrong", ioc_code)
raise
df = normalize_ioc_station_data_api(ioc_code=ioc_code, df=df, truncate_seconds=truncate_seconds)
return df


def get_ioc_data(
ioc_metadata: pd.DataFrame,
endtime: DateTimeLike = NOW,
Expand Down Expand Up @@ -370,3 +550,92 @@ def get_ioc_data(
# Do the final merging
ds = xr.merge(datasets)
return ds


def get_ioc_data_api(
ioc_metadata: pd.DataFrame,
endtime: DateTimeLike = NOW,
period: float = 1, # one day
truncate_seconds: bool = True,
rate_limit: RateLimit = RateLimit(),
disable_progress_bar: bool = False,
) -> xr.Dataset:
"""
Return the data of the stations specified in ``ioc_metadata`` as an ``xr.Dataset``.
``truncate_seconds`` needs some explaining. IOC has more than 1000 stations.
When you retrieve data from all (or at least most of) these stations, you
end up with thousands of timestamps that only contain a single datapoint.
This means that the returned ``xr.Dataset`` will contain a huge number of ``NaN``
which means that you will need a huge amount of RAM.
In order to reduce the amount of the required RAM we reduce the number of timestamps
by truncating the seconds. This is how this works:
2014-01-03 14:53:02 -> 2014-01-03 14:53:00
2014-01-03 14:53:32 -> 2014-01-03 14:53:00
2014-01-03 14:53:48 -> 2014-01-03 14:53:00
2014-01-03 14:54:09 -> 2014-01-03 14:54:00
2014-01-03 14:54:48 -> 2014-01-03 14:54:00
Nevertheless this approach has a downside. If a station returns multiple datapoints
within the same minute, then we end up with duplicate timestamps. When this happens
we only keep the first datapoint and drop the subsequent ones. So potentially you
may not retrieve all of the available data.
If you don't want this behavior, set ``truncate_seconds`` to ``False`` and you
will retrieve the full data.
:param ioc_metadata: A ``pd.DataFrame`` returned by ``get_ioc_stations``
:param endtime: The date of the "end" of the data. Defaults to ``datetime.date.today()``
:param period: The number of days to be requested. IOC does not support values greater than 30
:param truncate_seconds: If ``True`` then timestamps are truncated to minutes (seconds are dropped)
:param rate_limit: The default rate limit is 5 requests/second.
:param disable_progress_bar: If ``True`` then the progress bar is not displayed.
"""
if period > IOC_MAX_DAYS_PER_REQUEST:
msg = (
f"Unsupported period. Please choose a period smaller than {IOC_MAX_DAYS_PER_REQUEST}: {period}"
)
raise ValueError(msg)

func_kwargs = []
for ioc_code in ioc_metadata.ioc_code:
func_kwargs.append(
dict(
period=period,
endtime=endtime,
ioc_code=ioc_code,
rate_limit=rate_limit,
truncate_seconds=truncate_seconds,
),
)

results = multithread(
func=get_ioc_station_data_api,
func_kwargs=func_kwargs,
n_workers=5,
print_exceptions=False,
disable_progress_bar=disable_progress_bar,
)

datasets = []
for result in results:
if result.result is not None:
df = result.result
meta = ioc_metadata[ioc_metadata.ioc_code == result.kwargs["ioc_code"]] # type: ignore[index]
ds = df.set_index(["ioc_code", "time"]).to_xarray()
ds["lon"] = ("ioc_code", meta.lon.unique())
ds["lat"] = ("ioc_code", meta.lat.unique())
ds["country"] = ("ioc_code", meta.country.unique())
ds["Location"] = ("ioc_code", meta.Location.unique())
datasets.append(ds)

# in order to keep memory consumption low, let's group the datasets
# and merge them in batches
while len(datasets) > 5:
datasets = merge_datasets(datasets)
# Do the final merging
ds = xr.merge(datasets)
return ds
4 changes: 4 additions & 0 deletions searvey/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ def resolve_timestamp(
return ts


def resolve_start_date(enddate: DateTimeLike, period: float) -> pd.Timestamp:
return resolve_timestamp(enddate) - pd.Timedelta("30 days")


def get_region_from_bbox_corners(
lon_min: Union[float, None] = None,
lon_max: Union[float, None] = None,
Expand Down

0 comments on commit 903f3be

Please sign in to comment.