Skip to content

Commit

Permalink
Use pdbufr to read DWD radar data in bufr format into DataFrame
Browse files Browse the repository at this point in the history
  • Loading branch information
gutzbenj committed May 12, 2023
1 parent b411cae commit f98a8b6
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 5 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ if [ "${flavor}" = "testing" ]; then
--extras=radar \
--extras=radarplus \
--extras=restapi \
--extras=sql
--extras=sql \
--extras=bufr

elif [ "${flavor}" = "docs" ]; then
poetry install --verbose --no-interaction --with=docs --extras=interpolation
Expand Down
6 changes: 6 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ jobs:
architecture: x64
cache: poetry

- name: Install eccodes (Mac only)
run: |
if [ "$RUNNER_OS" == "macOS" ]; then
brew install eccodes && export WD_ECCODES_DIR=$(brew --prefix eccodes)
fi
- name: Install project
run: .github/workflows/install.sh testing

Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ Changelog
Development
***********

- Radar: read bufr data into pandas.DataFrame when eccodes library is provided

0.56.2 (11.05.2023)
*******************

Expand Down
1 change: 1 addition & 0 deletions docs/usage/python-api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ and to set it back to standard
The environmental settings recognized by our settings are

- WD_CACHE_DISABLE
- WD_ECCODES_DIR
- WD_FSSPEC_CLIENT_KWARGS
- WD_TS_HUMANIZE
- WD_TS_SHAPE
Expand Down
6 changes: 4 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ dash = { version = "^2.8", optional = true } # Explo
dash-bootstrap-components = { version = "^1.4", optional = true } # Explorer UI feature.
dash-leaflet = { version = "^0.1.23", optional = true } # Explorer UI feature.
duckdb = { version = "^0.7.1", optional = true } # Export feature.
eccodes = { version = "1.2.0", optional = true }
fastapi = { version = "^0.95.1", optional = true } # HTTP REST API feature.
geojson = { version = "^2.5.0", optional = true } # Explorer UI feature.
httpx = {version = "^0.24.0", optional = true}
Expand All @@ -142,6 +143,7 @@ influxdb-client = { version = "^1.18", optional = true } # Expo
matplotlib = { version = "^3.3", optional = true }
mysqlclient = { version = "^2.0", optional = true } # Export feature.
openpyxl = { version = "^3.0", optional = true }
pdbufr = { version = "^0.9.0", optional = true, extras = ["eccodes"] }
plotly = { version = "^5.11", optional = true } # Explorer UI feature.
psycopg2-binary = { version = "^2.8", optional = true } # Export feature.
pyarrow = { version = "^10.0", optional = true}
Expand Down Expand Up @@ -208,7 +210,7 @@ sphinxcontrib-svg2pdfconverter = "^1.1"
tomlkit = "^0.7"

[tool.poetry.extras]
bufr = ["pybufrkit"]
bufr = ["pybufrkit", "pdbufr"]
cratedb = ["crate"]
duckdb = ["duckdb"]
explorer = ["dash", "dash-bootstrap-components", "dash-leaflet", "geojson", "plotly"]
Expand All @@ -220,7 +222,7 @@ mpl = ["matplotlib"]
mysql = ["mysqlclient"]
postgresql = ["psycopg2-binary"]
radar = ["h5py"]
radarplus = ["pybufrkit", "wradlib", "xradar"]
radarplus = ["h5py", "pybufrkit", "wradlib", "xradar", "pdbufr"]
restapi = ["fastapi", "httpx", "uvicorn"]
sql = ["duckdb"]

Expand Down
30 changes: 30 additions & 0 deletions tests/provider/dwd/radar/test_api_historic.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import requests
from dirty_equals import IsDatetime, IsDict, IsInt, IsList, IsNumeric, IsStr

from wetterdienst.eccodes import ensure_eccodes
from wetterdienst.provider.dwd.radar import (
DwdRadarDataFormat,
DwdRadarDataSubset,
Expand Down Expand Up @@ -480,6 +481,24 @@ def test_radar_request_site_historic_pe_bufr(default_settings):
decoder = pybufrkit.decoder.Decoder()
decoder.process(payload, info_only=True)

if ensure_eccodes():
df = results[0].df

assert not df.empty

assert df.columns.tolist() == [
"station_id",
"latitude",
"longitude",
"height",
"projectionType",
"pictureType",
"date",
"echotops",
]

assert not df.dropna().empty


@pytest.mark.xfail(reason="month_year not matching start_date")
@pytest.mark.remote
Expand Down Expand Up @@ -533,6 +552,13 @@ def test_radar_request_site_historic_pe_timerange(fmt, default_settings):
)
assert re.match(bytes(header, encoding="ascii"), payload[:115])

