Skip to content

Commit

Permalink
Use pdbufr to read DWD radar data in bufr format into DataFrame
Browse files Browse the repository at this point in the history
  • Loading branch information
gutzbenj committed Mar 12, 2024
1 parent 739c49b commit 722c9d5
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 1 deletion.
3 changes: 2 additions & 1 deletion .github/workflows/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ if [ "${flavor}" = "testing" ]; then
--extras=radar \
--extras=radarplus \
--extras=restapi \
--extras=sql
--extras=sql \
--extras=bufr

elif [ "${flavor}" = "docs" ]; then
poetry install --verbose --no-interaction --with=docs --extras=interpolation
Expand Down
6 changes: 6 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ jobs:
brew install eccodes
export WD_ECCODES_DIR=$(brew --prefix eccodes)
- name: Install eccodes (Mac only)
run: |
if [ "$RUNNER_OS" == "macOS" ]; then
brew install eccodes && export WD_ECCODES_DIR=$(brew --prefix eccodes)
fi
- name: Install project
run: .github/workflows/install.sh testing

Expand Down
32 changes: 32 additions & 0 deletions tests/provider/dwd/radar/test_api_historic.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pytest
from dirty_equals import IsDatetime, IsDict, IsInt, IsList, IsNumeric, IsStr

from wetterdienst.eccodes import ensure_eccodes
from wetterdienst.provider.dwd.radar import (
DwdRadarDataFormat,
DwdRadarDataSubset,
Expand Down Expand Up @@ -516,6 +517,26 @@ def test_radar_request_site_historic_pe_bufr(default_settings):
decoder = pybufrkit.decoder.Decoder()
decoder.process(payload, info_only=True)

if ensure_eccodes():
df = results[0].df

assert not df.empty

print(df.dropna().query("value != 0"))

assert df.columns.tolist() == [
"station_id",
"latitude",
"longitude",
"height",
"projectionType",
"pictureType",
"date",
"echotops",
]

assert not df.dropna().empty


@pytest.mark.xfail(reason="month_year not matching start_date")
@pytest.mark.remote
Expand Down Expand Up @@ -569,6 +590,13 @@ def test_radar_request_site_historic_pe_timerange(fmt, default_settings):
)
assert re.match(bytes(header, encoding="ascii"), payload[:115])

first = results[0]

if fmt == DwdRadarDataFormat.BUFR:
assert not first.df.dropna().empty

assert first.df.columns == [""]


@pytest.mark.remote
def test_radar_request_site_historic_px250_bufr_yesterday(default_settings):
Expand Down Expand Up @@ -637,6 +665,10 @@ def test_radar_request_site_historic_px250_bufr_timerange(default_settings):

assert len(results) == 12

first = results[0]

assert not first.df.dropna().empty


@pytest.mark.remote
def test_radar_request_site_historic_sweep_vol_v_hdf5_yesterday(default_settings):
Expand Down
2 changes: 2 additions & 0 deletions tests/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def test_default_settings(caplog):
default_settings = Settings.default()
assert not default_settings.cache_disable
assert re.match(WD_CACHE_DIR_PATTERN, default_settings.cache_dir)
assert default_settings.eccodes_dir is None
assert default_settings.fsspec_client_kwargs == {}
assert default_settings.ts_humanize
assert default_settings.ts_shape == "long"
Expand All @@ -44,6 +45,7 @@ def test_default_settings(caplog):
"precipitation_height": 20.0,
}
assert default_settings.ts_interpolation_use_nearby_station_distance == 1
assert not default_settings.read_bufr
log_message = caplog.messages[0]
assert re.match(WD_CACHE_ENABLED_PATTERN, log_message)

Expand Down
13 changes: 13 additions & 0 deletions wetterdienst/eccodes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2018-2022, earthobservations developers.
# Distributed under the MIT License. See LICENSE for more info.
def ensure_eccodes() -> bool:
"""Function to ensure that eccodes is loaded"""
try:
import eccodes

eccodes.eccodes.codes_get_api_version()
except (ModuleNotFoundError, RuntimeError):
return False

return True
74 changes: 74 additions & 0 deletions wetterdienst/provider/dwd/radar/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@

log = logging.getLogger(__name__)

BUFR_PARAMETER_MAPPING = {
DwdRadarParameter.PE_ECHO_TOP: ["echoTops"],
DwdRadarParameter.PG_REFLECTIVITY: ["horizontalReflectivity"],
DwdRadarParameter.LMAX_VOLUME_SCAN: ["horizontalReflectivity"],
DwdRadarParameter.PX250_REFLECTIVITY: ["horizontalReflectivity"],
}

ECCODES_FOUND = ensure_eccodes()


@dataclass
class RadarResult:
Expand All @@ -64,6 +73,8 @@ class RadarResult:
"""

data: BytesIO
# placeholder for bufr files, which are read into pandas.DataFrame if eccodes available
df: pl.DataFrame = field(default_factory=pl.DataFrame)
timestamp: dt.datetime = None
url: str = None
filename: str = None
Expand Down Expand Up @@ -415,6 +426,69 @@ def query(self) -> Iterator[RadarResult]:
verify_hdf5(result.data)
except Exception as e: # pragma: no cover
log.exception(f"Unable to read HDF5 file. {e}")

if self.format == DwdRadarDataFormat.BUFR:
if ECCODES_FOUND and self.settings.read_bufr:
buffer = result.data

# TODO: pdbufr currently doesn't seem to allow reading directly from BytesIO
tf = tempfile.NamedTemporaryFile("w+b")
tf.write(buffer.read())
tf.seek(0)

df = pdbufr.read_bufr(
tf.name,
columns="data",
flat=True
)

value_vars = []
parameters = BUFR_PARAMETER_MAPPING[self.parameter]
for par in parameters:
value_vars.extend([col for col in df.columns if par in col])
value_vars = set(value_vars)
id_vars = df.columns.difference(value_vars)
id_vars = [iv for iv in id_vars if iv.startswith("#1#")]

df = df.melt(id_vars=id_vars,var_name="parameter",value_vars=value_vars, value_name="value")
df.columns = [col[3:] if col.startswith("#1#") else col for col in df.columns]

df = df.rename(
columns={
"stationNumber": Columns.STATION_ID.value,
"latitude": Columns.LATITUDE.value,
"longitude": Columns.LONGITUDE.value,
"heightOfStation": Columns.HEIGHT.value,
}
)


# df[Columns.STATION_ID.value] = df[Columns.STATION_ID.value].astype(int).astype(str)

date_columns = ["year", "month", "day", "hour", "minute"]
dates = df.loc[:, date_columns].apply(
lambda x: datetime(
year=x.year, month=x.month, day=x.day, hour=x.hour, minute=x.minute
),
axis=1,
)
df.insert(len(df.columns) - 1, Columns.DATE.value, dates)
df = df.drop(columns=date_columns)

def split_index_parameter(text: str):
split_index = text.index("#", 1)
if split_index == -1:
return text, None
index = text[1:split_index]
parameter = text[split_index+1:]
return parameter, float(index)

df[["parameter", "index"]] = df.pop("parameter").map(split_index_parameter).tolist()

df = df.sort_values(["parameter", "index"])

result.df = df

yield result

@staticmethod
Expand Down

0 comments on commit 722c9d5

Please sign in to comment.