diff --git a/echopype/mask/api.py b/echopype/mask/api.py
index a0c2704af..770088e9d 100644
--- a/echopype/mask/api.py
+++ b/echopype/mask/api.py
@@ -561,10 +561,9 @@ def get_seabed_mask(
         a Dataset. This input must correspond to a Dataset that has the
         coordinate ``channel`` and variables ``frequency_nominal`` and ``Sv``.
     desired_channel: str - channel to generate the mask for
-    desired_freuency: int - desired frequency, in case the channel isn't directly specified
-    method: str with either "ariza", "experimental", "blackwell_mod",
-    "blackwell", "deltaSv", "maxSv"
-    based on the preferred method for seabed mask generation
+    desired_frequency: int - desired frequency, in case the channel isn't directly specified
+    method: str, either "ariza" or "blackwell", based on the preferred method
+    for seabed mask generation
     Returns
     -------
     xr.DataArray
@@ -574,8 +573,7 @@ def get_seabed_mask(
     Raises
     ------
     ValueError
-        If neither "ariza", "experimental", "blackwell_mod",
-        "blackwell", "deltaSv", "maxSv" are given
+        If neither "ariza" nor "blackwell" is given

     Notes
     -----
@@ -588,11 +586,7 @@ def get_seabed_mask(
     source_Sv = get_dataset(source_Sv)
     mask_map = {
         "ariza": seabed._ariza,
-        "experimental": seabed._experimental,
         "blackwell": seabed._blackwell,
-        "blackwell_mod": seabed._blackwell_mod,
-        "delta_Sv": seabed._deltaSv,
-        "max_Sv": seabed._maxSv,
     }

     if method not in mask_map.keys():
@@ -621,9 +615,8 @@ def get_seabed_mask_multichannel(
         else it specifies the path to a zarr or netcdf file containing
         a Dataset. This input must correspond to a Dataset that has the
         coordinate ``channel`` and variables ``frequency_nominal`` and ``Sv``.
-    method: str with either "ariza", "experimental", "blackwell_mod",
-    "blackwell", "deltaSv", "maxSv"
-    based on the preferred method for seabed mask generation
+    method: str, either "ariza" or "blackwell",
+    based on the preferred method for seabed mask generation
     Returns
     -------
     xr.DataArray
@@ -633,8 +626,7 @@ def get_seabed_mask_multichannel(
     Raises
     ------
     ValueError
-        If neither "ariza", "experimental", "blackwell_mod",
-        "blackwell", "deltaSv", "maxSv" are given
+        If neither "ariza" nor "blackwell" is given

     Notes
     -----
diff --git a/echopype/mask/seabed.py b/echopype/mask/seabed.py
index 0d83eec7c..f1d611543 100644
--- a/echopype/mask/seabed.py
+++ b/echopype/mask/seabed.py
@@ -33,20 +33,18 @@

 import warnings

+import dask.array as da
 import numpy as np
-import scipy.ndimage as nd_img
 import xarray as xr
-from scipy.signal import convolve2d
-from skimage.measure import label
-from skimage.morphology import dilation, erosion, remove_small_objects, square
+from dask_image.ndfilters import convolve
+from dask_image.ndmeasure import label
+from dask_image.ndmorph import binary_dilation, binary_erosion

-from ..utils.mask_transformation import lin, log
+from ..utils.mask_transformation_xr import dask_nanpercentile, line_to_square

 MAX_SV_DEFAULT_PARAMS = {"r0": 10, "r1": 1000, "roff": 0, "thr": (-40, -60)}
 DELTA_SV_DEFAULT_PARAMS = {"r0": 10, "r1": 1000, "roff": 0, "thr": 20}
 BLACKWELL_DEFAULT_PARAMS = {
-    "theta": None,
-    "phi": None,
     "r0": 10,
     "r1": 1000,
     "tSv": -75,
@@ -56,8 +54,6 @@
     "wphi": 52,
 }
 BLACKWELL_MOD_DEFAULT_PARAMS = {
-    "theta": None,
-    "phi": None,
     "r0": 10,
     "r1": 1000,
     "tSv": -75,
@@ -84,316 +80,309 @@
     "roff": 0,
     "thr": -40,
     "ec": 1,
-    "ek": (1, 3),
-    "dc": 10,
-    "dk": (3, 7),
+    "ek": (3, 3),
+    "dc": 3,
+    "dk": (3, 3),
 }
+ARIZA_SPIKE_DEFAULT_PARAMS = {
+    "r0": 10,
+    "r1": 1000,
+    "roff": 0,
+    "thr": (-40, -40),
+    "ec": 1,
+    "ek": (3, 3),
+    "dc": 3,
+    "dk": (3, 3),
+    "maximum_spike": 200,
+}


-def _maxSv(Sv_ds: xr.DataArray, desired_channel: str, parameters: dict = MAX_SV_DEFAULT_PARAMS):
+ARIZA_EXPERIMENTAL_DEFAULT_PARAMS = {
+    "r0": 10,
+    "r1": 1000,
+    "roff": 0,
+    "thr": (-40, -70),
+    "ec": 1,
+    "ek": (3, 3),
+    "dc": 3,
+    "dk": (3, 3),
+}
+
+
+def _get_seabed_range(mask: xr.DataArray):
     """
-    Initially detects the seabed as the ping sample with the strongest Sv value,
-    as long as it exceeds a dB threshold. Then it searches up along the ping
-    until Sv falls below a secondary (lower) dB threshold, where the final
-    seabed is set.
+    Given a seabed mask, returns the range_sample depth of the seabed

     Args:
-        Sv_ds (xr.DataArray): xr.DataArray with Sv data for multiple channels (dB)
-        desired_channel(str): Name of the desired frequency channel
-        parameters: parameter dict, should contain:
-            r0 (int): minimum range below which the search will be performed (m).
-            r1 (int): maximum range above which the search will be performed (m).
-            roff (int): seabed range offset (m).
-            thr (tuple): 2 integers with 1st and 2nd Sv threshold (dB).
+        mask (xr.DataArray): seabed mask

     Returns:
-        xr.DataArray: A DataArray containing the mask for the Sv data.
-        Regions satisfying the thresholding criteria are True, others are False
-    """
-    parameter_names = ["r0", "r1", "roff", "thr"]
-    if not all(name in parameters.keys() for name in parameter_names):
-        raise ValueError(
-            "Missing parameters - should be: "
-            + str(parameter_names)
-            + ", are: "
-            + str(parameters.keys())
-        )
-    r0 = parameters["r0"]
-    r1 = parameters["r1"]
-    roff = parameters["roff"]
-    thr = parameters["thr"]
+        xr.DataArray: a ping_time-sized array containing the range_sample seabed depth,
+        or max range_sample if no seabed is detected

-    channel_Sv = Sv_ds.sel(channel=desired_channel)
-    Sv = channel_Sv["Sv"].values.T
-    r = Sv_ds["echo_range"].values[0, 0]
-
-    # get offset and range indexes
-    roff = np.nanargmin(abs(r - roff))
-    r0 = np.nanargmin(abs(r - r0))
-    r1 = np.nanargmin(abs(r - r1))
-
-    # get indexes for maximum Sv along every ping,
-    idx = np.int64(np.zeros(Sv.shape[1]))
-    idx[~np.isnan(Sv).all(axis=0)] = np.nanargmax(Sv[r0:r1, ~np.isnan(Sv).all(axis=0)], axis=0) + r0
-
-    # indexes with maximum Sv < main threshold are discarded (=0)
-    maxSv = Sv[idx, range(len(idx))]
-    maxSv[np.isnan(maxSv)] = -999
-    idx[maxSv < thr[0]] = 0
-
-    # mask seabed, proceed only with acepted seabed indexes (!=0)
-    idx = idx
-    mask = np.zeros(Sv.shape, dtype=bool)
-    for j, i in enumerate(idx):
-        if i != 0:
-            # decrease indexes until Sv mean falls below the 2nd threshold
-            if np.isnan(Sv[i - 5 : i, j]).all():
-                Svmean = thr[1] + 1
-            else:
-                Svmean = log(np.nanmean(lin(Sv[i - 5 : i, j])))
-
-            while (Svmean > thr[1]) & (i >= 5):
-                i -= 1
-
-            # subtract range offset & mask all the way down
-            i -= roff
-            if i < 0:
-                i = 0
-            mask[i:, j] = True
-
-    mask = np.logical_not(mask.T)
-    return_mask = xr.DataArray(
-        mask,
-        dims=("ping_time", "range_sample"),
-        coords={"ping_time": channel_Sv.ping_time, "range_sample": channel_Sv.range_sample},
-    )
-    return return_mask
+    """
+    seabed_depth = mask.argmax(dim="range_sample").compute()
+    seabed_depth[seabed_depth == 0] = mask.range_sample.max().item()
+    return seabed_depth


-def _deltaSv(Sv_ds: xr.DataArray, desired_channel: str, parameters: dict = MAX_SV_DEFAULT_PARAMS):
+def _morpho(mask: xr.DataArray, operation: str, c: int, k: int):
     """
-    Examines the difference in Sv over a 2-samples moving window along
-    every ping, and returns the range of the first value that exceeded a
-    user-defined dB threshold (likely, the seabed).
+    Given a preexisting 1/0 mask, run erosion or dilation cycles on it to remove noise

     Args:
-        Sv_ds (xr.DataArray): xr.DataArray with Sv data for multiple channels (dB)
-        desired_channel(str): Name of the desired frequency channel
-        parameters: parameter dict, should contain:
-            r0 (int): minimum range below which the search will be performed (m).
-            r1 (int): maximum range above which the search will be performed (m).
-            roff (int): seabed range offset (m).
-            thr (int): threshold value (dB).
+        mask (xr.DataArray): xr.DataArray with 1 and 0 data
+        operation (str): dilation, erosion
+        c (int): number of cycles.
+        k (int): 2-elements tuple with vertical and horizontal dimensions
+            of the kernel.

     Returns:
-        xr.DataArray: A DataArray containing the mask for the Sv data.
-        Regions satisfying the thresholding criteria are True, others are False
+        xr.DataArray: A DataArray containing the denoised mask.
+            Regions satisfying the criteria are 1, others are 0
     """
-    parameter_names = ["r0", "r1", "roff", "thr"]
-    if not all(name in parameters.keys() for name in parameter_names):
-        raise ValueError(
-            "Missing parameters - should be: "
-            + str(parameter_names)
-            + ", are: "
-            + str(parameters.keys())
-        )
-    r0 = parameters["r0"]
-    r1 = parameters["r1"]
-    roff = parameters["roff"]
-    thr = parameters["thr"]
+    function_dict = {"dilation": binary_dilation, "erosion": binary_erosion}
+
+    if c > 0:
+        dask_mask = da.asarray(mask, allow_unknown_chunksizes=False)
+        dask_mask.compute_chunk_sizes()
+        dask_mask = function_dict[operation](
+            dask_mask,
+            structure=da.ones(shape=k, dtype=bool),
+            iterations=c,
+        )
+        # a single compute at the end is enough; re-wrapping the already
+        # computed result in a dask array only added redundant work
+        mask.values = dask_mask.compute()
+    return mask
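For intuition, `_morpho` above delegates the morphology to dask-image. A minimal stand-alone sketch of one erosion pass on a toy mask (array contents and names are illustrative only, not part of the patch):

    import dask.array as da
    import numpy as np
    from dask_image.ndmorph import binary_erosion

    toy = np.zeros((5, 5), dtype=bool)
    toy[1:4, 1:4] = True  # a 3x3 block of candidate "seabed" cells
    eroded = binary_erosion(
        da.from_array(toy, chunks=(5, 5)),
        structure=da.ones((3, 3), dtype=bool),
        iterations=1,
    ).compute()
    # only the centre cell survives: erosion keeps a cell only when the whole
    # structuring element fits inside the True region around it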
+def _erode_dilate(mask: xr.DataArray, ec: int, ek: int, dc: int, dk: int):
+    """
+    Given a preexisting 1/0 mask, run erosion and dilation cycles on it to remove noise

-    channel_Sv = Sv_ds.sel(channel=desired_channel)
-    Sv = channel_Sv["Sv"].values.T
-    r = Sv_ds["echo_range"].values[0, 0]
-
-    # get offset as number of samples
-    roff = np.nanargmin(abs(r - roff))
-
-    # compute Sv difference along every ping
-    Svdiff = np.diff(Sv, axis=0)
-    dummy = np.zeros((1, Svdiff.shape[1])) * np.nan
-    Svdiff = np.r_[dummy, Svdiff]
-
-    # get range indexes
-    r0 = np.nanargmin(abs(r - r0))
-    r1 = np.nanargmin(abs(r - r1))
-
-    # get indexes for the first value above threshold, along every ping
-    idx = np.nanargmax((Svdiff[r0:r1, :] > thr), axis=0) + r0
-
-    # mask seabed, proceed only with acepted seabed indexes (!=0)
-    idx = idx
-    mask = np.zeros(Sv.shape, dtype=bool)
-    for j, i in enumerate(idx):
-        if i != 0:
-            # subtract range offset & mask all the way down
-            i -= roff
-            if i < 0:
-                i = 0
-            mask[i:, j] = True
-
-    mask = np.logical_not(mask.T)
-    return_mask = xr.DataArray(
-        mask,
-        dims=("ping_time", "range_sample"),
-        coords={"ping_time": channel_Sv.ping_time, "range_sample": channel_Sv.range_sample},
-    )
-    return return_mask
+    Args:
+        mask (xr.DataArray): xr.DataArray with 1 and 0 data
+        ec (int): number of erosion cycles.
+        ek (int): 2-elements tuple with vertical and horizontal dimensions
+            of the erosion kernel.
+        dc (int): number of dilation cycles.
+        dk (int): 2-elements tuple with vertical and horizontal dimensions
+            of the dilation kernel.
+
+    Returns:
+        xr.DataArray: A DataArray containing the denoised mask.
+            Regions satisfying the criteria are 1, others are 0
+    """
+    mask = _morpho(mask, "erosion", ec, ek)
+    mask = _morpho(mask, "dilation", dc, dk)
+    return mask


-def _blackwell(Sv_ds: xr.DataArray, desired_channel: str, parameters: dict = MAX_SV_DEFAULT_PARAMS):
+def _create_range_mask(Sv_ds: xr.DataArray, desired_channel: str, thr: int, r0: int, r1: int):
     """
-    Detects and mask seabed using the split-beam angle and Sv, based in
-    "Blackwell et al (2019), Aliased seabed detection in fisheries acoustic
-    data". Complete article here: https://arxiv.org/abs/1904.10736
+    Return a raw threshold/range mask for a certain dataset and desired channel

     Args:
         Sv_ds (xr.DataArray): xr.DataArray with Sv data for multiple channels (dB)
         desired_channel(str): Name of the desired frequency channel
-        parameters: parameter dict, should contain:
-            r0 (int): minimum range below which the search will be performed (m)
-            r1 (int): maximum range above which the search will be performed (m)
-            tSv (float): Sv threshold above which seabed is pre-selected (dB)
-            ttheta (int): Theta threshold above which seabed is pre-selected (dB)
-            tphi (int): Phi threshold above which seabed is pre-selected (dB)
-            wtheta (int): window's size for mean square operation in Theta field
-            wphi (int): window's size for mean square operation in Phi field
+        r0 (int): minimum range below which the search will be performed (m).
+        r1 (int): maximum range above which the search will be performed (m).
+        thr (int): Sv threshold above which seabed might occur (dB).

     Returns:
-        xr.DataArray: A DataArray containing the mask for the Sv data.
-        Regions satisfying the thresholding criteria are True, others are False
+        dict: a dict containing the mask and whether or not further processing is necessary
+            mask (xr.DataArray): a basic range/threshold mask.
+                Regions satisfying the criteria are 1, others are 0
+            ok (bool): True if the mask should be further processed,
+                False if no seabed-candidate data was found
+    """
-    parameter_names = ["r0", "r1", "tSv", "ttheta", "tphi", "wtheta", "wphi"]
-    if not all(name in parameters.keys() for name in parameter_names):
-        raise ValueError(
-            "Missing parameters - should be: "
-            + str(parameter_names)
-            + ", are: "
-            + str(parameters.keys())
+    channel_Sv = Sv_ds.sel(channel=desired_channel)
+    Sv = channel_Sv["Sv"]
+    r = channel_Sv["echo_range"][0]
+
+    # return empty mask if searching range is outside the echosounder range
+    if (r0 > r[-1]) or (r1 < r[0]):
+        # Raise a warning to inform the user
+        warnings.warn(
+            "The searching range is outside the echosounder range. "
+            "A default mask with all True values is returned, "
+            "which won't mask any data points in the dataset."
         )
-    r0 = parameters["r0"]
-    r1 = parameters["r1"]
-    tSv = parameters["tSv"]
-    ttheta = parameters["ttheta"]
-    tphi = parameters["tphi"]
-    wtheta = parameters["wtheta"]
-    wphi = parameters["wphi"]
+        mask = xr.DataArray(
+            np.ones_like(Sv, dtype=bool),
+            dims=("ping_time", "range_sample"),
+            coords={"ping_time": Sv.ping_time, "range_sample": Sv.range_sample},
+        )
+        return {"mask": mask, "ok": False, "Sv": Sv, "range": r}

-    channel_Sv = Sv_ds.sel(channel=desired_channel)
-    Sv = channel_Sv["Sv"].values.T
-    r = Sv_ds["echo_range"].values[0, 0]
-    theta = channel_Sv["angle_alongship"].values.T
-    phi = channel_Sv["angle_athwartship"].values.T
-    # apply reverse correction on theta & phi to match Blackwell's constants
-    theta = theta * 22 * 128 / 180
-    phi = phi * 22 * 128 / 180
-
-    # delimit the analysis within user-defined range limits
-    r0 = np.nanargmin(abs(r - r0))
-    r1 = np.nanargmin(abs(r - r1)) + 1
-    Svchunk = Sv[r0:r1, :]
-    thetachunk = theta[r0:r1, :]
-    phichunk = phi[r0:r1, :]
-
-    # get blur kernels with theta & phi width dimensions
-    ktheta = np.ones((wtheta, wtheta)) / wtheta**2
-    kphi = np.ones((wphi, wphi)) / wphi**2
-
-    # perform mean square convolution and mask if above theta & phi thresholds
-    thetamaskchunk = convolve2d(thetachunk, ktheta, "same", boundary="symm") ** 2 > ttheta
-    phimaskchunk = convolve2d(phichunk, kphi, "same", boundary="symm") ** 2 > tphi
-    anglemaskchunk = thetamaskchunk | phimaskchunk
-
-    # if aliased seabed, mask Sv above the Sv median of angle-masked regions
-    if anglemaskchunk.any():
-        Svmedian_anglemasked = log(np.nanmedian(lin(Svchunk[anglemaskchunk])))
-        if np.isnan(Svmedian_anglemasked):
-            Svmedian_anglemasked = np.inf
-        if Svmedian_anglemasked < tSv:
-            Svmedian_anglemasked = tSv
-        Svmaskchunk = Svchunk > Svmedian_anglemasked
-
-        # label connected items in Sv mask
-        items = nd_img.label(Svmaskchunk, nd_img.generate_binary_structure(2, 2))[0]
-
-        # get items intercepted by angle mask (likely, the seabed)
-        intercepted = list(set(items[anglemaskchunk]))
-        if 0 in intercepted:
-            intercepted.remove(intercepted == 0)
-
-        # combine angle-intercepted items in a single mask
-        maskchunk = np.zeros(Svchunk.shape, dtype=bool)
-        for i in intercepted:
-            maskchunk = maskchunk | (items == i)
-
-        # add data above r0 and below r1 (removed in first step)
-        above = np.zeros((r0, maskchunk.shape[1]), dtype=bool)
-        below = np.zeros((len(r) - r1, maskchunk.shape[1]), dtype=bool)
-        mask = np.r_[above, maskchunk, below]
-
-    # give empty mask if aliased-seabed was not detected in Theta & Phi
-    else:
+    # get upper and lower range indexes
+    up = abs(r - r0).argmin(dim="range_sample").item()
+    lw = abs(r - r1).argmin(dim="range_sample").item()
+
+    # get threshold mask with shallow and deep waters masked
+    mask = xr.where(Sv > thr, 1, 0).drop("channel")
+    mask = mask.fillna(0)
+    range_filter = (mask["range_sample"] >= up) & (mask["range_sample"] <= lw)
+    mask = mask.where(range_filter, other=0)
+
+    # give empty mask if there is nothing above threshold
+    if mask.sum() == 0:
         warnings.warn(
-            "No aliased seabed detected in Theta & Phi. "
-            "A default mask with all True values is returned."
+            "Nothing found above the threshold. "
+            "A default mask with all True values is returned."
+        )
+        mask = xr.DataArray(
+            np.ones_like(Sv, dtype=bool),
+            dims=("ping_time", "range_sample"),
+            coords={"ping_time": Sv.ping_time, "range_sample": Sv.range_sample},
         )
-        mask = np.zeros_like(Sv, dtype=bool)
+        return {"mask": mask, "ok": False, "Sv": Sv, "range": r}
+    return {"mask": mask, "ok": True, "Sv": Sv, "range": r}
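At its core, `_create_range_mask` is a dB threshold test combined with a `range_sample` window, as in this toy illustration (values invented):

    import numpy as np
    import xarray as xr

    sv = xr.DataArray(
        np.array([[-80.0, -45.0, -30.0], [-70.0, -35.0, -90.0]]),
        dims=("ping_time", "range_sample"),
        coords={"ping_time": [0, 1], "range_sample": [0, 1, 2]},
    )
    mask = xr.where(sv > -40, 1, 0)  # 1 where the seabed might occur
    window = (sv.range_sample >= 1) & (sv.range_sample <= 2)
    mask = mask.where(window, other=0)  # zero out samples outside the search range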
+def _mask_down(mask: xr.DataArray):
+    """
+    Given a seabed mask, masks all signal under the detected seabed
+
+    Args:
+        mask (xr.DataArray): seabed mask
+
+    Returns:
+        xr.DataArray: mask with the area under the seabed masked
+    """
+    seabed_depth = _get_seabed_range(mask)
+    mask = (mask["range_sample"] <= seabed_depth).transpose()
+    return mask
+
+
+# move to utils and rewrite transient noise/fielding to use this once merged
+def _erase_floating_zeros(mask: xr.DataArray):
+    """
+    Given a boolean mask, turns back to True any "floating" False values,
+    e.g. not attached to the max range

-    mask = np.logical_not(mask.T)
-    return_mask = xr.DataArray(
-        mask,
+    Args:
+        mask: xr.DataArray - mask to remove floating values from
+
+    Returns:
+        xr.DataArray - mask with floating False values removed
+
+    """
+    flipped_mask = mask.isel(range_sample=slice(None, None, -1))
+    flipped_mask["range_sample"] = mask["range_sample"]
+    ft = len(flipped_mask.range_sample) - flipped_mask.argmax(dim="range_sample")
+
+    first_true_indices = xr.DataArray(
+        line_to_square(ft, mask, dim="range_sample").transpose(),
         dims=("ping_time", "range_sample"),
-        coords={"ping_time": channel_Sv.ping_time, "range_sample": channel_Sv.range_sample},
+        coords={"ping_time": mask.ping_time, "range_sample": mask.range_sample},
     )
-    return return_mask
+    indices = xr.DataArray(
+        line_to_square(mask["range_sample"], mask, dim="ping_time"),
+        dims=("ping_time", "range_sample"),
+        coords={"ping_time": mask.ping_time, "range_sample": mask.range_sample},
+    )
+    spike_mask = mask.where(indices > first_true_indices, True)

-def _blackwell_mod(
-    Sv_ds: xr.DataArray, desired_channel: str, parameters: dict = MAX_SV_DEFAULT_PARAMS
-):
+    mask = spike_mask
+    return mask
+
+
+def _experimental_correction(mask: xr.DataArray, Sv: xr.DataArray, thr: int):
     """
-    Detects and mask seabed using the split-beam angle and Sv, based in
-    "Blackwell et al (2019), Aliased seabed detection in fisheries acoustic
-    data". Complete article here: https://arxiv.org/abs/1904.10736
+    Given an existing seabed mask, the single-channel dataset it was created on
+    and a secondary, lower threshold, it builds the mask up until the Sv falls below the threshold

-    This is a modified version from the original algorithm. It includes extra
-    arguments to evaluate whether aliased seabed items can occur, given the
-    true seabed detection range, and the possibility of tuning the percentile's
-    rank.
+    Args:
+        mask (xr.DataArray): seabed mask
+        Sv (xr.DataArray): single-channel Sv data the mask was built on
+        thr (int): secondary threshold
+
+    Returns:
+        xr.DataArray: mask with secondary threshold correction applied
+
+    """
+    secondary_mask = xr.where(Sv < thr, 1, 0).drop("channel")
+    secondary_mask = secondary_mask.fillna(1)
+    fill_mask = secondary_mask & mask
+    spike_mask = _erase_floating_zeros(fill_mask)
+    return spike_mask
+
+
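The correction relies on `_erase_floating_zeros` keeping only the False block that touches the deepest sample; for a single ping its behaviour reduces to the following sketch (ping values hypothetical):

    import xarray as xr

    # True = water, False = below-threshold candidate seabed
    ping = xr.DataArray([True, True, False, True, False, False], dims="range_sample")
    reversed_ping = ping.isel(range_sample=slice(None, None, -1))
    bottom_start = len(ping) - int(reversed_ping.argmax())  # deepest block starts at 4
    cleaned = ping.copy()
    cleaned[:bottom_start] = True  # the "floating" False at index 2 flips back to water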
+def _cut_spikes(mask: xr.DataArray, maximum_spike: int):
+    """
+    In the Ariza seabed detection method, large shoals can be falsely detected as
+    seabed. Their appearance on the seabed mask is large vertical "spikes".
+    We want to remove any such spikes from the dataset using maximum_spike
+    as a control parameter.
+
+    If this option is used, we also recommend applying the _experimental_correction,
+    even if with the same threshold as the initial threshold, to fill up any
+    imprecisions in the interpolated seabed
+
+    Args:
+        mask (xr.DataArray): seabed mask
+        maximum_spike (int): maximum height, in range samples, acceptable before
+            we start removing that data
+
+    Returns:
+        xr.DataArray: the corrected mask
+    """
+    int_mask = (~mask.copy()).astype(int)
+    seabed = _get_seabed_range(int_mask)
+    shifted_seabed = seabed.shift(ping_time=-1, fill_value=seabed[-1])
+    spike = seabed - shifted_seabed
+    spike_sign = xr.where(abs(spike) > maximum_spike, xr.where(spike > 0, 1, -1), 0)
+    spike_cs = spike_sign.cumsum(dim="ping_time")
+
+    # mask spikes
+    nan_mask = xr.where(spike_cs > 0, np.nan, xr.where(spike_sign == -1, np.nan, mask))
+
+    # fill in with interpolated values from non-spikes
+    mask_interpolated = nan_mask.interpolate_na(dim="ping_time", method="nearest").astype(bool)
+
+    return mask_interpolated
+
+
+def _ariza(Sv_ds: xr.DataArray, desired_channel: str, parameters: dict = ARIZA_DEFAULT_PARAMS):
+    """
+    Mask Sv above a threshold to get potential seabed features. These features
+    are eroded first to get rid of fake seabeds (spikes, schools, etc.) and
+    dilated afterwards to fill in seabed breaches. Seabed detection is coarser
+    than other methods (it removes water nearby the seabed) but the seabed line
+    never drops when a breach occurs. Suitable for pelagic assessments and
+    recommended for non-supervised processing.

     Args:
         Sv_ds (xr.DataArray): xr.DataArray with Sv data for multiple channels (dB)
         desired_channel(str): Name of the desired frequency channel
         parameters: parameter dict, should contain:
-            r0 (int): minimum range below which the search will be performed (m)
-            r1 (int): maximum range above which the search will be performed (m)
-            tSv (float): Sv threshold above which seabed is pre-selected (dB)
-            ttheta (int): Theta threshold above which seabed is pre-selected (dB)
-            tphi (int): Phi threshold above which seabed is pre-selected (dB)
-            wtheta (int): window's size for mean square operation in Theta field
-            wphi (int): window's size for mean square operation in Phi field
-            rlog (float): Maximum logging range of the echosounder (m)
-            tpi (float): Transmit pulse interval, or ping rate (s)
-            freq (int): frequecy (kHz)
-            rank (int): Rank for percentile operation: [0, 100]
+            r0 (int): minimum range below which the search will be performed (m).
+            r1 (int): maximum range above which the search will be performed (m).
+            roff (int): seabed range offset (m).
+            thr (int): Sv threshold above which seabed might occur (dB).
+                Can be a tuple, in which case a secondary experimental
+                thresholding correction is applied
+            ec (int): number of erosion cycles.
+            ek (int): 2-elements tuple with vertical and horizontal dimensions
+                of the erosion kernel.
+            dc (int): number of dilation cycles.
+            dk (int): 2-elements tuple with vertical and horizontal dimensions
+                of the dilation kernel.
+            maximum_spike (int): optional, if not None, used to determine the maximum
+                allowed height of the "spikes" potentially created by
+                dense shoals before masking them out. If used, applying
+                a secondary threshold correction is recommended
+
     Returns:
         xr.DataArray: A DataArray containing the mask for the Sv data.
         Regions satisfying the thresholding criteria are True, others are False
     """
-    parameter_names = [
-        "r0",
-        "r1",
-        "tSv",
-        "ttheta",
-        "tphi",
-        "wtheta",
-        "wphi",
-        "rlog",
-        "tpi",
-        "freq",
-        "rank",
-    ]
+    parameter_names = ["r0", "r1", "roff", "thr", "ec", "ek", "dc", "dk"]
     if not all(name in parameters.keys() for name in parameter_names):
         raise ValueError(
             "Missing parameters - should be: "
@@ -403,109 +392,41 @@ def _blackwell_mod(
         )
     r0 = parameters["r0"]
     r1 = parameters["r1"]
-    tSv = parameters["tSv"]
-    ttheta = parameters["ttheta"]
-    tphi = parameters["tphi"]
-    wtheta = parameters["wtheta"]
-    wphi = parameters["wphi"]
-    rlog = parameters["rlog"]
-    tpi = parameters["tpi"]
-    freq = parameters["freq"]
-    rank = parameters["rank"]
+    thr = parameters["thr"]
+    ec = parameters["ec"]
+    ek = parameters["ek"]
+    dc = parameters["dc"]
+    dk = parameters["dk"]
+    secondary_thr = None
+    maximum_spike = None
+    if "maximum_spike" in parameters.keys():
+        maximum_spike = parameters["maximum_spike"]

-    channel_Sv = Sv_ds.sel(channel=desired_channel)
-    Sv = channel_Sv["Sv"].values.T
-    r = Sv_ds["echo_range"].values[0, 0]
-    theta = channel_Sv["angle_alongship"].values.T
-    phi = channel_Sv["angle_athwartship"].values.T
-    # apply reverse correction on theta & phi to match Blackwell's constants
-    theta = theta * 22 * 128 / 180
-    phi = phi * 22 * 128 / 180
-
-    # raise errors if wrong arguments
-    if r0 > r1:
-        raise Exception("Minimum range has to be shorter than maximum range")
-
-    # give empty mask if searching range is outside the echosounder range
-    if (r0 > r[-1]) or (r1 < r[0]):
-        warnings.warn(
-            "Search range is outside the echosounder range."
-            "A default mask with all True values is returned."
-        )
-        mask = np.zeros_like(Sv, dtype=bool)
+    if not isinstance(thr, int):
+        secondary_thr = thr[1]
+        thr = thr[0]

-    # delimit the analysis within user-defined range limits
-    i0 = np.nanargmin(abs(r - r0))
-    i1 = np.nanargmin(abs(r - r1)) + 1
-    Svchunk = Sv[i0:i1, :]
-    thetachunk = theta[i0:i1, :]
-    phichunk = phi[i0:i1, :]
+    # create raw range and threshold mask, if no seabed is detected return empty
+    raw = _create_range_mask(Sv_ds, desired_channel=desired_channel, thr=thr, r0=r0, r1=r1)
+    mask = raw["mask"]
+    if not raw["ok"]:
+        return mask

-    # get blur kernels with theta & phi width dimensions
-    ktheta = np.ones((wtheta, wtheta)) / wtheta**2
-    kphi = np.ones((wphi, wphi)) / wphi**2
+    # run erosion and dilation denoising cycles
+    mask = _erode_dilate(mask, ec, ek, dc, dk)

-    # perform mean square convolution and mask if above theta & phi thresholds
-    thetamaskchunk = convolve2d(thetachunk, ktheta, "same", boundary="symm") ** 2 > ttheta
-    phimaskchunk = convolve2d(phichunk, kphi, "same", boundary="symm") ** 2 > tphi
-    anglemaskchunk = thetamaskchunk | phimaskchunk
+    # mask areas under the detected seabed
+    mask = _mask_down(mask)

-    # remove aliased seabed items when estimated True seabed can not be
-    # detected below the logging range
-    if (rlog is not None) and (tpi is not None) and (freq is not None):
-        items = label(anglemaskchunk)
-        item_labels = np.unique(label(anglemaskchunk))[1:]
-        for il in item_labels:
-            item = items == il
-            ritem = np.nanmean(r[i0:i1][np.where(item)[0]])
-            rseabed = _aliased2seabed(ritem, rlog, tpi, freq)
-            if rseabed == []:
-                anglemaskchunk[item] = False
-
-    anglemaskchunk = anglemaskchunk & (Svchunk > tSv)
-
-    # if aliased seabed, mask Sv above the Sv median of angle-masked regions
-    if anglemaskchunk.any():
-        Svmedian_anglemasked = log(np.nanpercentile(lin(Svchunk[anglemaskchunk]), rank))
-        if np.isnan(Svmedian_anglemasked):
-            Svmedian_anglemasked = np.inf
-        if Svmedian_anglemasked < tSv:
-            Svmedian_anglemasked = tSv
-        Svmaskchunk = Svchunk > Svmedian_anglemasked
-
-        # label connected items in Sv mask
-        items = nd_img.label(Svmaskchunk, nd_img.generate_binary_structure(2, 2))[0]
-
-        # get items intercepted by angle mask (likely, the seabed)
-        intercepted = list(set(items[anglemaskchunk]))
-        if 0 in intercepted:
-            intercepted.remove(intercepted == 0)
-
-        # combine angle-intercepted items in a single mask
-        maskchunk = np.zeros(Svchunk.shape, dtype=bool)
-        for i in intercepted:
-            maskchunk = maskchunk | (items == i)
-
-        # add data above r0 and below r1 (removed in first step)
-        above = np.zeros((i0, maskchunk.shape[1]), dtype=bool)
-        below = np.zeros((len(r) - i1, maskchunk.shape[1]), dtype=bool)
-        mask = np.r_[above, maskchunk, below]
-
-    # give empty mask if aliased-seabed was not detected in Theta & Phi
-    else:
-        warnings.warn(
-            "Aliased seabed not detected in Theta & Phi."
-            "A default mask with all True values is returned."
-        )
-        mask = np.zeros_like(Sv, dtype=bool)
+    # apply spike correction
+    if maximum_spike is not None:
+        mask = _cut_spikes(mask, maximum_spike)

-    mask = np.logical_not(mask.T)
-    return_mask = xr.DataArray(
-        mask,
-        dims=("ping_time", "range_sample"),
-        coords={"ping_time": channel_Sv.ping_time, "range_sample": channel_Sv.range_sample},
-    )
-    return return_mask
+    # apply experimental correction, if specified
+    if secondary_thr is not None:
+        mask = _experimental_correction(mask, raw["Sv"], secondary_thr)
+
+    return mask


 def _aliased2seabed(
@@ -572,32 +493,29 @@ def _seabed2aliased(
     return aliased


-def _experimental(
-    Sv_ds: xr.DataArray, desired_channel: str, parameters: dict = MAX_SV_DEFAULT_PARAMS
-):
+def _blackwell(Sv_ds: xr.DataArray, desired_channel: str, parameters: dict = BLACKWELL_DEFAULT_PARAMS):
     """
-    Mask Sv above a threshold to get a potential seabed mask. Then, the mask is
-    dilated to fill seabed breaches, and small objects are removed to prevent
-    masking high Sv features that are not seabed (e.g. fish schools or spikes).
-    Once this is done, the mask is built up until Sv falls below a 2nd
-    threshold, Finally, the mask is extended all the way down.
+    Detects and masks the seabed using the split-beam angle and Sv, based on
+    "Blackwell et al (2019), Aliased seabed detection in fisheries acoustic
+    data". Complete article here: https://arxiv.org/abs/1904.10736

     Args:
         Sv_ds (xr.DataArray): xr.DataArray with Sv data for multiple channels (dB)
         desired_channel(str): Name of the desired frequency channel
         parameters: parameter dict, should contain:
-            r0 (int): minimum range below which the search will be performed (m).
-            r1 (int): maximum range above which the search will be performed (m).
-            roff (int): seabed range offset (m).
-            thr (tuple): 2 integers with 1st and 2nd Sv threshold (dB).
-            ns (int): maximum number of samples for an object to be removed.
-            n_dil (int): number of dilations performed to the seabed mask.
+            r0 (int): minimum range below which the search will be performed (m)
+            r1 (int): maximum range above which the search will be performed (m)
+            tSv (float): Sv threshold above which seabed is pre-selected (dB)
+            ttheta (int): Theta threshold above which seabed is pre-selected (dB)
+            tphi (int): Phi threshold above which seabed is pre-selected (dB)
+            wtheta (int): window's size for mean square operation in Theta field
+            wphi (int): window's size for mean square operation in Phi field

     Returns:
         xr.DataArray: A DataArray containing the mask for the Sv data.
         Regions satisfying the thresholding criteria are True, others are False
     """
-    parameter_names = ["r0", "r1", "roff", "thr", "ns", "n_dil"]
+    parameter_names = ["r0", "r1", "tSv", "ttheta", "tphi", "wtheta", "wphi"]
     if not all(name in parameters.keys() for name in parameter_names):
         raise ValueError(
             "Missing parameters - should be: "
@@ -607,173 +525,118 @@ def _experimental(
         )
     r0 = parameters["r0"]
     r1 = parameters["r1"]
-    roff = parameters["roff"]
-    thr = parameters["thr"]
-    ns = parameters["ns"]
-    n_dil = parameters["n_dil"]
-
-    channel_Sv = Sv_ds.sel(channel=desired_channel)
-    Sv = channel_Sv["Sv"].values.T
-    r = Sv_ds["echo_range"].values[0, 0]
-
-    # get indexes for range offset and range limits
-    roff = np.nanargmin(abs(r - roff))
-    r0 = np.nanargmin(abs(r - r0))
-    r1 = np.nanargmin(abs(r - r1)) + 1
-
-    # mask Sv above the first Sv threshold
-    mask = Sv[r0:r1, :] > thr[0]
-    maskabove = np.zeros((r0, mask.shape[1]), dtype=bool)
-    maskbelow = np.zeros((len(r) - r1, mask.shape[1]), dtype=bool)
-    mask = np.r_[maskabove, mask, maskbelow]
-
-    # remove small to prevent other high Sv features to be masked as seabed
-    # (e.g fish schools, impulse noise not properly masked. etc)
-    mask = remove_small_objects(mask, ns)
-
-    # dilate mask to fill seabed breaches
-    # (e.g. attenuated pings or gaps from previous masking)
-    mask = dilation(np.uint8(mask), square(n_dil))
-    mask = np.array(mask, dtype="bool")
-
-    # proceed with the following only if seabed was detected
-    idx = np.argmax(mask, axis=0)
-    for j, i in enumerate(idx):
-        if i != 0:
-            # rise up seabed until Sv falls below the 2nd threshold
-            while (log(np.nanmean(lin(Sv[i - 5 : i, j]))) > thr[1]) & (i >= 5):
-                i -= 1
-
-            # subtract range offset & mask all the way down
-            i -= roff
-            if i < 0:
-                i = 0
-            mask[i:, j] = True
-
-    mask = np.logical_not(mask.T)
-    return_mask = xr.DataArray(
-        mask,
-        dims=("ping_time", "range_sample"),
-        coords={"ping_time": channel_Sv.ping_time, "range_sample": channel_Sv.range_sample},
-    )
-    return return_mask
+    tSv = parameters["tSv"]
+    ttheta = parameters["ttheta"]
+    tphi = parameters["tphi"]
+    wtheta = parameters["wtheta"]
+    wphi = parameters["wphi"]
+    rlog = None
+    tpi = None
+    freq = None
+    rank = 50

-def _ariza(Sv_ds: xr.DataArray, desired_channel: str, parameters: dict = MAX_SV_DEFAULT_PARAMS):
-    """
-    Mask Sv above a threshold to get potential seabed features. These features
-    are eroded first to get rid of fake seabeds (spikes, schools, etc.) and
-    dilated afterwards to fill in seabed breaches. Seabed detection is coarser
-    than other methods (it removes water nearby the seabed) but the seabed line
-    never drops when a breach occurs. Suitable for pelagic assessments and
-    reconmended for non-supervised processing.
+    if "rlog" in parameters.keys():
+        rlog = parameters["rlog"]
+    if "tpi" in parameters.keys():
+        tpi = parameters["tpi"]
+    if "freq" in parameters.keys():
+        freq = parameters["freq"]
+    if "rank" in parameters.keys():
+        rank = parameters["rank"]

-    Args:
-        Sv_ds (xr.DataArray): xr.DataArray with Sv data for multiple channels (dB)
-        desired_channel(str): Name of the desired frequency channel
-        parameters: parameter dict, should contain:
-            r0 (int): minimum range below which the search will be performed (m).
-            r1 (int): maximum range above which the search will be performed (m).
-            roff (int): seabed range offset (m).
-            thr (int): Sv threshold above which seabed might occur (dB).
-            ec (int): number of erosion cycles.
-            ek (int): 2-elements tuple with vertical and horizontal dimensions
-                of the erosion kernel.
-            dc (int): number of dilation cycles.
-            dk (int): 2-elements tuple with vertical and horizontal dimensions
-                of the dilation kernel.
-
-    Returns:
-        xr.DataArray: A DataArray containing the mask for the Sv data.
-        Regions satisfying the thresholding criteria are True, others are False
-    """
-    parameter_names = ["r0", "r1", "roff", "thr", "ec", "ek", "dc", "dk"]
-    if not all(name in parameters.keys() for name in parameter_names):
-        raise ValueError(
-            "Missing parameters - should be: "
-            + str(parameter_names)
-            + ", are: "
-            + str(parameters.keys())
+    channel_Sv = Sv_ds.sel(channel=desired_channel)
+    Sv = channel_Sv["Sv"]
+    r = channel_Sv["echo_range"][0]
+    theta = channel_Sv["angle_alongship"].copy() * 22 * 128 / 180
+    phi = channel_Sv["angle_athwartship"].copy() * 22 * 128 / 180
+
+    dask_theta = da.asarray(theta, allow_unknown_chunksizes=False)
+    dask_theta.compute_chunk_sizes()
+    theta.values = (
+        convolve(
+            dask_theta,
+            weights=da.ones(shape=(wtheta, wtheta), dtype=float) / wtheta**2,
+            mode="nearest",
         )
-    r0 = parameters["r0"]
-    r1 = parameters["r1"]
-    roff = parameters["roff"]
-    thr = parameters["thr"]
-    ec = parameters["ec"]
-    ek = parameters["ek"]
-    dc = parameters["dc"]
-    dk = parameters["dk"]
+        ** 2
+    )

-    channel_Sv = Sv_ds.sel(channel=desired_channel)
-    Sv = channel_Sv["Sv"].values.T
-    r = Sv_ds["echo_range"].values[0, 0]
+    dask_phi = da.asarray(phi, allow_unknown_chunksizes=False)
+    dask_phi.compute_chunk_sizes()
+    phi.values = (
+        convolve(
+            dask_phi,
+            weights=da.ones(shape=(wphi, wphi), dtype=float) / wphi**2,
+            mode="nearest",
+        )
+        ** 2
+    )
-    # raise errors if wrong arguments
-    if r0 > r1:
-        raise Exception("Minimum range has to be shorter than maximum range")
+    angle_mask = ~((theta > ttheta) | (phi > tphi)).compute()

-    # give empty mask if searching range is outside the echosounder range
-    if (r0 > r[-1]) or (r1 < r[0]):
+    if angle_mask.all():
         warnings.warn(
-            "Search range is outside the echosounder range. "
+            "No aliased seabed detected in Theta & Phi. "
             "A default mask with all True values is returned."
         )
-        mask = np.zeros_like(Sv, dtype=bool)
+        return angle_mask
+    # negate for further processing
+    angle_mask = ~angle_mask

-    # get indexes for range offset and range limits
-    r0 = np.nanargmin(abs(r - r0))
-    r1 = np.nanargmin(abs(r - r1))
-    roff = np.nanargmin(abs(r - roff))
+    # remove aliased seabed items when estimated True seabed can not be
+    # detected below the logging range
+    if (rlog is not None) and (tpi is not None) and (freq is not None):
+        items = label(angle_mask)[0]
+        item_labels = np.unique(items)[1:]
+        for il in item_labels:
+            item = items == il
+            ritem = np.nanmean(r[np.where(item)[0]])
+            rseabed = _aliased2seabed(ritem, rlog, tpi, freq)
+            if rseabed == []:
+                angle_mask[item] = False

-    # set to -999 shallow and deep waters (prevents seabed detection)
-    Sv_ = Sv.copy()
-    Sv_[0:r0, :] = -999
-    Sv_[r1:, :] = -999
+    angle_mask = angle_mask & (Sv > tSv)

-    # give empty mask if there is nothing above threshold
-    if not (Sv_ > thr).any():
-        warnings.warn(
-            "Nothing found above the threshold. " "A default mask with all True values is returned."
-        )
-        mask = np.zeros_like(Sv_, dtype=bool)
+    # calculate rank percentile Sv of angle-masked regions, and mask Sv above
+    Sv_masked = Sv.where(angle_mask)
+    anglemasked_threshold = dask_nanpercentile(Sv_masked.values, rank)

-    # search for seabed otherwise
-    else:
-        # potential seabed will be everything above the threshold, the rest
-        # will be set as -999
-        seabed = Sv_.copy()
-        seabed[Sv_ < thr] = -999
-
-        # run erosion cycles to remove fake seabeds (e.g: spikes, small shoals)
-        for i in range(ec):
-            seabed = erosion(seabed, np.ones(ek))
-
-        # run dilation cycles to fill seabed breaches
-        for i in range(dc):
-            seabed = dilation(seabed, np.ones(dk))
-
-        # mask as seabed everything greater than -999
-        mask = seabed > -999
-
-        # if seabed occur in a ping...
-        idx = np.argmax(mask, axis=0)
-        for j, i in enumerate(idx):
-            if i != 0:
-                # ...apply range offset & mask all the way down
-                i -= roff
-                if i < 0:
-                    i = 0
-                mask[i:, j] = True
-
-    mask = np.logical_not(mask.T)
-    return_mask = xr.DataArray(
-        mask,
-        dims=("ping_time", "range_sample"),
-        coords={"ping_time": channel_Sv.ping_time, "range_sample": channel_Sv.range_sample},
-    )
-    return return_mask
+    if np.isnan(anglemasked_threshold):
+        anglemasked_threshold = np.inf
+    if anglemasked_threshold < tSv:
+        anglemasked_threshold = tSv
+    Sv_threshold_mask = Sv > anglemasked_threshold
+
+    # create structure element that defines connections
+    structure = da.ones(shape=(3, 3), dtype=bool)
+    items = label(Sv_threshold_mask, structure)[0]
+
+    items_data = xr.DataArray(
+        items,
+        dims=angle_mask.dims,
+        coords=angle_mask.coords,
+    )
+
+    mask_items = items_data.where(angle_mask, 0)
+
+    # get items intercepted by angle mask
+    keep_items = np.unique(mask_items.values)
+    keep_items = keep_items[keep_items > 0]
+    angle_items = xr.where(items_data.isin(keep_items), items_data, 0)
+    angle_items_mask = ~(angle_items > 0)
+
+    mask = angle_items_mask
+
+    # apply range filter
+    # get upper and lower range indexes
+    up = abs(r - r0).argmin(dim="range_sample").item()
+    lw = abs(r - r1).argmin(dim="range_sample").item()
+
+    # get threshold mask with shallow and deep waters masked
+    range_filter = (mask["range_sample"] >= up) & (mask["range_sample"] <= lw)
+    mask = mask.where(range_filter, other=True)
+    return mask
diff --git a/echopype/tests/conftest.py b/echopype/tests/conftest.py
index 43ec8cce5..9e7cf18d2 100644
--- a/echopype/tests/conftest.py
+++ b/echopype/tests/conftest.py
@@ -1,4 +1,5 @@
 """``pytest`` configuration."""
+from ftplib import FTP

 import os
 import subprocess
@@ -10,6 +11,49 @@
 from echopype.testing import TEST_DATA_FOLDER


+def _setup_file(file_name):
+    FTP_MAIN = "ftp.bas.ac.uk"
+    FTP_PARTIAL_PATH = "rapidkrill/ek60/"
+    with FTP(FTP_MAIN) as ftp:
+        ftp.login()
+        download_ftp_file(ftp, FTP_PARTIAL_PATH, file_name, TEST_DATA_FOLDER)
+    return os.path.join(TEST_DATA_FOLDER, file_name)
+
+
+def download_ftp_file(ftp, remote_path, file_name, local_path):
+    # Construct the full paths
+    remote_file_path = os.path.join(remote_path, file_name)
+    local_file_path = os.path.join(local_path, file_name)
+
+    try:
+        # Ensure the local directory exists
+        os.makedirs(local_path, exist_ok=True)
+
+        # Check if the file already exists locally
+        if not os.path.exists(local_file_path):
+            with open(local_file_path, "wb") as local_file:
+                ftp.retrbinary("RETR " + remote_file_path, local_file.write)
+        else:
+            print(f"File {local_file_path} already exists. Skipping download.")
+
+    except Exception as e:
+        print(f"Error downloading {remote_file_path}. Error: {e}")
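`download_ftp_file` is driven by `_setup_file`, but it can also be exercised directly; a usage sketch (the file name and local path below are placeholders, not the real fixture data):

    from ftplib import FTP

    with FTP("ftp.bas.ac.uk") as ftp:
        ftp.login()  # anonymous login, as in _setup_file
        download_ftp_file(ftp, "rapidkrill/ek60/", "example.raw", "/tmp/echopype_test_data")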
Error: {e}") + + +def _get_sv_dataset(file_path, enriched: bool = False, waveform: str = "CW", encode: str = "power"): + ed = ep.open_raw(file_path, sonar_model="ek60") + Sv = ep.calibrate.compute_Sv(ed).compute() + if enriched is True: + Sv = ep.consolidate.add_splitbeam_angle(Sv, ed, waveform, encode) + return Sv + + +def _get_raw_dataset(file_path): + ed = ep.open_raw(file_path, sonar_model="ek60") + return ed + + @pytest.fixture(scope="session") def dump_output_dir(): return TEST_DATA_FOLDER / "dump" @@ -60,6 +104,7 @@ def setup_test_data_jr179(): return _setup_file(file_name) +""" def _setup_file(file_name): test_data_path = os.path.join(TEST_DATA_FOLDER, file_name) FTP_MAIN = "ftp://ftp.bas.ac.uk" @@ -71,11 +116,10 @@ def _setup_file(file_name): subprocess.run(["wget", ftp_file_path, "-O", test_data_path]) return test_data_path +""" # Separate Sv dataset fixtures for each file - - @pytest.fixture(scope="session") def sv_dataset_jr230(setup_test_data_jr230) -> xr.DataArray: return _get_sv_dataset(setup_test_data_jr230) @@ -97,20 +141,26 @@ def complete_dataset_jr179(setup_test_data_jr179): return Sv -def _get_sv_dataset(file_path, enriched: bool = False, waveform: str = "CW", encode: str = "power"): - ed = ep.open_raw(file_path, sonar_model="ek60") - Sv = ep.calibrate.compute_Sv(ed).compute() - if enriched is True: - Sv = ep.consolidate.add_splitbeam_angle(Sv, ed, waveform, encode) - return Sv +@pytest.fixture(scope="session") +def raw_dataset_jr179(setup_test_data_jr179): + ed = _get_raw_dataset(setup_test_data_jr179) + return ed -def _get_raw_dataset(file_path): - ed = ep.open_raw(file_path, sonar_model="ek60") +@pytest.fixture(scope="session") +def ed_ek_60_for_Sv(): + bucket = "ncei-wcsd-archive" + base_path = "data/raw/Bell_M._Shimada/SH1707/EK60/" + filename = "Summer2017-D20170620-T011027.raw" + rawdirpath = base_path + filename + + s3raw_fpath = f"s3://{bucket}/{rawdirpath}" + storage_opts = {"anon": True} + ed = ep.open_raw(s3raw_fpath, sonar_model="EK60", storage_options=storage_opts) # type: ignore return ed @pytest.fixture(scope="session") -def raw_dataset_jr179(setup_test_data_jr179): - ed = _get_raw_dataset(setup_test_data_jr179) - return ed +def ek60_Sv(ed_ek_60_for_Sv): + sv_echopype_EK60 = ep.calibrate.compute_Sv(ed_ek_60_for_Sv).compute() + return sv_echopype_EK60 diff --git a/echopype/tests/mask/test_mask_seabed.py b/echopype/tests/mask/test_mask_seabed.py index 6750d73bd..2f193e54a 100644 --- a/echopype/tests/mask/test_mask_seabed.py +++ b/echopype/tests/mask/test_mask_seabed.py @@ -3,7 +3,8 @@ from echopype.mask.api import get_seabed_mask from echopype.mask.seabed import ( ARIZA_DEFAULT_PARAMS, - EXPERIMENTAL_DEFAULT_PARAMS, + ARIZA_SPIKE_DEFAULT_PARAMS, + ARIZA_EXPERIMENTAL_DEFAULT_PARAMS, BLACKWELL_DEFAULT_PARAMS, BLACKWELL_MOD_DEFAULT_PARAMS, ) @@ -14,10 +15,11 @@ @pytest.mark.parametrize( "desired_channel,method,parameters,expected_true_false_counts", [ - (DESIRED_CHANNEL, "ariza", ARIZA_DEFAULT_PARAMS, (1430880, 736051)), - (DESIRED_CHANNEL, "experimental", EXPERIMENTAL_DEFAULT_PARAMS, (1514853, 652078)), - (DESIRED_CHANNEL, "blackwell", BLACKWELL_DEFAULT_PARAMS, (1746551, 420380)), - (DESIRED_CHANNEL, "blackwell_mod", BLACKWELL_MOD_DEFAULT_PARAMS, (1945202, 221729)), + (DESIRED_CHANNEL, "ariza", ARIZA_DEFAULT_PARAMS, (1531524, 635407)), + (DESIRED_CHANNEL, "ariza", ARIZA_SPIKE_DEFAULT_PARAMS, (1531514, 635417)), + (DESIRED_CHANNEL, "ariza", ARIZA_EXPERIMENTAL_DEFAULT_PARAMS, (1524807, 642124)), + (DESIRED_CHANNEL, "blackwell", BLACKWELL_DEFAULT_PARAMS, 
(1738163, 428768)), + (DESIRED_CHANNEL, "blackwell", BLACKWELL_MOD_DEFAULT_PARAMS, (1738163, 428768)), ], ) def test_mask_seabed( diff --git a/echopype/tests/utils/test_utils_mask_transformation_xr.py b/echopype/tests/utils/test_utils_mask_transformation_xr.py new file mode 100644 index 000000000..d103f2f33 --- /dev/null +++ b/echopype/tests/utils/test_utils_mask_transformation_xr.py @@ -0,0 +1,122 @@ +import numpy as np +import pytest +import echopype.utils.mask_transformation_xr as ep +import xarray as xr + + +def test_lin(): + # Prepare input and expected output + variable = xr.DataArray([10, 20, 30]) + expected_output = xr.DataArray([10, 100, 1000]) + + # Apply function + output = ep.lin(variable) + + # Assert output is as expected + xr.testing.assert_equal(output, expected_output) + + +def test_log(): + # Prepare input and expected output + variable = xr.DataArray([10, 100, 0, -10]) + expected_output = xr.DataArray([10, 20, -999, -999]) + + # Apply function + output = ep.log(variable) + # Assert output is as expected + truth = output == expected_output + assert truth.all() + # Test with a single positive number + assert ep.log(10) == 10 * np.log10(10) + + # Test with a single negative number (should return -999) + assert ep.log(-10) == -999 + + # Test with zero (should return -999) + assert ep.log(0) == -999 + + # Test with a list of numbers + truth = ep.log([10, 20, -10, 0]) == xr.DataArray( + [ + 10 * np.log10(10), + 10 * np.log10(20), + -999, + -999, + ] + ) + assert truth.all() + + # Test with an integer numpy array + int_array = xr.DataArray([10, 20, -10, 0]) + truth = ep.log(int_array) == xr.DataArray([10 * np.log10(10), 10 * np.log10(20), -999, -999]) + assert truth.all() + + # Test with a single number in a numpy array + assert ep.log(np.array([10])) == 10 * np.log10(10) + + # Test with a single negative number in a numpy array (should return -999) + assert ep.log(np.array([-10])) == -999 + + # Test with a single zero in a numpy array (should return -999) + assert ep.log(np.array([0])) == -999 + + +def test_downsample_exceptions(): + data = np.arange(24).reshape(4, 6) + dims = ["x", "y"] + coords = {"x": np.arange(4), "y": np.arange(6)} + dataset = xr.DataArray(data=data, dims=dims, coords=coords) + + with pytest.raises(Exception, match="Operation not in approved list"): + ep.downsample(dataset, {"x": 2}, "kind") + with pytest.raises(Exception, match="Coordinate z not in dataset coordinates"): + ep.downsample(dataset, {"z": 2}, "mean") + + +@pytest.mark.parametrize( + "coordinates,operation,is_log,shape,value", + [ + ({"range_sample": 2}, "mean", False, (3, 572, 1891), -10.82763365585262), + ({"range_sample": 2, "ping_time": 2}, "mean", False, (3, 286, 1891), -11.018715656585043), + ({"range_sample": 2}, "sum", False, (3, 572, 1891), -21.65526731170524), + ({"range_sample": 2}, "mean", True, (3, 572, 1891), -10.825779607785), + ], +) +def test_downsample(sv_dataset_jr230, coordinates, operation, is_log, shape, value): + source_Sv = sv_dataset_jr230.copy(deep=True)["Sv"] + res = ep.downsample(source_Sv, coordinates, operation, is_log) + assert res.values.shape == shape + assert res.values[-1, -1, -1] == value + + +def test_upsample(): + data = np.array([[3.5, 4.5, 5.5, 6.5, 7.5], [13.5, 14.5, 15.5, 16.5, 17.5]]) + data_2 = np.arange(25).reshape(5, 5) + data_3 = np.array( + [ + [3.5, 4.5, 5.5, 6.5, 7.5], + [3.5, 4.5, 5.5, 6.5, 7.5], + [13.5, 14.5, 15.5, 16.5, 17.5], + [13.5, 14.5, 15.5, 16.5, 17.5], + [np.nan, np.nan, np.nan, np.nan, np.nan], + ] + ) + dims = ["x", "y"] + 
coords_1 = {"x": [1, 4], "y": [1, 3, 5, 7, 9]} + coords_2 = {"x": [1, 2, 3, 4, 5], "y": [1, 3, 5, 7, 9]} + ds_1 = xr.DataArray(data=data, dims=dims, coords=coords_1) + ds_2 = xr.DataArray(data=data_2, dims=dims, coords=coords_2) + ds_3 = xr.DataArray(data=data_3, dims=dims, coords=coords_2) + ds_4 = ep.upsample(ds_1, ds_2) + assert ds_3.equals(ds_4) + + +def test_line_to_square(): + row = [False, False, True, False] + one = xr.DataArray(data=[row], dims=["x", "y"], coords={"x": [1], "y": [1, 2, 3, 4]}) + two = xr.DataArray( + data=[row, row, row], dims=["x", "y"], coords={"x": [1, 2, 3], "y": [1, 2, 3, 4]} + ) + res = ep.line_to_square(one, two, dim="x") + print(res) + assert res.shape == two.shape diff --git a/echopype/utils/mask_transformation_xr.py b/echopype/utils/mask_transformation_xr.py new file mode 100644 index 000000000..e37249f07 --- /dev/null +++ b/echopype/utils/mask_transformation_xr.py @@ -0,0 +1,145 @@ +import dask.array as da +import numpy as np +import xarray as xr + + +def lin(db: xr.DataArray) -> xr.DataArray: + """Convert decibel to linear scale, handling NaN values.""" + linear = xr.where(db.isnull(), np.nan, 10 ** (db / 10)) + return linear + + +def log(linear: xr.DataArray) -> xr.DataArray: + """ + Turn variable into the logarithmic domain. This function will return -999 + in the case of values less or equal to zero (undefined logarithm). -999 is + the convention for empty water or vacant sample in fisheries acoustics. + + Args: + variable (float): array of elements to be transformed. + + Returns: + float: array of elements transformed + """ + back_list = False + back_single = False + if not isinstance(linear, xr.DataArray): + if isinstance(linear, list): + linear = xr.DataArray(linear) + back_list = True + else: + linear = xr.DataArray([linear]) + back_single = True + + db = xr.apply_ufunc(lambda x: 10 * np.log10(x), linear) + db = xr.where(db.isnull(), -999, db) + db = xr.where(linear == 0, -999, db) + if back_list: + db = db.values + if back_single: + db = db.values[0] + return db + + +def downsample(dataset, coordinates: {str: int}, operation: str = "mean", is_log: bool = False): + """ + Given a dataset, downsamples it on the specified coordinates + + Args: + dataset (xr.DataArray) : the dataset to resample + coordinates({str: int} : a mapping of dimensions to the windows to use + operation (str) : the downsample operation to use + is_log (bool) : True if the data is logarithmic and should be + converted to linear + + Returns: + xr.DataArray : the resampled dataset + """ + operation_list = ["mean", "sum"] + if operation not in operation_list: + raise Exception("Operation not in approved list") + for k in coordinates.keys(): + if k not in dataset.dims: + raise Exception("Coordinate " + k + " not in dataset coordinates") + if is_log: + dataset = lin(dataset) + if operation == "mean": + dataset = dataset.coarsen(coordinates, boundary="pad").mean() + elif operation == "sum": + dataset = dataset.coarsen(coordinates, boundary="pad").sum() + else: + raise Exception("Operation not in approved list") + # print(dataset) + if is_log: + dataset = log(dataset) + # mask = dataset.isnull() + return dataset + + +def upsample(dataset: xr.DataArray, dataset_size: xr.DataArray): + """ + Given a data dataset and an example dataset, upsamples the data dataset + to the example dataset's dimensions by repeating values + + Args: + dataset (xr.DataArray) : data + dataset_size (xr.DataArray) : dataset of the right size + + Returns + xr.DataArray: the input dataset, with the same coords 
+
+
+def upsample(dataset: xr.DataArray, dataset_size: xr.DataArray):
+    """
+    Given a data dataset and an example dataset, upsamples the data dataset
+    to the example dataset's dimensions by repeating values
+
+    Args:
+        dataset (xr.DataArray): data
+        dataset_size (xr.DataArray): dataset of the right size
+
+    Returns:
+        xr.DataArray: the input dataset, with the same coords as dataset_size
+        and the values repeated to fill it up
+    """
+    interpolated = dataset.interp_like(dataset_size, method="nearest")
+    return interpolated
+
+
+def line_to_square(one: xr.DataArray, two: xr.DataArray, dim: str):
+    """
+    Given a single dimension dataset and an example dataset with 2 dimensions,
+    returns a two-dimensional dataset that is the single dimension dataset
+    repeated as often as needed
+
+    Args:
+        one (xr.DataArray): data
+        two (xr.DataArray): shape dataset
+        dim (str): name of dimension to concat against
+
+    Returns:
+        np.ndarray: the values of the input dataset, repeated along ``dim``
+        until they fill the example dataset's shape
+    """
+    length = len(two[dim])
+    array_list = [one for _ in range(0, length)]
+    array = xr.concat(array_list, dim=dim)
+    return array.values
+
+
+def dask_nanmedian(array, axis=None):
+    """
+    Nanmedian wrapper that ensures the input is a Dask array
+    """
+    if not isinstance(array, da.Array):
+        raise TypeError("Expected a Dask array, got {}.".format(type(array)))
+    return da.nanmedian(array, axis=axis)
+
+
+def dask_nanmean(array, axis=None):
+    """
+    Nanmean wrapper that ensures the input is a Dask array
+    """
+    if not isinstance(array, da.Array):
+        raise TypeError("Expected a Dask array, got {}.".format(type(array)))
+    return da.nanmean(array, axis=axis)
+
+
+def dask_nanpercentile(array, percentile, axis=None):
+    """
+    Applies nanpercentile on a Dask or NumPy array
+    """
+    if isinstance(array, da.Array):
+        # dask has no native nanpercentile (da.percentile is 1-D only and has
+        # no skipna support), so fall back to NumPy on the computed array
+        return np.nanpercentile(array.compute(), percentile, axis=axis)
+    if not isinstance(array, np.ndarray):
+        raise TypeError("Expected a Dask or Numpy array, got {}.".format(type(array)))
+    return np.nanpercentile(array, percentile, axis=axis)
diff --git a/requirements.txt b/requirements.txt
index d1e83e5f0..d44848c7f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -18,4 +18,5 @@ psutil>=5.9.1
 more-itertools==8.13.0
 geopy
 flox>=0.7.2,<1.0.0
-charset-normalizer<3.2
+dask-image
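Taken together, the refactor narrows the public surface to the two remaining seabed methods. A typical single-channel call, assuming `source_Sv` is a calibrated Sv dataset with the required `channel` coordinate and using an illustrative channel name (keyword names as in the docstring):

    from echopype.mask.api import get_seabed_mask
    from echopype.mask.seabed import ARIZA_DEFAULT_PARAMS

    seabed_mask = get_seabed_mask(
        source_Sv,  # Dataset with "channel", "frequency_nominal" and "Sv"
        desired_channel="GPT 38 kHz 00907208dd13 5-1 ES38B",  # illustrative
        method="ariza",
        parameters=ARIZA_DEFAULT_PARAMS,
    )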