ADD: Add new xarray dataset builder
mgrover1 committed Jan 18, 2024
1 parent 34b52b0 commit 9330b17
Showing 1 changed file with 182 additions and 12 deletions.
194 changes: 182 additions & 12 deletions xradar/io/backends/nexrad_archive.py
@@ -1,8 +1,20 @@
import warnings

import numpy as np

from ...model import (
import xarray as xr
from xarray.backends.common import BackendEntrypoint
from xarray.core import indexing
from xarray.core.variable import Variable

from xradar import util
from xradar.io.backends.common import LazyLoadDict, prepare_for_read
from xradar.io.backends.nexrad_common import get_nexrad_location
from xradar.io.backends.nexrad_interpolate import (
_fast_interpolate_scan_2,
_fast_interpolate_scan_4,
)
from xradar.io.backends.nexrad_level2 import NEXRADLevel2File
from xradar.model import (
get_altitude_attrs,
get_azimuth_attrs,
get_elevation_attrs,
@@ -12,10 +24,6 @@
get_range_attrs,
get_time_attrs,
)
from .common import LazyLoadDict, prepare_for_read
from .nexrad_common import get_nexrad_location
from .nexrad_interpolate import _fast_interpolate_scan_2, _fast_interpolate_scan_4
from .nexrad_level2 import NEXRADLevel2File

nexrad_mapping = {
"REF": "DBZH",
@@ -106,7 +114,7 @@ def _interpolate_scan(mdata, start, end, moment_ngates, multiplier, linear_inter
mdata[:] = np.ma.array(data, mask=(data == fill_value))


def open_dataset(
def format_nexrad_data(
filename,
field_names=None,
additional_metadata=None,
@@ -118,10 +126,14 @@ def open_dataset(
scans=None,
linear_interp=True,
storage_options={"anon": True},
first_dimension=None,
group=None,
**kwargs,
):
# Load the data file in using NEXRADLevel2File Class
nfile = NEXRADLevel2File(prepare_for_read(filename))
nfile = NEXRADLevel2File(
prepare_for_read(filename, storage_options=storage_options)
)

# Access the scan info and load in the available moments
scan_info = nfile.scan_info(scans)
@@ -179,9 +191,9 @@ def open_dataset(
lat, lon, alt = get_nexrad_location(nfile.volume_header["icao"].decode())
else:
lat, lon, alt = nfile.location()
latitude["data"] = np.array([lat], dtype="float64")
longitude["data"] = np.array([lon], dtype="float64")
altitude["data"] = np.array([alt], dtype="float64")
latitude["data"] = np.array(lat, dtype="float64")
longitude["data"] = np.array(lon, dtype="float64")
altitude["data"] = np.array(alt, dtype="float64")

# Sweep information
sweep_number = {
@@ -284,4 +296,162 @@ def open_dataset(
)
dic["data"] = mdata
fields[nexrad_mapping[moment]] = dic
return fields
instrument_parameters = {}

return create_dataset_from_fields(
time,
_range,
fields,
metadata,
latitude,
longitude,
altitude,
sweep_number,
sweep_mode,
fixed_angle,
sweep_start_ray_index,
sweep_end_ray_index,
azimuth,
elevation,
instrument_parameters,
first_dimension=first_dimension,
group=group,
)

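For reference, a minimal usage sketch of the new builder (not part of the commit; the import path follows this file's location, and the S3 key below is illustrative only):

from xradar.io.backends.nexrad_archive import format_nexrad_data

# Hypothetical Level 2 archive object on the public NOAA bucket; anonymous
# access is the default via storage_options={"anon": True}
ds = format_nexrad_data(
    "s3://noaa-nexrad-level2/2024/01/18/KLOT/KLOT20240118_000000_V06",
    group=0,  # build the dataset for the first sweep, "sweep_0"
)
print(ds["DBZH"].dims)  # expected: ('azimuth', 'range')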

def create_dataset_from_fields(
time,
_range,
fields,
metadata,
latitude,
longitude,
altitude,
sweep_number,
sweep_mode,
fixed_angle,
sweep_start_ray_index,
sweep_end_ray_index,
azimuth,
elevation,
instrument_parameters,
first_dimension=None,
group=None,
):
"""
Creates an xarray dataset given a selection of fields defined as a collection of dictionaries
"""
if first_dimension is None:
first_dimension = "azimuth"

if group is not None:
encoding = {"group": f"sweep_{group}"}
else:
encoding = {}
group = 0

group_slice = slice(
sweep_start_ray_index["data"][group], sweep_end_ray_index["data"][group]
)

# Track the number of sweeps
nsweeps = len(sweep_number["data"])

# Handle the coordinates
azimuth["data"] = indexing.LazilyOuterIndexedArray(azimuth["data"][group_slice])
elevation["data"] = indexing.LazilyOuterIndexedArray(elevation["data"][group_slice])
time["data"] = indexing.LazilyOuterIndexedArray(time["data"][group_slice])
sweep_mode["data"] = sweep_mode["data"][group]
sweep_number["data"] = sweep_number["data"][group]
fixed_angle["data"] = fixed_angle["data"][group]

coords = {
"azimuth": dictionary_to_xarray_variable(azimuth, encoding, first_dimension),
"elevation": dictionary_to_xarray_variable(
elevation, encoding, first_dimension
),
"time": dictionary_to_xarray_variable(time, encoding, first_dimension),
"range": dictionary_to_xarray_variable(_range, encoding, "range"),
"longitude": dictionary_to_xarray_variable(longitude),
"latitude": dictionary_to_xarray_variable(latitude),
"altitude": dictionary_to_xarray_variable(altitude),
"sweep_mode": dictionary_to_xarray_variable(sweep_mode),
"sweep_number": dictionary_to_xarray_variable(sweep_number),
"sweep_fixed_angle": dictionary_to_xarray_variable(fixed_angle),
}
data_vars = {
field: dictionary_to_xarray_variable(
data, dims=(first_dimension, "range"), subset=group_slice
)
for (field, data) in fields.items()
}

ds = xr.Dataset(data_vars=data_vars, coords=coords, attrs=metadata)

ds.attrs["nsweeps"] = nsweeps

return ds

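To make the group handling concrete, here is a self-contained sketch with synthetic inputs (illustration only; in practice these dictionaries come from format_nexrad_data, and note that group_slice treats sweep_end_ray_index as exclusive):

import numpy as np

from xradar.io.backends.nexrad_archive import create_dataset_from_fields

# Two synthetic sweeps of four rays each, five gates, one moment
time = {"data": np.linspace(0.0, 7.0, 8), "units": "seconds since 2024-01-18T00:00:00Z"}
_range = {"data": np.arange(5, dtype="float32") * 250.0, "units": "meters"}
azimuth = {"data": np.tile(np.arange(4) * 90.0, 2)}
elevation = {"data": np.repeat([0.5, 1.5], 4)}
fields = {"DBZH": {"data": np.zeros((8, 5)), "units": "dBZ"}}
latitude = {"data": np.array(35.0)}
longitude = {"data": np.array(-97.0)}
altitude = {"data": np.array(370.0)}
sweep_number = {"data": np.array([0, 1])}
sweep_mode = {"data": np.array(["azimuth_surveillance"] * 2)}
fixed_angle = {"data": np.array([0.5, 1.5])}
sweep_start_ray_index = {"data": np.array([0, 4])}
sweep_end_ray_index = {"data": np.array([4, 8])}

ds = create_dataset_from_fields(
    time, _range, fields, {}, latitude, longitude, altitude,
    sweep_number, sweep_mode, fixed_angle,
    sweep_start_ray_index, sweep_end_ray_index,
    azimuth, elevation, instrument_parameters={}, group=1,
)
print(dict(ds.sizes))  # expected: {'azimuth': 4, 'range': 5}

With group=1 the variables carry encoding {"group": "sweep_1"}, and only rays 4-7 of the flat arrays end up in the returned dataset.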

def dictionary_to_xarray_variable(variable_dict, encoding=None, dims=None, subset=None):
attrs = variable_dict.copy()
data = variable_dict.copy()
attrs.pop("data")
if dims is None:
dims = ()
if subset is not None:
data["data"] = data["data"][subset]
# xarray's Variable signature is (dims, data, attrs, encoding)
return Variable(dims, data["data"], attrs, encoding)

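For illustration, a single Py-ART-style field dictionary (hypothetical values) converts like this:

import numpy as np

from xradar.io.backends.nexrad_archive import dictionary_to_xarray_variable

refl = {
    "data": np.zeros((360, 1832), dtype="float32"),
    "units": "dBZ",
    "long_name": "Reflectivity",
}
var = dictionary_to_xarray_variable(refl, dims=("azimuth", "range"))
print(var.dims, var.attrs["units"])  # ('azimuth', 'range') dBZ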

class NexradBackendEntrypoint(BackendEntrypoint):
"""
Xarray BackendEntrypoint for NEXRAD Level2 Data
"""

description = "Open NEXRAD Level2 files in Xarray"
url = "tbd"

def open_dataset(
self,
filename_or_obj,
*,
mask_and_scale=True,
decode_times=True,
concat_characters=True,
decode_coords=True,
drop_variables=None,
use_cftime=None,
decode_timedelta=None,
group=None,
first_dim="auto",
reindex_angle=False,
fix_second_angle=False,
site_coords=True,
optional=True,
):
ds = format_nexrad_data(filename_or_obj, group=group)

# reassign azimuth/elevation/time coordinates
ds = ds.assign_coords({"azimuth": ds.azimuth})
ds = ds.assign_coords({"elevation": ds.elevation})
ds = ds.assign_coords({"time": ds.time})

ds.encoding["engine"] = "NexradLevel2"

# handle duplicates and reindex
if decode_coords and reindex_angle is not False:
ds = ds.pipe(util.remove_duplicate_rays)
ds = ds.pipe(util.reindex_angle, **reindex_angle)
ds = ds.pipe(util.ipol_time, **reindex_angle)

dim0 = "elevation" if ds.sweep_mode.load() == "rhi" else "azimuth"

# todo: could be optimized
if first_dim == "time":
ds = ds.swap_dims({dim0: "time"})
ds = ds.sortby("time")
else:
ds = ds.sortby(dim0)

return ds

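Once this module is importable, the entrypoint can be handed straight to xarray; the engine string it will be registered under is not part of this diff, so the class itself is passed below (filename hypothetical):

import xarray as xr

from xradar.io.backends.nexrad_archive import NexradBackendEntrypoint

ds = xr.open_dataset(
    "KLOT20240118_000000_V06",  # hypothetical local Level 2 archive file
    engine=NexradBackendEntrypoint,  # xarray also accepts BackendEntrypoint subclasses
    group=0,
)
print(ds.attrs["nsweeps"])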