first = results[0]

if fmt == DwdRadarDataFormat.BUFR:
assert not first.df.dropna().empty

assert first.df.columns == [""]


@pytest.mark.remote
def test_radar_request_site_historic_px250_bufr_yesterday(default_settings):
Expand Down Expand Up @@ -601,6 +627,10 @@ def test_radar_request_site_historic_px250_bufr_timerange(default_settings):

assert len(results) == 12

first = results[0]

assert not first.df.dropna().empty


@pytest.mark.remote
def test_radar_request_site_historic_sweep_pcp_v_bufr_yesterday(default_settings):
Expand Down
2 changes: 2 additions & 0 deletions tests/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def test_default_settings(caplog):
default_settings = Settings.default()
assert not default_settings.cache_disable
assert re.match(WD_CACHE_DIR_PATTERN, default_settings.cache_dir)
assert default_settings.eccodes_dir is None
assert default_settings.fsspec_client_kwargs == {}
assert default_settings.ts_humanize
assert default_settings.ts_shape == "long"
Expand All @@ -41,6 +42,7 @@ def test_default_settings(caplog):
assert default_settings.ts_skip_threshold == 0.95
assert not default_settings.ts_dropna
assert default_settings.ts_interpolation_use_nearby_station_distance == 1
assert not default_settings.read_bufr
log_message = caplog.messages[0]
assert re.match(WD_CACHE_ENABLED_PATTERN, log_message)

Expand Down
13 changes: 13 additions & 0 deletions wetterdienst/eccodes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2018-2022, earthobservations developers.
# Distributed under the MIT License. See LICENSE for more info.
def ensure_eccodes() -> bool:
"""Function to ensure that eccodes is loaded"""
try:
import eccodes

eccodes.eccodes.codes_get_api_version()
except (ModuleNotFoundError, RuntimeError):
return False

return True
70 changes: 69 additions & 1 deletion wetterdienst/provider/dwd/radar/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@
import logging
import re
import tarfile
from dataclasses import dataclass
import tempfile
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from io import BytesIO
from typing import Generator, Optional, Union

import pandas as pd
import pdbufr as pdbufr
from fsspec.implementations.tar import TarFileSystem

from wetterdienst.eccodes import ensure_eccodes
from wetterdienst.exceptions import FailedDownload
from wetterdienst.metadata.columns import Columns
from wetterdienst.metadata.extension import Extension
from wetterdienst.metadata.period import Period
from wetterdienst.metadata.resolution import Resolution
Expand Down Expand Up @@ -44,6 +48,15 @@

log = logging.getLogger(__name__)

BUFR_PARAMETER_MAPPING = {
DwdRadarParameter.PE_ECHO_TOP: ["echoTops"],
DwdRadarParameter.PG_REFLECTIVITY: ["horizontalReflectivity"],
DwdRadarParameter.LMAX_VOLUME_SCAN: ["horizontalReflectivity"],
DwdRadarParameter.PX250_REFLECTIVITY: ["horizontalReflectivity"],
}

ECCODES_FOUND = ensure_eccodes()


