diff --git a/echopype/commongrid/__init__.py b/echopype/commongrid/__init__.py index 642434353..63172d2bd 100644 --- a/echopype/commongrid/__init__.py +++ b/echopype/commongrid/__init__.py @@ -1,6 +1,7 @@ -from .api import compute_MVBS, compute_MVBS_index_binning +from .api import compute_MVBS, compute_MVBS_index_binning, compute_NASC __all__ = [ "compute_MVBS", + "compute_NASC", "compute_MVBS_index_binning", ] diff --git a/echopype/commongrid/api.py b/echopype/commongrid/api.py index 8c9132b99..dcc283c05 100644 --- a/echopype/commongrid/api.py +++ b/echopype/commongrid/api.py @@ -1,7 +1,7 @@ """ Functions for enhancing the spatial and temporal coherence of data. """ -import re +import logging from typing import Literal import numpy as np @@ -10,157 +10,19 @@ from ..consolidate.api import POSITION_VARIABLES from ..utils.prov import add_processing_level, echopype_prov_attrs, insert_input_processing_level -from .mvbs import get_MVBS_along_channels +from .utils import ( + _convert_bins_to_interval_index, + _get_reduced_positions, + _parse_x_bin, + _set_MVBS_attrs, + _set_var_attrs, + _setup_and_validate, + compute_raw_MVBS, + compute_raw_NASC, + get_distance_from_latlon, +) - -def _set_var_attrs(da, long_name, units, round_digits, standard_name=None): - """ - Attach common attributes to DataArray variable. - - Parameters - ---------- - da : xr.DataArray - DataArray that will receive attributes - long_name : str - Variable long_name attribute - units : str - Variable units attribute - round_digits : int - Number of digits after decimal point for rounding off actual_range - standard_name : str - CF standard_name, if available (optional) - """ - - da.attrs = { - "long_name": long_name, - "units": units, - "actual_range": [ - round(float(da.min().values), round_digits), - round(float(da.max().values), round_digits), - ], - } - if standard_name: - da.attrs["standard_name"] = standard_name - - -def _set_MVBS_attrs(ds): - """ - Attach common attributes. - - Parameters - ---------- - ds : xr.Dataset - dataset containing MVBS - """ - ds["ping_time"].attrs = { - "long_name": "Ping time", - "standard_name": "time", - "axis": "T", - } - - _set_var_attrs( - ds["Sv"], - long_name="Mean volume backscattering strength (MVBS, mean Sv re 1 m-1)", - units="dB", - round_digits=2, - ) - - -def _convert_bins_to_interval_index( - bins: list, closed: Literal["left", "right"] = "left" -) -> pd.IntervalIndex: - """ - Convert bins to sorted pandas IntervalIndex - with specified closed end - - Parameters - ---------- - bins : list - The bin edges - closed : {'left', 'right'}, default 'left' - Which side of bin interval is closed - - Returns - ------- - pd.IntervalIndex - The resulting IntervalIndex - """ - return pd.IntervalIndex.from_breaks(bins, closed=closed).sort_values() - - -def _parse_x_bin(x_bin: str, x_label="range_bin") -> float: - """ - Parses x bin string, check unit, - and returns x bin in the specified unit. - - Currently only available for: - range_bin: meters (m) - dist_bin: nautical miles (nmi) - - Parameters - ---------- - x_bin : str - X bin string, e.g., "0.5nmi" or "10m" - x_label : {"range_bin", "dist_bin"}, default "range_bin" - The label of the x bin. - - Returns - ------- - float - The resulting x bin value in x unit, - based on label. - - Raises - ------ - ValueError - If the x bin string doesn't include unit value. - TypeError - If the x bin is not a type string. - KeyError - If the x label is not one of the available labels. - """ - x_bin_map = { - "range_bin": { - "name": "Range bin", - "unit": "m", - "ex": "10m", - "unit_label": "meters", - "pattern": r"([\d+]*[.,]{0,1}[\d+]*)(\s+)?(m)", - }, - "dist_bin": { - "name": "Distance bin", - "unit": "nmi", - "ex": "0.5nmi", - "unit_label": "nautical miles", - "pattern": r"([\d+]*[.,]{0,1}[\d+]*)(\s+)?(nmi)", - }, - } - x_bin_info = x_bin_map.get(x_label, None) - - if x_bin_info is None: - raise KeyError(f"x_label must be one of {list(x_bin_map.keys())}") - - # First check for bin types - if not isinstance(x_bin, str): - raise TypeError("'x_bin' must be a string") - # normalize to lower case - # for x_bin - x_bin = x_bin.strip().lower() - # Only matches meters - match_obj = re.match(x_bin_info["pattern"], x_bin) - - # Do some checks on x_bin inputs - if match_obj is None: - # This shouldn't be other units - raise ValueError( - f"{x_bin_info['name']} must be in " - f"{x_bin_info['unit_label']} " - f"(e.g., '{x_bin_info['ex']}')." - ) - - # Convert back to float - x_bin = float(match_obj.group(1)) - return x_bin +logger = logging.getLogger(__name__) @add_processing_level("L3*") @@ -201,7 +63,7 @@ def compute_MVBS( for more details. closed: {'left', 'right'}, default 'left' Which side of bin interval is closed. - **kwargs + **flox_kwargs Additional keyword arguments to be passed to flox reduction function. @@ -210,28 +72,15 @@ def compute_MVBS( A dataset containing bin-averaged Sv """ + # Setup and validate + # * Sv dataset must contain specified range_var + # * Parse range_bin + # * Check closed value + ds_Sv, range_bin = _setup_and_validate(ds_Sv, range_var, range_bin, closed) + if not isinstance(ping_time_bin, str): raise TypeError("ping_time_bin must be a string") - range_bin = _parse_x_bin(range_bin, "range_bin") - - # Clean up filenames dimension if it exists - # not needed here - if "filenames" in ds_Sv.dims: - ds_Sv = ds_Sv.drop_dims("filenames") - - # Check if range_var is valid - if range_var not in ["echo_range", "depth"]: - raise ValueError("range_var must be one of 'echo_range' or 'depth'.") - - # Check if range_var exists in ds_Sv - if range_var not in ds_Sv.data_vars: - raise ValueError(f"range_var '{range_var}' does not exist in the input dataset.") - - # Check for closed values - if closed not in ["right", "left"]: - raise ValueError(f"{closed} is not a valid option. Options are 'left' or 'right'.") - # create bin information for echo_range # this computes the echo range max since there might NaNs in the data echo_range_max = ds_Sv[range_var].max() @@ -249,7 +98,7 @@ def compute_MVBS( # Set interval index for groups ping_interval = _convert_bins_to_interval_index(ping_interval, closed=closed) range_interval = _convert_bins_to_interval_index(range_interval, closed=closed) - raw_MVBS = get_MVBS_along_channels( + raw_MVBS = compute_raw_MVBS( ds_Sv, range_interval, ping_interval, @@ -269,12 +118,9 @@ def compute_MVBS( }, ) - # "has_positions" attribute is inserted in get_MVBS_along_channels - # when the dataset has position information + # If dataset has position information # propagate this to the final MVBS dataset - if raw_MVBS.attrs.get("has_positions", False): - for var in POSITION_VARIABLES: - ds_MVBS[var] = (["ping_time"], raw_MVBS[var].data, ds_Sv[var].attrs) + ds_MVBS = _get_reduced_positions(ds_Sv, ds_MVBS, "MVBS", ping_interval) # Add water level if uses echo_range and it exists in Sv dataset if range_var == "echo_range" and "water_level" in ds_Sv.data_vars: @@ -415,131 +261,147 @@ def compute_MVBS_index_binning(ds_Sv, range_sample_num=100, ping_num=100): return ds_MVBS -# def compute_NASC( -# ds_Sv: xr.Dataset, -# cell_dist: Union[int, float], # TODO: allow xr.DataArray -# cell_depth: Union[int, float], # TODO: allow xr.DataArray -# ) -> xr.Dataset: -# """ -# Compute Nautical Areal Scattering Coefficient (NASC) from an Sv dataset. - -# Parameters -# ---------- -# ds_Sv : xr.Dataset -# A dataset containing Sv data. -# The Sv dataset must contain ``latitude``, ``longitude``, and ``depth`` as data variables. -# cell_dist: int, float -# The horizontal size of each NASC cell, in nautical miles [nmi] -# cell_depth: int, float -# The vertical size of each NASC cell, in meters [m] - -# Returns -# ------- -# xr.Dataset -# A dataset containing NASC - -# Notes -# ----- -# The NASC computation implemented here corresponds to the Echoview algorithm PRC_NASC -# https://support.echoview.com/WebHelp/Reference/Algorithms/Analysis_Variables/PRC_ABC_and_PRC_NASC.htm#PRC_NASC # noqa -# The difference is that since in echopype masking of the Sv dataset is done explicitly using -# functions in the ``mask`` subpackage so the computation only involves computing the -# mean Sv and the mean height within each cell. - -# In addition, here the binning of pings into individual cells is based on the actual horizontal -# distance computed from the latitude and longitude coordinates of each ping in the Sv dataset. -# Therefore, both regular and irregular horizontal distance in the Sv dataset are allowed. -# This is different from Echoview's assumption of constant ping rate, vessel speed, and sample -# thickness when computing mean Sv. -# """ -# # Check Sv contains lat/lon -# if "latitude" not in ds_Sv or "longitude" not in ds_Sv: -# raise ValueError("Both 'latitude' and 'longitude' must exist in the input Sv dataset.") - -# # Check if depth vectors are identical within each channel -# if not ds_Sv["depth"].groupby("channel").map(check_identical_depth).all(): -# raise ValueError( -# "Only Sv data with identical depth vectors across all pings " -# "are allowed in the current compute_NASC implementation." -# ) - -# # Get distance from lat/lon in nautical miles -# dist_nmi = get_distance_from_latlon(ds_Sv) - -# # Find binning indices along distance -# bin_num_dist, dist_bin_idx = get_dist_bin_info(dist_nmi, cell_dist) # dist_bin_idx is 1-based - -# # Find binning indices along depth: channel-dependent -# bin_num_depth, depth_bin_idx = get_depth_bin_info(ds_Sv, cell_depth) # depth_bin_idx is 1-based # noqa - -# # Compute mean sv (volume backscattering coefficient, linear scale) -# # This is essentially to compute MVBS over a the cell defined here, -# # which are typically larger than those used for MVBS. -# # The implementation below is brute force looping, but can be optimized -# # by experimenting with different delayed schemes. -# # The optimized routines can then be used here and -# # in commongrid.compute_MVBS and clean.estimate_noise -# sv_mean = [] -# for ch_seq in np.arange(ds_Sv["channel"].size): -# # TODO: .compute each channel sequentially? -# # dask.delay within each channel? -# ds_Sv_ch = ds_Sv["Sv"].isel(channel=ch_seq).data # preserve the underlying type - -# sv_mean_dist_depth = [] -# for dist_idx in np.arange(bin_num_dist) + 1: # along ping_time -# sv_mean_depth = [] -# for depth_idx in np.arange(bin_num_depth) + 1: # along depth -# # Sv dim: ping_time x depth -# Sv_cut = ds_Sv_ch[dist_idx == dist_bin_idx, :][ -# :, depth_idx == depth_bin_idx[ch_seq] -# ] -# sv_mean_depth.append(np.nanmean(10 ** (Sv_cut / 10))) -# sv_mean_dist_depth.append(sv_mean_depth) - -# sv_mean.append(sv_mean_dist_depth) - -# # Compute mean height -# # For data with uniform depth step size, mean height = vertical size of cell -# height_mean = cell_depth -# # TODO: generalize to variable depth step size - -# ds_NASC = xr.DataArray( -# np.array(sv_mean) * height_mean, -# dims=["channel", "distance", "depth"], -# coords={ -# "channel": ds_Sv["channel"].values, -# "distance": np.arange(bin_num_dist) * cell_dist, -# "depth": np.arange(bin_num_depth) * cell_depth, -# }, -# name="NASC", -# ).to_dataset() - -# ds_NASC["frequency_nominal"] = ds_Sv["frequency_nominal"] # re-attach frequency_nominal - -# # Attach attributes -# _set_var_attrs( -# ds_NASC["NASC"], -# long_name="Nautical Areal Scattering Coefficient (NASC, m2 nmi-2)", -# units="m2 nmi-2", -# round_digits=3, -# ) -# _set_var_attrs(ds_NASC["distance"], "Cumulative distance", "m", 3) -# _set_var_attrs(ds_NASC["depth"], "Cell depth", "m", 3, standard_name="depth") - -# # Calculate and add ACDD bounding box global attributes -# ds_NASC.attrs["Conventions"] = "CF-1.7,ACDD-1.3" -# ds_NASC.attrs["time_coverage_start"] = np.datetime_as_string( -# ds_Sv["ping_time"].min().values, timezone="UTC" -# ) -# ds_NASC.attrs["time_coverage_end"] = np.datetime_as_string( -# ds_Sv["ping_time"].max().values, timezone="UTC" -# ) -# ds_NASC.attrs["geospatial_lat_min"] = round(float(ds_Sv["latitude"].min().values), 5) -# ds_NASC.attrs["geospatial_lat_max"] = round(float(ds_Sv["latitude"].max().values), 5) -# ds_NASC.attrs["geospatial_lon_min"] = round(float(ds_Sv["longitude"].min().values), 5) -# ds_NASC.attrs["geospatial_lon_max"] = round(float(ds_Sv["longitude"].max().values), 5) - -# return ds_NASC +def compute_NASC( + ds_Sv: xr.Dataset, + range_bin: str = "10m", + dist_bin: str = "0.5nmi", + method: str = "map-reduce", + closed: Literal["left", "right"] = "left", + **flox_kwargs, +) -> xr.Dataset: + """ + Compute Nautical Areal Scattering Coefficient (NASC) from an Sv dataset. + + Parameters + ---------- + ds_Sv : xr.Dataset + A dataset containing Sv data. + The Sv dataset must contain ``latitude``, ``longitude``, and ``depth`` as data variables. + range_bin : str, default '10m' + bin size along ``depth`` in meters (m). + dist_bin : str, default '0.5nmi' + bin size along ``distance`` in nautical miles (nmi). + method: str, default 'map-reduce' + The flox strategy for reduction of dask arrays only. + See flox `documentation `_ + for more details. + closed: {'left', 'right'}, default 'left' + Which side of bin interval is closed. + **flox_kwargs + Additional keyword arguments to be passed + to flox reduction function. + + Returns + ------- + xr.Dataset + A dataset containing NASC + + Notes + ----- + The NASC computation implemented here generally corresponds to the Echoview algorithm PRC_NASC + https://support.echoview.com/WebHelp/Reference/Algorithms/Analysis_Variables/PRC_ABC_and_PRC_NASC.htm#PRC_NASC # noqa + The difference is that since in echopype masking of the Sv dataset is done explicitly using + functions in the ``mask`` subpackage, the computation only involves computing the + mean Sv and the mean height within each cell, where some Sv "pixels" may have been + masked as NaN. + + In addition, in echopype the binning of pings into individual cells is based on the actual horizontal + distance computed from the latitude and longitude coordinates of each ping in the Sv dataset. + Therefore, both regular and irregular horizontal distance in the Sv dataset are allowed. + This is different from Echoview's assumption of constant ping rate, vessel speed, and sample + thickness when computing mean Sv + (see https://support.echoview.com/WebHelp/Reference/Algorithms/Analysis_Variables/Sv_mean.htm#Conversions). # noqa + """ + # Set range_var to be 'depth' + range_var = "depth" + + # Setup and validate + # * Sv dataset must contain latitude, longitude, and depth + # * Parse range_bin + # * Check closed value + ds_Sv, range_bin = _setup_and_validate( + ds_Sv, range_var, range_bin, closed, required_data_vars=POSITION_VARIABLES + ) + + # Check if dist_bin is a string + if not isinstance(dist_bin, str): + raise TypeError("dist_bin must be a string") + + # Parse the dist_bin string and convert to float + dist_bin = _parse_x_bin(dist_bin, "dist_bin") + + # Get distance from lat/lon in nautical miles + dist_nmi = get_distance_from_latlon(ds_Sv) + ds_Sv = ds_Sv.assign_coords({"distance_nmi": ("ping_time", dist_nmi)}).swap_dims( + {"ping_time": "distance_nmi"} + ) + + # create bin information along range_var + # this computes the range_var max since there might NaNs in the data + range_var_max = ds_Sv[range_var].max() + range_interval = np.arange(0, range_var_max + range_bin, range_bin) + + # create bin information along distance_nmi + # this computes the distance max since there might NaNs in the data + dist_max = ds_Sv["distance_nmi"].max() + dist_interval = np.arange(0, dist_max + dist_bin, dist_bin) + + # Set interval index for groups + dist_interval = _convert_bins_to_interval_index(dist_interval, closed=closed) + range_interval = _convert_bins_to_interval_index(range_interval, closed=closed) + + raw_NASC = compute_raw_NASC( + ds_Sv, + range_interval, + dist_interval, + method=method, + **flox_kwargs, + ) + + # create MVBS dataset + # by transforming the binned dimensions to regular coords + ds_NASC = xr.Dataset( + data_vars={"NASC": (["channel", "distance", range_var], raw_NASC["sv"].data)}, + coords={ + "distance": np.array([v.left for v in raw_NASC["distance_nmi_bins"].values]), + "channel": raw_NASC["channel"].values, + range_var: np.array([v.left for v in raw_NASC[f"{range_var}_bins"].values]), + }, + ) + + # If dataset has position information + # propagate this to the final NASC dataset + ds_NASC = _get_reduced_positions(ds_Sv, ds_NASC, "NASC", dist_interval) + + # Set ping time binning information + ds_NASC["ping_time"] = (["distance"], raw_NASC["ping_time"].data, ds_Sv["ping_time"].attrs) + + ds_NASC["frequency_nominal"] = ds_Sv["frequency_nominal"] # re-attach frequency_nominal + + # Attach attributes + _set_var_attrs( + ds_NASC["NASC"], + long_name="Nautical Areal Scattering Coefficient (NASC, m2 nmi-2)", + units="m2 nmi-2", + round_digits=3, + ) + _set_var_attrs(ds_NASC["distance"], "Cumulative distance", "nmi", 3) + _set_var_attrs(ds_NASC["depth"], "Cell depth", "m", 3, standard_name="depth") + + # Calculate and add ACDD bounding box global attributes + ds_NASC.attrs["Conventions"] = "CF-1.7,ACDD-1.3" + ds_NASC.attrs["time_coverage_start"] = np.datetime_as_string( + ds_Sv["ping_time"].min().values, timezone="UTC" + ) + ds_NASC.attrs["time_coverage_end"] = np.datetime_as_string( + ds_Sv["ping_time"].max().values, timezone="UTC" + ) + ds_NASC.attrs["geospatial_lat_min"] = round(float(ds_Sv["latitude"].min().values), 5) + ds_NASC.attrs["geospatial_lat_max"] = round(float(ds_Sv["latitude"].max().values), 5) + ds_NASC.attrs["geospatial_lon_min"] = round(float(ds_Sv["longitude"].min().values), 5) + ds_NASC.attrs["geospatial_lon_max"] = round(float(ds_Sv["longitude"].max().values), 5) + + return ds_NASC def regrid(): diff --git a/echopype/commongrid/mvbs.py b/echopype/commongrid/mvbs.py deleted file mode 100644 index 2fbc1b38f..000000000 --- a/echopype/commongrid/mvbs.py +++ /dev/null @@ -1,89 +0,0 @@ -""" -Contains core functions needed to compute the MVBS of an input dataset. -""" - -from typing import Literal, Union - -import numpy as np -import pandas as pd -import xarray as xr -from flox.xarray import xarray_reduce - -from ..consolidate.api import POSITION_VARIABLES -from ..utils.compute import _lin2log, _log2lin - - -def get_MVBS_along_channels( - ds_Sv: xr.Dataset, - range_interval: Union[pd.IntervalIndex, np.ndarray], - ping_interval: Union[pd.IntervalIndex, np.ndarray], - range_var: Literal["echo_range", "depth"] = "echo_range", - method: str = "map-reduce", - **kwargs -) -> xr.Dataset: - """ - Computes the MVBS of ``ds_Sv`` along each channel for the given - intervals. - - Parameters - ---------- - ds_Sv: xr.Dataset - A Dataset containing ``Sv`` and ``echo_range`` data with coordinates - ``channel``, ``ping_time``, and ``range_sample`` - range_interval: pd.IntervalIndex or np.ndarray - 1D array or interval index representing - the bins required for ``range_var`` - ping_interval: pd.IntervalIndex or np.ndarray - 1D array or interval index representing - the bins required for ``ping_time`` - range_var: str - The variable to use for range binning. - Either ``echo_range`` or ``depth``. - method: str - The flox strategy for reduction of dask arrays only. - See flox `documentation `_ - for more details. - **kwargs - Additional keyword arguments to be passed - to flox reduction function - - Returns - ------- - xr.Dataset - The MVBS dataset of the input ``ds_Sv`` for all channels - """ - - # average should be done in linear domain - sv = ds_Sv["Sv"].pipe(_log2lin) - - # Get positions if exists - # otherwise just use an empty dataset - ds_Pos = xr.Dataset(attrs={"has_positions": False}) - if all(v in ds_Sv for v in POSITION_VARIABLES): - ds_Pos = xarray_reduce( - ds_Sv[POSITION_VARIABLES], - ds_Sv["ping_time"], - func="nanmean", - expected_groups=(ping_interval), - isbin=True, - method=method, - ) - ds_Pos.attrs["has_positions"] = True - - # reduce along ping_time and echo_range or depth - # by binning and averaging - mvbs = xarray_reduce( - sv, - sv["channel"], - ds_Sv["ping_time"], - ds_Sv[range_var], - func="nanmean", - expected_groups=(None, ping_interval, range_interval), - isbin=[False, True, True], - method=method, - **kwargs - ) - - # apply inverse mapping to get back to the original domain and store values - da_MVBS = mvbs.pipe(_lin2log) - return xr.merge([ds_Pos, da_MVBS]) diff --git a/echopype/commongrid/nasc.py b/echopype/commongrid/nasc.py deleted file mode 100644 index 2656f2bc7..000000000 --- a/echopype/commongrid/nasc.py +++ /dev/null @@ -1,86 +0,0 @@ -""" -An overhaul is required for the below compute_NASC implementation, to: -- increase the computational efficiency -- debug the current code of any discrepancy against Echoview implementation -- potentially provide an alternative based on ping-by-ping Sv - -This script contains functions used by commongrid.compute_NASC, -but a subset of these overlap with operations needed -for commongrid.compute_MVBS and clean.estimate_noise. -The compute_MVBS and remove_noise code needs to be refactored, -and the compute_NASC needs to be optimized. -The plan is to create a common util set of functions for use in -these functions in an upcoming release. -""" - -import numpy as np -import xarray as xr -from geopy import distance - - -def check_identical_depth(ds_ch): - """ - Check if all pings have the same depth vector. - """ - # Depth vector are identical for all pings, if: - # - the number of non-NaN range_sample is the same for all pings, AND - # - all pings have the same max range - num_nan = np.isnan(ds_ch.values).sum(axis=1) - nan_check = True if np.all(num_nan == 0) or np.unique(num_nan).size == 1 else False - - if not nan_check: - return xr.DataArray(False, coords={"channel": ds_ch["channel"]}) - else: - # max range of each ping should be identical - max_range_ping = ds_ch.values[np.arange(ds_ch.shape[0]), ds_ch.shape[1] - num_nan - 1] - if np.unique(max_range_ping).size == 1: - return xr.DataArray(True, coords={"channel": ds_ch["channel"]}) - else: - return xr.DataArray(False, coords={"channel": ds_ch["channel"]}) - - -def get_depth_bin_info(ds_Sv, cell_depth): - """ - Find binning indices along depth - """ - depth_ping1 = ds_Sv["depth"].isel(ping_time=0) - num_nan = np.isnan(depth_ping1.values).sum(axis=1) - # ping 1 max range of each channel - max_range_ch = depth_ping1.values[ - np.arange(depth_ping1.shape[0]), depth_ping1.shape[1] - num_nan - 1 - ] - bin_num_depth = np.ceil(max_range_ch.max() / cell_depth) # use max range of all channel - depth_bin_idx = [ - np.digitize(dp1, np.arange(bin_num_depth + 1) * cell_depth, right=False) - for dp1 in depth_ping1 - ] - return bin_num_depth, depth_bin_idx - - -def get_distance_from_latlon(ds_Sv): - # Get distance from lat/lon in nautical miles - df_pos = ds_Sv["latitude"].to_dataframe().join(ds_Sv["longitude"].to_dataframe()) - df_pos["latitude_prev"] = df_pos["latitude"].shift(-1) - df_pos["longitude_prev"] = df_pos["longitude"].shift(-1) - df_latlon_nonan = df_pos.dropna().copy() - df_latlon_nonan["dist"] = df_latlon_nonan.apply( - lambda x: distance.distance( - (x["latitude"], x["longitude"]), - (x["latitude_prev"], x["longitude_prev"]), - ).nm, - axis=1, - ) - df_pos = df_pos.join(df_latlon_nonan["dist"], how="left") - df_pos["dist"] = df_pos["dist"].cumsum() - df_pos["dist"] = df_pos["dist"].fillna(method="ffill").fillna(method="bfill") - - return df_pos["dist"].values - - -def get_dist_bin_info(dist_nmi, cell_dist): - bin_num_dist = np.ceil(dist_nmi.max() / cell_dist) - if np.mod(dist_nmi.max(), cell_dist) == 0: - # increment bin num if last element coincides with bin edge - bin_num_dist = bin_num_dist + 1 - dist_bin_idx = np.digitize(dist_nmi, np.arange(bin_num_dist + 1) * cell_dist, right=False) - return bin_num_dist, dist_bin_idx diff --git a/echopype/commongrid/utils.py b/echopype/commongrid/utils.py new file mode 100644 index 000000000..39d2cd62a --- /dev/null +++ b/echopype/commongrid/utils.py @@ -0,0 +1,557 @@ +import logging +import re +from typing import Literal, Optional, Tuple, Union + +import numpy as np +import pandas as pd +import xarray as xr +from flox.xarray import xarray_reduce +from geopy import distance + +from ..consolidate.api import POSITION_VARIABLES +from ..utils.compute import _lin2log, _log2lin + +logger = logging.getLogger(__name__) + + +def compute_raw_MVBS( + ds_Sv: xr.Dataset, + range_interval: Union[pd.IntervalIndex, np.ndarray], + ping_interval: Union[pd.IntervalIndex, np.ndarray], + range_var: Literal["echo_range", "depth"] = "echo_range", + method="map-reduce", + **flox_kwargs, +): + """ + Compute the raw unformatted MVBS of ``ds_Sv``. + + Parameters + ---------- + ds_Sv : xr.Dataset + A Dataset containing ``Sv`` and ``echo_range`` data + with coordinates ``channel``, ``ping_time``, and ``range_sample`` + at bare minimum. + Or this can contain ``Sv`` and ``depth`` data with similar coordinates. + range_interval: pd.IntervalIndex or np.ndarray + 1D array or interval index representing + the bins required for ``range_var`` + ping_interval : pd.IntervalIndex or np.ndarray + 1D array or interval index representing + the bins required for ``ping_time``. + range_var: {'echo_range', 'depth'}, default 'echo_range' + The variable to use for range binning. + Either ``echo_range`` or ``depth``. + method: str + The flox strategy for reduction of dask arrays only. + See flox `documentation `_ + for more details. + **flox_kwargs + Additional keyword arguments to be passed + to flox reduction function. + + Returns + ------- + xr.Dataset + The MVBS dataset of the input ``ds_Sv`` for all channels. + """ + # Set initial variables + ds = xr.Dataset() + x_var = "ping_time" + + sv_mean = _groupby_x_along_channels( + ds_Sv, + range_interval, + x_interval=ping_interval, + x_var=x_var, + range_var=range_var, + method=method, + **flox_kwargs, + ) + + # This is MVBS computation + # apply inverse mapping to get back to the original domain and store values + da_MVBS = sv_mean.pipe(_lin2log) + # return xr.merge([ds_Pos, da_MVBS]) + return xr.merge([ds, da_MVBS]) + + +def compute_raw_NASC( + ds_Sv: xr.Dataset, + range_interval: Union[pd.IntervalIndex, np.ndarray], + dist_interval: Union[pd.IntervalIndex, np.ndarray], + method="map-reduce", + **flox_kwargs, +): + """ + Compute the raw unformatted NASC of ``ds_Sv``. + + Parameters + ---------- + ds_Sv : xr.Dataset + A Dataset containing ``Sv`` and ``depth`` data + with coordinates ``channel``, ``distance_nmi``, and ``range_sample``. + range_interval: pd.IntervalIndex or np.ndarray + 1D array or interval index representing + the bins required for ``range_var``. + dist_interval : pd.IntervalIndex or np.ndarray + 1D array or interval index representing + the bins required for ``distance_nmi``. + method: str + The flox strategy for reduction of dask arrays only. + See flox `documentation `_ + for more details. + **flox_kwargs + Additional keyword arguments to be passed + to flox reduction function. + + Returns + ------- + xr.Dataset + The MVBS or NASC dataset of the input ``ds_Sv`` for all channels + """ + # Set initial variables + ds = xr.Dataset() + x_var = "distance_nmi" + range_var = "depth" + + # Determine range_dim for NASC computation + range_dim = "range_sample" + if range_dim not in ds_Sv.dims: + range_dim = "depth" + + sv_mean = _groupby_x_along_channels( + ds_Sv, + range_interval, + x_interval=dist_interval, + x_var=x_var, + range_var=range_var, + method=method, + **flox_kwargs, + ) + + # Get mean ping_time along distance_nmi + # this is only done for NASC computation, + # since for MVBS the ping_time is used for binning already. + ds_ping_time = xarray_reduce( + ds_Sv["ping_time"], + ds_Sv[x_var], + func="nanmean", + expected_groups=(dist_interval), + isbin=True, + method=method, + ) + + # Mean height: approach to use flox + # Numerator (h_mean_num): + # - create a dataarray filled with the first difference of sample height + # with 2D coordinate (distance, depth) + # - flox xarray_reduce along both distance and depth, summing over each 2D bin + # Denominator (h_mean_denom): + # - create a datararray filled with 1, with 1D coordinate (distance) + # - flox xarray_reduce along distance, summing over each 1D bin + # h_mean = N/D + da_denom = xr.ones_like(ds_Sv[x_var]) + h_mean_denom = xarray_reduce( + da_denom, + ds_Sv[x_var], + func="sum", + expected_groups=(dist_interval), + isbin=[True], + method=method, + ) + + h_mean_num = xarray_reduce( + ds_Sv[range_var].diff(dim=range_dim, label="lower"), # use lower end label after diff + ds_Sv["channel"], + ds_Sv[x_var], + ds_Sv[range_var].isel(**{range_dim: slice(0, -1)}), + func="sum", + expected_groups=(None, dist_interval, range_interval), + isbin=[False, True, True], + method=method, + ) + h_mean = h_mean_num / h_mean_denom + + # Combine to compute NASC and name it + raw_NASC = sv_mean * h_mean * 4 * np.pi * 1852**2 + raw_NASC.name = "sv" + + return xr.merge([ds, ds_ping_time, raw_NASC]) + + +def get_distance_from_latlon(ds_Sv): + # Get distance from lat/lon in nautical miles + df_pos = ds_Sv["latitude"].to_dataframe().join(ds_Sv["longitude"].to_dataframe()) + df_pos["latitude_prev"] = df_pos["latitude"].shift(-1) + df_pos["longitude_prev"] = df_pos["longitude"].shift(-1) + df_latlon_nonan = df_pos.dropna().copy() + df_latlon_nonan["dist"] = df_latlon_nonan.apply( + lambda x: distance.distance( + (x["latitude"], x["longitude"]), + (x["latitude_prev"], x["longitude_prev"]), + ).nm, + axis=1, + ) + df_pos = df_pos.join(df_latlon_nonan["dist"], how="left") + df_pos["dist"] = df_pos["dist"].cumsum() + df_pos["dist"] = df_pos["dist"].ffill().bfill() + + return df_pos["dist"].values + + +def _set_var_attrs(da, long_name, units, round_digits, standard_name=None): + """ + Attach common attributes to DataArray variable. + + Parameters + ---------- + da : xr.DataArray + DataArray that will receive attributes + long_name : str + Variable long_name attribute + units : str + Variable units attribute + round_digits : int + Number of digits after decimal point for rounding off actual_range + standard_name : str + CF standard_name, if available (optional) + """ + + da.attrs = { + "long_name": long_name, + "units": units, + "actual_range": [ + round(float(da.min().values), round_digits), + round(float(da.max().values), round_digits), + ], + } + if standard_name: + da.attrs["standard_name"] = standard_name + + +def _set_MVBS_attrs(ds): + """ + Attach common attributes. + + Parameters + ---------- + ds : xr.Dataset + dataset containing MVBS + """ + ds["ping_time"].attrs = { + "long_name": "Ping time", + "standard_name": "time", + "axis": "T", + } + + _set_var_attrs( + ds["Sv"], + long_name="Mean volume backscattering strength (MVBS, mean Sv re 1 m-1)", + units="dB", + round_digits=2, + ) + + +def _convert_bins_to_interval_index( + bins: list, closed: Literal["left", "right"] = "left" +) -> pd.IntervalIndex: + """ + Convert bins to sorted pandas IntervalIndex + with specified closed end + + Parameters + ---------- + bins : list + The bin edges + closed : {'left', 'right'}, default 'left' + Which side of bin interval is closed + + Returns + ------- + pd.IntervalIndex + The resulting IntervalIndex + """ + return pd.IntervalIndex.from_breaks(bins, closed=closed).sort_values() + + +def _parse_x_bin(x_bin: str, x_label="range_bin") -> float: + """ + Parses x bin string, check unit, + and returns x bin in the specified unit. + + Currently only available for: + range_bin: meters (m) + dist_bin: nautical miles (nmi) + + Parameters + ---------- + x_bin : str + X bin string, e.g., "0.5nmi" or "10m" + x_label : {"range_bin", "dist_bin"}, default "range_bin" + The label of the x bin. + + Returns + ------- + float + The resulting x bin value in x unit, + based on label. + + Raises + ------ + ValueError + If the x bin string doesn't include unit value. + TypeError + If the x bin is not a type string. + KeyError + If the x label is not one of the available labels. + """ + x_bin_map = { + "range_bin": { + "name": "Range bin", + "unit": "m", + "ex": "10m", + "unit_label": "meters", + "pattern": r"([\d+]*[.,]{0,1}[\d+]*)(\s+)?(m)", + }, + "dist_bin": { + "name": "Distance bin", + "unit": "nmi", + "ex": "0.5nmi", + "unit_label": "nautical miles", + "pattern": r"([\d+]*[.,]{0,1}[\d+]*)(\s+)?(nmi)", + }, + } + x_bin_info = x_bin_map.get(x_label, None) + + if x_bin_info is None: + raise KeyError(f"x_label must be one of {list(x_bin_map.keys())}") + + # First check for bin types + if not isinstance(x_bin, str): + raise TypeError("'x_bin' must be a string") + # normalize to lower case + # for x_bin + x_bin = x_bin.strip().lower() + # Only matches meters + match_obj = re.match(x_bin_info["pattern"], x_bin) + + # Do some checks on x_bin inputs + if match_obj is None: + # This shouldn't be other units + raise ValueError( + f"{x_bin_info['name']} must be in " + f"{x_bin_info['unit_label']} " + f"(e.g., '{x_bin_info['ex']}')." + ) + + # Convert back to float + x_bin = float(match_obj.group(1)) + return x_bin + + +def _setup_and_validate( + ds_Sv: xr.Dataset, + range_var: Literal["echo_range", "depth"] = "echo_range", + range_bin: Optional[str] = None, + closed: Literal["left", "right"] = "left", + required_data_vars: Optional[list] = None, +) -> Tuple[xr.Dataset, float]: + """ + Setup and validate shared arguments for + ``compute_X`` functions. + + For now this is only used by ``compute_MVBS`` and ``compute_NASC``. + + Parameters + ---------- + ds_Sv : xr.Dataset + Sv dataset + range_var : {'echo_range', 'depth'}, default 'echo_range' + The variable to use for range binning. + Must be one of ``echo_range`` or ``depth``. + Note that ``depth`` is only available if the input dataset contains + ``depth`` as a data variable. + range_bin : str, default None + bin size along ``echo_range`` or ``depth`` in meters. + closed: {'left', 'right'}, default 'left' + Which side of bin interval is closed. + required_data_vars : list, optional + List of required data variables in ds_Sv. + If None, defaults to empty list. + + Returns + ------- + ds_Sv : xr.Dataset + Modified Sv dataset + range_bin : float + The range bin value in meters + """ + + # Check if range_var is valid + if range_var not in ["echo_range", "depth"]: + raise ValueError("range_var must be one of 'echo_range' or 'depth'.") + + # Set to default empty list if None + if required_data_vars is None: + required_data_vars = [] + + # Check if required data variables exists in ds_Sv + # Use set to ensure no duplicates + required_data_vars = set(required_data_vars + [range_var]) + if not all([var in ds_Sv.variables for var in required_data_vars]): + raise ValueError( + "Input Sv dataset must contain all of " f"the following variables: {required_data_vars}" + ) + + # Check if range_bin is a string + if not isinstance(range_bin, str): + raise TypeError("range_bin must be a string") + + # Parse the range_bin string and convert to float + range_bin = _parse_x_bin(range_bin, "range_bin") + + # Check for closed values + if closed not in ["right", "left"]: + raise ValueError(f"{closed} is not a valid option. Options are 'left' or 'right'.") + + # Clean up filenames dimension if it exists + # not needed here + if "filenames" in ds_Sv.dims: + ds_Sv = ds_Sv.drop_dims("filenames") + + return ds_Sv, range_bin + + +def _get_reduced_positions( + ds_Sv: xr.Dataset, + ds_X: xr.Dataset, + X: Literal["MVBS", "NASC"], + x_interval: Union[pd.IntervalIndex, np.ndarray], +) -> xr.Dataset: + """Helper function to get reduced positions + + Parameters + ---------- + ds_Sv : xr.Dataset + The input Sv dataset + ds_X : xr.Dataset + The input X dataset, either ``ds_MVBS`` or ``ds_NASC`` + X : {'MVBS', 'NASC'} + The type of X dataset + x_interval : pd.IntervalIndex or np.ndarray + 1D array or interval index representing + the bins required for X dataset. + + MVBS: ``ping_time`` + NASC: ``distance_nmi`` + + Returns + ------- + xr.Dataset + The X dataset with reduced position variables + such as latitude and longitude + """ + # Get positions if exists + # otherwise return the input ds_X + if all(v in ds_Sv for v in POSITION_VARIABLES): + x_var = x_dim = "ping_time" + if X == "NASC": + x_var = "distance_nmi" + x_dim = "distance" + + ds_Pos = xarray_reduce( + ds_Sv[POSITION_VARIABLES], + ds_Sv[x_var], + func="nanmean", + expected_groups=(x_interval), + isbin=True, + method="map-reduce", + ) + + for var in POSITION_VARIABLES: + ds_X[var] = ([x_dim], ds_Pos[var].data, ds_Sv[var].attrs) + return ds_X + + +def _groupby_x_along_channels( + ds_Sv: xr.Dataset, + range_interval: Union[pd.IntervalIndex, np.ndarray], + x_interval: Union[pd.IntervalIndex, np.ndarray], + x_var: Literal["ping_time", "distance_nmi"] = "ping_time", + range_var: Literal["echo_range", "depth"] = "echo_range", + method: str = "map-reduce", + **flox_kwargs, +) -> xr.Dataset: + """ + Perform groupby of ``ds_Sv`` along each channel for the given + intervals to get the 'sv' mean value. + + Parameters + ---------- + ds_Sv : xr.Dataset + A Dataset containing ``Sv`` and other variables, + depending on computation performed. + + For MVBS computation, this must contain ``Sv`` and ``echo_range`` data + with coordinates ``channel``, ``ping_time``, and ``range_sample`` + at bare minimum. + Or this can contain ``Sv`` and ``depth`` data with similar coordinates. + + For NASC computatioon this must contain ``Sv`` and ``depth`` data + with coordinates ``channel``, ``distance_nmi``, and ``range_sample``. + range_interval: pd.IntervalIndex or np.ndarray + 1D array or interval index representing + the bins required for ``range_var`` + x_interval : pd.IntervalIndex or np.ndarray + 1D array or interval index representing + the bins required for ``ping_time`` or ``distance_nmi``. + x_var : {'ping_time', 'distance_nmi'}, default 'ping_time' + The variable to use for x binning. This will determine + if computation is for MVBS or NASC. + range_var: {'echo_range', 'depth'}, default 'echo_range' + The variable to use for range binning. + Either ``echo_range`` or ``depth``. + + **For NASC, this must be ``depth``.** + method: str + The flox strategy for reduction of dask arrays only. + See flox `documentation `_ + for more details. + **flox_kwargs + Additional keyword arguments to be passed + to flox reduction function. + + Returns + ------- + xr.Dataset + The MVBS or NASC dataset of the input ``ds_Sv`` for all channels + """ + # Check if x_var is valid, currently only support + # ping_time and distance_nmi, which indicates + # either a MVBS or NASC computation + if x_var not in ["ping_time", "distance_nmi"]: + raise ValueError("x_var must be 'ping_time' or 'distance_nmi'") + + # Set correct range_var just in case + if x_var == "distance_nmi" and range_var != "depth": + logger.warning("x_var is 'distance_nmi', setting range_var to 'depth'") + range_var = "depth" + + # average should be done in linear domain + sv = ds_Sv["Sv"].pipe(_log2lin) + + # reduce along ping_time or distance_nmi + # and echo_range or depth + # by binning and averaging + sv_mean = xarray_reduce( + sv, + ds_Sv["channel"], + ds_Sv[x_var], + ds_Sv[range_var], + func="nanmean", + expected_groups=(None, x_interval, range_interval), + isbin=[False, True, True], + method=method, + **flox_kwargs, + ) + return sv_mean diff --git a/echopype/tests/commongrid/conftest.py b/echopype/tests/commongrid/conftest.py index 27b166a03..9c30c504f 100644 --- a/echopype/tests/commongrid/conftest.py +++ b/echopype/tests/commongrid/conftest.py @@ -3,8 +3,12 @@ import xarray as xr import numpy as np import pandas as pd +from typing import Literal from echopype.consolidate import add_depth +from echopype.commongrid.utils import ( + get_distance_from_latlon, +) import echopype as ep @@ -87,15 +91,47 @@ def mock_Sv_sample(mock_parameters): return np.tile(depth_data, (channel_len, ping_time_len, 1)) +def _add_latlon_depth(ds_Sv, latlon=False, depth=False, lat_attrs={}, lon_attrs={}, depth_offset=0): + """Adds lat lon variables and/or depth to ds_Sv""" + if latlon: + # Add lat lon + n_pings = ds_Sv.ping_time.shape[0] + latitude = np.linspace(42.48916859, 42.49071833, num=n_pings) + longitude = np.linspace(-124.88296688, -124.81919229, num=n_pings) + + ds_Sv["latitude"] = (["ping_time"], latitude, lat_attrs) + ds_Sv["longitude"] = (["ping_time"], longitude, lon_attrs) + + # Need processing level code for compute MVBS to work! + ds_Sv.attrs["processing_level"] = "Level 2A" + + if depth: + # Add depth + ds_Sv = ds_Sv.pipe(add_depth, depth_offset=depth_offset) + return ds_Sv + + @pytest.fixture -def mock_Sv_dataset_regular(mock_parameters, mock_Sv_sample): +def mock_Sv_dataset_regular(mock_parameters, mock_Sv_sample, lat_attrs, lon_attrs, depth_offset): ds_Sv = _gen_Sv_echo_range_regular(**mock_parameters, ping_time_jitter_max_ms=0) ds_Sv["Sv"].data = mock_Sv_sample + + # Add latlon and depth + ds_Sv = _add_latlon_depth( + ds_Sv, + latlon=True, + depth=True, + lat_attrs=lat_attrs, + lon_attrs=lon_attrs, + depth_offset=depth_offset, + ) return ds_Sv @pytest.fixture -def mock_Sv_dataset_irregular(mock_parameters, mock_Sv_sample, mock_nan_ilocs): +def mock_Sv_dataset_irregular( + mock_parameters, mock_Sv_sample, mock_nan_ilocs, lat_attrs, lon_attrs, depth_offset +): depth_interval = [0.5, 0.32, 0.2] depth_ping_time_len = [2, 3, 5] ds_Sv = _gen_Sv_echo_range_irregular( @@ -105,6 +141,17 @@ def mock_Sv_dataset_irregular(mock_parameters, mock_Sv_sample, mock_nan_ilocs): ping_time_jitter_max_ms=30, # Added jitter to ping_time ) ds_Sv["Sv"].data = mock_Sv_sample + + # Add latlon and depth + ds_Sv = _add_latlon_depth( + ds_Sv, + latlon=True, + depth=True, + lat_attrs=lat_attrs, + lon_attrs=lon_attrs, + depth_offset=depth_offset, + ) + # Sprinkle nans around echo_range for pos in mock_nan_ilocs: ds_Sv["echo_range"][pos] = np.nan @@ -129,13 +176,29 @@ def mock_mvbs_array_regular(mock_Sv_dataset_regular, mock_mvbs_inputs, mock_para ping_time_bin = mock_mvbs_inputs["ping_time_bin"] range_bin = mock_mvbs_inputs["range_meter_bin"] channel_len = mock_parameters["channel_len"] - expected_mvbs_val = _get_expected_mvbs_val( - ds_Sv, ping_time_bin, range_bin, channel_len - ) + expected_mvbs_val = _get_expected_mvbs_val(ds_Sv, ping_time_bin, range_bin, channel_len) return expected_mvbs_val +@pytest.fixture +def mock_nasc_array_regular(mock_Sv_dataset_regular, mock_parameters): + """ + Mock Sv sample result from compute_MVBS + + Dimension: (2, 3, 5) + Ping time bin: 1s + Range bin: 2m + """ + ds_Sv = mock_Sv_dataset_regular + dist_bin = 0.5 + range_bin = 2 + channel_len = mock_parameters["channel_len"] + expected_nasc_val = _get_expected_nasc_val(ds_Sv, dist_bin, range_bin, channel_len) + + return expected_nasc_val + + @pytest.fixture def mock_mvbs_array_irregular(mock_Sv_dataset_irregular, mock_mvbs_inputs, mock_parameters): """ @@ -149,13 +212,29 @@ def mock_mvbs_array_irregular(mock_Sv_dataset_irregular, mock_mvbs_inputs, mock_ ping_time_bin = mock_mvbs_inputs["ping_time_bin"] range_bin = mock_mvbs_inputs["range_meter_bin"] channel_len = mock_parameters["channel_len"] - expected_mvbs_val = _get_expected_mvbs_val( - ds_Sv, ping_time_bin, range_bin, channel_len - ) + expected_mvbs_val = _get_expected_mvbs_val(ds_Sv, ping_time_bin, range_bin, channel_len) return expected_mvbs_val +@pytest.fixture +def mock_nasc_array_irregular(mock_Sv_dataset_irregular, mock_parameters): + """ + Mock Sv sample result from compute_MVBS + + Dimension: (2, 3, 5) + Ping time bin: 1s + Range bin: 2m + """ + ds_Sv = mock_Sv_dataset_irregular + dist_bin = 0.5 + range_bin = 2 + channel_len = mock_parameters["channel_len"] + expected_nasc_val = _get_expected_nasc_val(ds_Sv, dist_bin, range_bin, channel_len) + + return expected_nasc_val + + @pytest.fixture( params=[ ( @@ -285,15 +364,9 @@ def depth_offset(): @pytest.fixture def ds_Sv_echo_range_regular_w_latlon(ds_Sv_echo_range_regular, lat_attrs, lon_attrs): """Sv dataset with latitude and longitude""" - n_pings = ds_Sv_echo_range_regular.ping_time.shape[0] - latitude = np.linspace(42, 43, num=n_pings) - longitude = np.linspace(-124, -125, num=n_pings) - - ds_Sv_echo_range_regular["latitude"] = (["ping_time"], latitude, lat_attrs) - ds_Sv_echo_range_regular["longitude"] = (["ping_time"], longitude, lon_attrs) - - # Need processing level code for compute MVBS to work! - ds_Sv_echo_range_regular.attrs["processing_level"] = "Level 2A" + ds_Sv_echo_range_regular = _add_latlon_depth( + ds_Sv_echo_range_regular, latlon=True, lat_attrs=lat_attrs, lon_attrs=lon_attrs + ) return ds_Sv_echo_range_regular @@ -319,14 +392,228 @@ def ds_Sv_echo_range_irregular(random_number_generator): ) +# Helper functions for NASC testing +def _create_dataset(i, sv, dim, rng): + dims = { + "range_sample": np.arange(5), + "distance_nmi": np.arange(5), + } + # Add one for other channel + sv = sv + (rng.random() * 5) + Sv = ep.utils.compute._lin2log(sv) + ds_Sv = xr.Dataset( + { + "Sv": (list(dims.keys()), Sv), + "depth": (list(dims.keys()), np.array([dim] * 5).T), + "ping_time": ( + ["distance_nmi"], + pd.date_range("2020-01-01", periods=len(dim), freq="1min"), + ), + }, + coords=dict(channel=f"ch_{i}", **dims), + ) + return ds_Sv + + +def get_NASC_echoview(ds_Sv, ch_idx=0, r0=2, r1=20): + """ + Computes NASC using echoview's method, 1 channel only, + as described in https://gist.github.com/leewujung/3b058ab63c3b897b273b33b907b62f6d + """ + r = ds_Sv.depth.isel(channel=ch_idx, distance_nmi=0).values + # get r0 and r1 indexes + # these are used to slice the desired Sv samples + r0 = np.argmin(abs(r - r0)) + r1 = np.argmin(abs(r - r1)) + + sh = np.r_[np.diff(r), np.nan] + + sv = ds_Sv["Sv"].pipe(ep.utils.compute._log2lin).isel(channel=ch_idx).values + sv_mean_echoview = np.nanmean(sv[r0:r1]) + h_mean_echoview = np.sum(sh[r0:r1]) * sv.shape[1] / sv.shape[1] + + NASC_echoview = sv_mean_echoview * h_mean_echoview * 4 * np.pi * 1852**2 + return NASC_echoview + + +@pytest.fixture +def mock_Sv_dataset_NASC(mock_parameters, random_number_generator): + channel_len = mock_parameters["channel_len"] + dim0 = np.array([0.5, 1.5, 2.5, 3.5, 9]) + sv0 = np.array( + [ + [1.0, 2.0, 3.0, 4.0, np.nan], + [6.0, 7.0, 8.0, 9.0, 10.0], + [11.0, 12.0, 13.0, 14.0, 15.0], + [16.0, 17.0, 18.0, 19.0, np.nan], + [21.0, 22.0, 23.0, 24.0, 25.0], + ] + ) + return xr.concat( + [_create_dataset(i, sv0, dim0, random_number_generator) for i in range(channel_len)], + dim="channel", + ) + + # Helper functions to generate mock Sv and MVBS dataset +def _get_expected_nasc_val( + ds_Sv: xr.Dataset, dist_bin: str, range_bin: float, channel_len: int = 2 +) -> np.ndarray: + """ + Helper functions to generate expected NASC outputs from mock Sv dataset + by brute-force looping and compute the mean + + Parameters + ---------- + ds_Sv : xr.Dataset + Mock Sv dataset + dist_bin : float + Distance bin + range_bin : float + Range bin + channel_len : int, default 2 + Number of channels + """ + # Get distance from lat/lon in nautical miles + dist_nmi = get_distance_from_latlon(ds_Sv) + ds_Sv = ds_Sv.assign_coords({"distance_nmi": ("ping_time", dist_nmi)}).swap_dims( + {"ping_time": "distance_nmi"} + ) + + # create bin information along distance_nmi + # this computes the distance max since there might NaNs in the data + dist_max = ds_Sv["distance_nmi"].max() + dist_interval = np.arange(0, dist_max + dist_bin, dist_bin) + + # create bin information for depth + # this computes the depth max since there might NaNs in the data + depth_max = ds_Sv["depth"].max() + range_interval = np.arange(0, depth_max + range_bin, range_bin) + + sv = ds_Sv["Sv"].pipe(ep.utils.compute._log2lin) + + # Compute sv mean + sv_mean = _brute_mean_reduce_3d( + sv, ds_Sv, "depth", "distance_nmi", channel_len, dist_interval, range_interval + ) + + # Calculate denominator + h_mean_denom = np.ones(len(dist_interval) - 1) * np.nan + for x_idx in range(len(dist_interval) - 1): + x_range = ds_Sv["distance_nmi"].sel( + distance_nmi=slice(dist_interval[x_idx], dist_interval[x_idx + 1]) + ) + h_mean_denom[x_idx] = float(len(x_range.data)) + + # Calculate numerator + r_diff = ds_Sv["depth"].diff(dim="range_sample", label="lower") + depth = ds_Sv["depth"].isel(**{"range_sample": slice(0, -1)}) + h_mean_num = np.ones((channel_len, len(dist_interval) - 1, len(range_interval) - 1)) * np.nan + for ch_idx in range(channel_len): + for x_idx in range(len(dist_interval) - 1): + for r_idx in range(len(range_interval) - 1): + x_range = depth.isel(channel=ch_idx).sel( + **{"distance_nmi": slice(dist_interval[x_idx], dist_interval[x_idx + 1])} + ) + r_idx_active = np.logical_and( + x_range.data >= range_interval[r_idx], + x_range.data < range_interval[r_idx + 1], + ) + r_tmp = ( + r_diff.isel(channel=ch_idx) + .sel(**{"distance_nmi": slice(dist_interval[x_idx], dist_interval[x_idx + 1])}) + .data[r_idx_active] + ) + if 0 in r_tmp.shape: + h_mean_num[ch_idx, x_idx, r_idx] = np.nan + else: + h_mean_num[ch_idx, x_idx, r_idx] = np.sum(r_tmp) + + # Compute raw NASC + h_mean_num_da = xr.DataArray(h_mean_num, dims=["channel", "distance_nmi", "depth"]) + h_mean_denom_da = xr.DataArray(h_mean_denom, dims=["distance_nmi"]) + h_mean = h_mean_num_da / h_mean_denom_da + # Combine to compute NASC + return sv_mean * h_mean * 4 * np.pi * 1852**2 + + +def _brute_mean_reduce_3d( + sv: xr.DataArray, + ds_Sv: xr.Dataset, + range_var: Literal["echo_range", "depth"], + x_var: Literal["ping_time", "distance_nmi"], + channel_len: int, + x_interval: list, + range_interval: list, +) -> np.ndarray: + """ + Perform brute force reduction on sv data for 3 Dimensions + + Parameters + ---------- + sv : xr.DataArray + A DataArray containing ``sv`` data with coordinates + ds_Sv : xr.Dataset + A Dataset containing ``Sv`` and other variables, + depending on computation performed. + + For MVBS computation, this must contain ``Sv`` and ``echo_range`` data + with coordinates ``channel``, ``ping_time``, and ``range_sample`` + at bare minimum. + Or this can contain ``Sv`` and ``depth`` data with similar coordinates. + + For NASC computatioon this must contain ``Sv`` and ``depth`` data + with coordinates ``channel``, ``distance_nmi``, and ``range_sample``. + range_var: {'echo_range', 'depth'}, default 'echo_range' + The variable to use for range binning. + Either ``echo_range`` or ``depth``. + + **For NASC, this must be ``depth``.** + x_var : {'ping_time', 'distance_nmi'}, default 'ping_time' + The variable to use for x binning. This will determine + if computation is for MVBS or NASC. + channel_len : int + Number of channels + x_interval : list + 1D array or interval index representing + the bins required for ``ping_time`` or ``distance_nmi``. + range_interval : list + 1D array or interval index representing + the bins required for ``range_var`` + """ + mean_vals = np.ones((channel_len, len(x_interval) - 1, len(range_interval) - 1)) * np.nan + + for ch_idx in range(channel_len): + for x_idx in range(len(x_interval) - 1): + for r_idx in range(len(range_interval) - 1): + x_range = ( + ds_Sv[range_var] + .isel(channel=ch_idx) + .sel(**{x_var: slice(x_interval[x_idx], x_interval[x_idx + 1])}) + ) + r_idx_active = np.logical_and( + x_range.data >= range_interval[r_idx], + x_range.data < range_interval[r_idx + 1], + ) + sv_tmp = ( + sv.isel(channel=ch_idx) + .sel(**{x_var: slice(x_interval[x_idx], x_interval[x_idx + 1])}) + .data[r_idx_active] + ) + if 0 in sv_tmp.shape: + mean_vals[ch_idx, x_idx, r_idx] = np.nan + else: + mean_vals[ch_idx, x_idx, r_idx] = np.mean(sv_tmp) + return mean_vals + + def _get_expected_mvbs_val( ds_Sv: xr.Dataset, ping_time_bin: str, range_bin: float, channel_len: int = 2 ) -> np.ndarray: """ Helper functions to generate expected MVBS outputs from mock Sv dataset by brute-force looping and compute the mean - + Parameters ---------- ds_Sv : xr.Dataset @@ -350,30 +637,13 @@ def _get_expected_mvbs_val( # create bin information for echo_range # this computes the echo range max since there might NaNs in the data echo_range_max = ds_Sv["echo_range"].max() - range_interval = np.arange(0, echo_range_max + 2, range_bin) + range_interval = np.arange(0, echo_range_max + range_bin, range_bin) sv = ds_Sv["Sv"].pipe(ep.utils.compute._log2lin) - expected_mvbs_val = np.ones((2, len(ping_interval) - 1, len(range_interval) - 1)) * np.nan - - for ch_idx in range(channel_len): - for p_idx in range(len(ping_interval) - 1): - for r_idx in range(len(range_interval) - 1): - echo_range = ( - ds_Sv['echo_range'] - .isel(channel=ch_idx) - .sel(ping_time=slice(ping_interval[p_idx], ping_interval[p_idx+1])) - ) - r_idx_active = np.logical_and( - echo_range.data >= range_interval[r_idx], - echo_range.data < range_interval[r_idx+1] - ) - sv_tmp = sv.isel(channel=ch_idx).sel( - ping_time=slice(ping_interval[p_idx], ping_interval[p_idx+1])).data[r_idx_active] - if 0 in sv_tmp.shape: - expected_mvbs_val[ch_idx, p_idx, r_idx] = np.nan - else: - expected_mvbs_val[ch_idx, p_idx, r_idx] = np.mean(sv_tmp) + expected_mvbs_val = _brute_mean_reduce_3d( + sv, ds_Sv, "echo_range", "ping_time", channel_len, ping_interval, range_interval + ) return ep.utils.compute._lin2log(expected_mvbs_val) @@ -400,7 +670,7 @@ def _gen_Sv_echo_range_regular( Generate a Sv dataset with uniform echo_range across all ping_time. ping_time_jitter_max_ms controlled jitter in milliseconds in ping_time. - + Parameters ------------ channel_len @@ -457,7 +727,7 @@ def _gen_Sv_echo_range_irregular( Generate a Sv dataset with uniform echo_range across all ping_time. ping_time_jitter_max_ms controlled jitter in milliseconds in ping_time. - + Parameters ------------ channel_len @@ -468,7 +738,7 @@ def _gen_Sv_echo_range_irregular( depth intervals, may have multiple values depth_ping_time_len the number of pings to use each of the depth_interval - for example, with depth_interval=[0.5, 0.32, 0.13] + for example, with depth_interval=[0.5, 0.32, 0.13] and depth_ping_time_len=[100, 300, 200], the first 100 pings have echo_range with depth intervals of 0.5 m, the next 300 pings have echo_range with depth intervals of 0.32 m, diff --git a/echopype/tests/commongrid/test_api.py b/echopype/tests/commongrid/test_api.py index d618d6443..6e3a84385 100644 --- a/echopype/tests/commongrid/test_api.py +++ b/echopype/tests/commongrid/test_api.py @@ -1,6 +1,17 @@ import pytest -import echopype as ep + import numpy as np +import pandas as pd +from flox.xarray import xarray_reduce +import echopype as ep +from echopype.consolidate import add_location, add_depth +from echopype.commongrid.utils import ( + _parse_x_bin, + _groupby_x_along_channels, + get_distance_from_latlon, + compute_raw_NASC +) +from echopype.tests.commongrid.conftest import get_NASC_echoview # Utilities Tests @@ -32,11 +43,126 @@ def test__parse_x_bin(x_bin, x_label, expected_result): else: assert ep.commongrid.api._parse_x_bin(x_bin, x_label) == expected_result + +@pytest.mark.unit +@pytest.mark.parametrize( + ["range_var", "lat_lon"], [("depth", False), ("echo_range", False)] +) +def test__groupby_x_along_channels(request, range_var, lat_lon): + """Testing the underlying function of compute_MVBS and compute_NASC""" + range_bin = 20 + ping_time_bin = "20S" + method = "map-reduce" + + flox_kwargs = {"reindex": True} + + # Retrieve the correct dataset + if range_var == "depth": + ds_Sv = request.getfixturevalue("ds_Sv_echo_range_regular_w_depth") + else: + ds_Sv = request.getfixturevalue("ds_Sv_echo_range_regular") + + # compute range interval + echo_range_max = ds_Sv[range_var].max() + range_interval = np.arange(0, echo_range_max + range_bin, range_bin) + + # create bin information needed for ping_time + d_index = ( + ds_Sv["ping_time"] + .resample(ping_time=ping_time_bin, skipna=True) + .asfreq() + .indexes["ping_time"] + ) + ping_interval = d_index.union([d_index[-1] + pd.Timedelta(ping_time_bin)]) + + sv_mean = _groupby_x_along_channels( + ds_Sv, + range_interval, + x_interval=ping_interval, + x_var="ping_time", + range_var=range_var, + method=method, + **flox_kwargs + ) + + # Check that the range_var is in the dimension + assert f"{range_var}_bins" in sv_mean.dims + + # NASC Tests @pytest.mark.integration -@pytest.mark.skip(reason="NASC is not implemented yet") -def test_compute_NASC(test_data_samples): - pass +@pytest.mark.parametrize("compute_mvbs", [True, False]) +def test_compute_NASC(request, test_data_samples, compute_mvbs): + if any(request.node.callspec.id.startswith(id) for id in ["ek80", "azfp"]): + pytest.skip("Skipping NASC test for ek80 and azfp, no data available") + + ( + filepath, + sonar_model, + azfp_xml_path, + range_kwargs, + ) = test_data_samples + ed = ep.open_raw(filepath, sonar_model, azfp_xml_path) + if ed.sonar_model.lower() == "azfp": + avg_temperature = ed["Environment"]["temperature"].values.mean() + env_params = { + "temperature": avg_temperature, + "salinity": 27.9, + "pressure": 59, + } + range_kwargs["env_params"] = env_params + if "azfp_cal_type" in range_kwargs: + range_kwargs.pop("azfp_cal_type") + ds_Sv = ep.calibrate.compute_Sv(ed, **range_kwargs) + + # Adds location and depth information + ds_Sv = ds_Sv.pipe(add_location, ed).pipe( + add_depth, depth_offset=ed["Platform"].water_level.values + ) + + if compute_mvbs: + range_bin = "2m" + ping_time_bin = "1s" + + ds_Sv = ds_Sv.pipe( + ep.commongrid.compute_MVBS, + range_var="depth", + range_bin=range_bin, + ping_time_bin=ping_time_bin, + ) + + dist_bin = "0.5nmi" + range_bin = "10m" + + ds_NASC = ep.commongrid.compute_NASC(ds_Sv, range_bin=range_bin, dist_bin=dist_bin) + assert ds_NASC is not None + + dist_nmi = get_distance_from_latlon(ds_Sv) + + # Check dimensions + dist_bin = _parse_x_bin(dist_bin, "dist_bin") + range_bin = _parse_x_bin(range_bin) + da_NASC = ds_NASC["NASC"] + assert da_NASC.dims == ("channel", "distance", "depth") + assert np.all(ds_NASC["channel"].values == ds_Sv["channel"].values) + assert da_NASC["depth"].size == np.ceil(ds_Sv["depth"].max() / range_bin) + assert da_NASC["distance"].size == np.ceil(dist_nmi.max() / dist_bin) + + +@pytest.mark.unit +def test_simple_NASC_Echoview_values(mock_Sv_dataset_NASC): + dist_interval = np.array([-5, 10]) + range_interval = np.array([1, 5]) + raw_NASC = compute_raw_NASC( + mock_Sv_dataset_NASC, + range_interval, + dist_interval, + ) + for ch_idx, _ in enumerate(raw_NASC.channel): + NASC_echoview = get_NASC_echoview(mock_Sv_dataset_NASC, ch_idx) + assert np.allclose( + raw_NASC.sv.isel(channel=ch_idx)[0, 0], NASC_echoview, atol=1e-10, rtol=1e-10 + ) # MVBS Tests @@ -123,7 +249,7 @@ def test_compute_MVBS_invalid_range_var(ds_Sv_echo_range_regular, range_var): ep.commongrid.compute_MVBS(ds_Sv_echo_range_regular, range_var=range_var) elif range_var == "depth": with pytest.raises( - ValueError, match=f"range_var '{range_var}' does not exist in the input dataset." + ValueError, match=r"Input Sv dataset must contain all of the following variables" ): ep.commongrid.compute_MVBS(ds_Sv_echo_range_regular, range_var=range_var) else: @@ -287,3 +413,36 @@ def _parse_nans(mvbs, ds_Sv) -> np.ndarray: # Ensures that the computation of MVBS takes doesn't take into account NaN values # that are sporadically placed in the echo_range values assert np.array_equal(np.isnan(ds_MVBS.Sv.values), expected_outputs) + + +@pytest.mark.integration +@pytest.mark.parametrize( + ("er_type"), + [ + ("regular"), + ("irregular"), + ], +) +def test_compute_NASC_values(request, er_type): + """Tests for the values of compute_NASC on regular and irregular data.""" + + range_bin = "2m" + dist_bin = "0.5nmi" + + if er_type == "regular": + ds_Sv = request.getfixturevalue("mock_Sv_dataset_regular") + expected_nasc = request.getfixturevalue("mock_nasc_array_regular") + else: + # Mock irregular dataset contains jitter + # and NaN values in the bottom echo_range + ds_Sv = request.getfixturevalue("mock_Sv_dataset_irregular") + expected_nasc = request.getfixturevalue("mock_nasc_array_irregular") + + ds_NASC = ep.commongrid.compute_NASC(ds_Sv, range_bin=range_bin, dist_bin=dist_bin) + + assert ds_NASC.NASC.shape == expected_nasc.shape + # Floating digits need to check with all close not equal + # Compare the values of the MVBS array with the expected values + assert np.allclose( + ds_NASC.NASC.values, expected_nasc.values, atol=1e-10, rtol=1e-10, equal_nan=True + ) diff --git a/echopype/tests/commongrid/test_mvbs.py b/echopype/tests/commongrid/test_mvbs.py deleted file mode 100644 index 78a77be0a..000000000 --- a/echopype/tests/commongrid/test_mvbs.py +++ /dev/null @@ -1,67 +0,0 @@ -import numpy as np -import pandas as pd -import pytest -from echopype.commongrid.mvbs import get_MVBS_along_channels -from echopype.consolidate.api import POSITION_VARIABLES -from flox.xarray import xarray_reduce - -@pytest.mark.unit -@pytest.mark.parametrize(["range_var", "lat_lon"], [("depth", False), ("echo_range", True), ("echo_range", False)]) -def test_get_MVBS_along_channels(request, range_var, lat_lon): - """Testing the underlying function of compute_MVBS""" - range_bin = 20 - ping_time_bin = "20S" - method = "map-reduce" - - flox_kwargs = { - "reindex": True - } - - # Retrieve the correct dataset - if range_var == "depth": - ds_Sv = request.getfixturevalue("ds_Sv_echo_range_regular_w_depth") - elif range_var == "echo_range" and lat_lon is True: - ds_Sv = request.getfixturevalue("ds_Sv_echo_range_regular_w_latlon") - else: - ds_Sv = request.getfixturevalue("ds_Sv_echo_range_regular") - - # compute range interval - echo_range_max = ds_Sv[range_var].max() - range_interval = np.arange(0, echo_range_max + range_bin, range_bin) - - # create bin information needed for ping_time - d_index = ( - ds_Sv["ping_time"] - .resample(ping_time=ping_time_bin, skipna=True) - .asfreq() - .indexes["ping_time"] - ) - ping_interval = d_index.union([d_index[-1] + pd.Timedelta(ping_time_bin)]) - - raw_MVBS = get_MVBS_along_channels( - ds_Sv, range_interval, ping_interval, - range_var=range_var, method=method, **flox_kwargs - ) - - # Check that the range_var is in the dimension - assert f"{range_var}_bins" in raw_MVBS.dims - - # When it's echo_range and lat_lon, the dataset should have positions - if range_var == "echo_range" and lat_lon is True: - assert raw_MVBS.attrs["has_positions"] is True - assert all(v in raw_MVBS for v in POSITION_VARIABLES) - - # Compute xarray reduce manually for this - expected_Pos = xarray_reduce( - ds_Sv[POSITION_VARIABLES], - ds_Sv["ping_time"], - func="nanmean", - expected_groups=(ping_interval), - isbin=True, - method=method, - ) - - for v in POSITION_VARIABLES: - assert np.array_equal(raw_MVBS[v].data, expected_Pos[v].data) - else: - assert raw_MVBS.attrs["has_positions"] is False diff --git a/echopype/tests/commongrid/test_nasc.py b/echopype/tests/commongrid/test_nasc.py deleted file mode 100644 index 0430f99c1..000000000 --- a/echopype/tests/commongrid/test_nasc.py +++ /dev/null @@ -1,38 +0,0 @@ -import pytest - -import numpy as np - -from echopype import open_raw -from echopype.calibrate import compute_Sv -# from echopype.commongrid import compute_NASC -from echopype.commongrid.nasc import ( - get_distance_from_latlon, - get_depth_bin_info, - get_dist_bin_info, - get_distance_from_latlon, -) -from echopype.consolidate import add_location, add_depth - - -@pytest.fixture -def ek60_path(test_path): - return test_path['EK60'] - - -# def test_compute_NASC(ek60_path): -# raw_path = ek60_path / "ncei-wcsd/Summer2017-D20170620-T011027.raw" - -# ed = open_raw(raw_path, sonar_model="EK60") -# ds_Sv = add_depth(add_location(compute_Sv(ed), ed, nmea_sentence="GGA")) -# cell_dist = 0.1 -# cell_depth = 20 -# ds_NASC = compute_NASC(ds_Sv, cell_dist, cell_depth) - -# dist_nmi = get_distance_from_latlon(ds_Sv) - -# # Check dimensions -# da_NASC = ds_NASC["NASC"] -# assert da_NASC.dims == ("channel", "distance", "depth") -# assert np.all(ds_NASC["channel"].values == ds_Sv["channel"].values) -# assert da_NASC["depth"].size == np.ceil(ds_Sv["depth"].max() / cell_depth) -# assert da_NASC["distance"].size == np.ceil(dist_nmi.max() / cell_dist)