@dataclass
class RadarResult:
Expand All @@ -53,6 +66,8 @@ class RadarResult:
"""

data: BytesIO
# placeholder for bufr files, which are read into pandas.DataFrame if eccodes available
df: pd.DataFrame = field(default_factory=pd.DataFrame)
timestamp: datetime = None
url: str = None
filename: str = None
Expand Down Expand Up @@ -389,6 +404,59 @@ def query(self) -> Generator[RadarResult, None, None]:
verify_hdf5(result.data)
except Exception as e: # pragma: no cover
log.exception(f"Unable to read HDF5 file. {e}")

if self.format == DwdRadarDataFormat.BUFR:
if ECCODES_FOUND and self.settings.read_bufr:
buffer = result.data

# TODO: pdbufr currently doesn't seem to allow reading directly from BytesIO
tf = tempfile.NamedTemporaryFile("w+b")
tf.write(buffer.read())
tf.seek(0)

df = pdbufr.read_bufr(
tf.name,
columns="data",
flat=True
)

value_vars = []
parameters = BUFR_PARAMETER_MAPPING[self.parameter]
for par in parameters:
value_vars.extend([col for col in df.columns if par in col])
value_vars = set(value_vars)
id_vars = df.columns.difference(value_vars)
id_vars = [iv for iv in id_vars if iv.startswith("#1#")]

df = df.melt(id_vars=id_vars,var_name="parameter",value_vars=value_vars, value_name="value")
df.columns = [col[3:] if col.startswith("#1#") else col for col in df.columns]

df = df.rename(
columns={
"stationNumber": Columns.STATION_ID.value,
"latitude": Columns.LATITUDE.value,
"longitude": Columns.LONGITUDE.value,
"heightOfStation": Columns.HEIGHT.value,
}
)


# df[Columns.STATION_ID.value] = df[Columns.STATION_ID.value].astype(int).astype(str)

date_columns = ["year", "month", "day", "hour", "minute"]
dates = df.loc[:, date_columns].apply(
lambda x: datetime(
year=x.year, month=x.month, day=x.day, hour=x.hour, minute=x.minute
),
axis=1,
)
df.insert(len(df.columns) - 1, Columns.DATE.value, dates)
df = df.drop(columns=date_columns)

print(df)

result.df = df

yield result

@staticmethod
Expand Down
15 changes: 14 additions & 1 deletion wetterdienst/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class Settings:
_defaults = {
"cache_disable": False,
"cache_dir": platformdirs.user_cache_dir(appname="wetterdienst"),
"eccodes_dir": None,
"fsspec_client_kwargs": {},
"ts_humanize": True,
"ts_shape": "long",
Expand All @@ -39,12 +40,14 @@ class Settings:
"ts_skip_criteria": "min",
"ts_dropna": False,
"ts_interpolation_use_nearby_station_distance": 1,
"radar_read_bufr": False
}

def __init__(
self,
cache_disable: Optional[bool] = None,
cache_dir: Optional[pathlib.Path] = None,
eccodes_dir: Optional[pathlib.Path] = None,
fsspec_client_kwargs: Optional[dict] = None,
ts_humanize: Optional[bool] = None,
ts_shape: Optional[Literal["wide", "long"]] = None,
Expand All @@ -54,6 +57,7 @@ def __init__(
ts_skip_criteria: Optional[Literal["min", "mean", "max"]] = None,
ts_dropna: Optional[bool] = None,
ts_interpolation_use_nearby_station_distance: Optional[Union[float, int]] = None,
radar_read_bufr: Optional[bool] = None,
ignore_env: bool = False,
) -> None:
_defaults = deepcopy(self._defaults) # make sure mutable objects are not changed
Expand All @@ -65,6 +69,9 @@ def __init__(
# cache
self.cache_disable: bool = _da(cache_disable, env.bool("CACHE_DISABLE", None), _defaults["cache_disable"])
self.cache_dir: pathlib.Path = _da(cache_dir, env.path("CACHE_DIR", None), _defaults["cache_dir"])
# eccodes
self.eccodes_dir: pathlib.Path = _da(eccodes_dir, env.path("ECCODES_DIR", None), _defaults["eccodes_dir"])

# FSSPEC aiohttp client kwargs, may be used to pass extra arguments
# such as proxies to aiohttp
self.fsspec_client_kwargs: dict = _da(
Expand All @@ -88,7 +95,7 @@ def __init__(
env.str("SKIP_CRITERIA", None, validate=OneOf(["min", "mean", "max"])),
_defaults["ts_skip_criteria"],
)
self.ts_dropna: bool = _da(ts_dropna, env.bool("DROPNA", ts_dropna), _defaults["ts_dropna"])
self.ts_dropna: bool = _da(ts_dropna, env.bool("DROPNA", None), _defaults["ts_dropna"])

with env.prefixed("INTERPOLATION_"):
self.ts_interpolation_use_nearby_station_distance: float = _da(
Expand All @@ -97,6 +104,10 @@ def __init__(
_defaults["ts_interpolation_use_nearby_station_distance"],
)

with env.prefixed("RADAR_"):
# radar related
self.read_bufr: bool = _da(radar_read_bufr, env.bool("READ_BUFR", None), _defaults["radar_read_bufr"])

if self.cache_disable:
log.info("Wetterdienst cache is disabled")
else:
Expand All @@ -117,6 +128,7 @@ def to_dict(self) -> dict:
return {
"cache_disable": self.cache_disable,
"cache_dir": self.cache_dir,
"eccodes_dir": self.eccodes_dir,
"fsspec_client_kwargs": self.fsspec_client_kwargs,
"ts_humanize": self.ts_humanize,
"ts_shape": self.ts_shape,
Expand All @@ -126,6 +138,7 @@ def to_dict(self) -> dict:
"ts_skip_criteria": self.ts_skip_criteria,
"ts_dropna": self.ts_dropna,
"ts_interpolation_use_nearby_station_distance": self.ts_interpolation_use_nearby_station_distance,
"radar_read_bufr": self.read_bufr
}

def reset(self) -> "Settings":
Expand Down

0 comments on commit f98a8b6

Please sign in to comment.