Skip to content

Commit

Permalink
Enhance/next dev transient signal (#142)
Browse files Browse the repository at this point in the history
* Memory optimizations for signal & transient noise

* Memory optimizations for signal & transient noise
  • Loading branch information
ruxandra-valcu authored Dec 15, 2023
1 parent 2dd54b1 commit 0788ef5
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 18 deletions.
36 changes: 25 additions & 11 deletions echopype/clean/signal_attenuation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,24 @@
import numpy as np
import xarray as xr

from ..utils.mask_transformation_xr import lin as _lin, line_to_square, log as _log

DEFAULT_RYAN_PARAMS = {"r0": 180, "r1": 280, "n": 30, "thr": -6, "start": 0}
from ..utils.mask_transformation_xr import (
lin as _lin,
line_to_square,
log as _log,
rolling_median_block,
)

# import dask.array as da


DEFAULT_RYAN_PARAMS = {
"r0": 180,
"r1": 280,
"n": 30,
"thr": -6,
"start": 0,
"dask_chunking": {"ping_time": 100, "range_sample": 100},
}
DEFAULT_ARIZA_PARAMS = {"offset": 20, "thr": (-40, -35), "m": 20, "n": 50}


Expand Down Expand Up @@ -45,15 +60,17 @@ def _ryan(source_Sv: xr.DataArray, desired_channel: str, parameters=DEFAULT_RYAN
Returns:
xr.DataArray: boolean array with AS mask, with ping_time and range_sample dims
"""
parameter_names = ("r0", "r1", "n", "thr", "start")
parameter_names = ("r0", "r1", "n", "thr", "start", "dask_chunking")
if not all(name in parameters.keys() for name in parameter_names):
raise ValueError(
"Missing parameters - should be r0, r1, n, thr, start, are" + str(parameters.keys())
"Missing parameters - should be r0, r1, n, thr, start, dask_chunking are"
+ str(parameters.keys())
)
r0 = parameters["r0"]
r1 = parameters["r1"]
n = parameters["n"]
thr = parameters["thr"]
dask_chunking = parameters["dask_chunking"]
# start = parameters["start"]

channel_Sv = source_Sv.sel(channel=desired_channel)
Expand Down Expand Up @@ -85,13 +102,10 @@ def _ryan(source_Sv: xr.DataArray, desired_channel: str, parameters=DEFAULT_RYAN
layer_mask = (Sv["range_sample"] >= up) & (Sv["range_sample"] <= lw)
layer_Sv = Sv.where(layer_mask)

# Creating shifted arrays for block comparison
shifted_arrays = [layer_Sv.shift(ping_time=i) for i in range(-n, n + 1)]
block = xr.concat(shifted_arrays, dim="shifted_ping_time")
layer_Sv_chunked = layer_Sv.chunk(dask_chunking)

# Computing the median of the block and the pings
ping_median = layer_Sv.median(dim="range_sample", skipna=True)
block_median = block.median(dim=["range_sample", "shifted_ping_time"], skipna=True)
block_median = rolling_median_block(layer_Sv_chunked.data, window_half_size=n, axis=0)
ping_median = layer_Sv_chunked.median(dim="range_sample", skipna=True)

# Creating the mask based on the threshold
mask_condition = (ping_median - block_median) > thr
Expand Down
6 changes: 2 additions & 4 deletions echopype/clean/transient_noise.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
lin as _lin,
line_to_square,
log as _log,
rolling_median_block,
)

RYAN_DEFAULT_PARAMS = {
Expand Down Expand Up @@ -231,10 +232,7 @@ def _fielding(

ping_median = Sv_range.median(dim="range_sample", skipna=True)
ping_75q = Sv_range.reduce(np.nanpercentile, q=75, dim="range_sample")

shifted_arrays = [Sv_range.shift(ping_time=i) for i in range(-n, n + 1)]
block = xr.concat(shifted_arrays, dim="shifted_ping_time")
block_median = block.median(dim=["range_sample", "shifted_ping_time"], skipna=True)
block_median = rolling_median_block(Sv_range.data, window_half_size=n, axis=0)

# identify columns in which noise can be found
noise_col = (ping_75q < maxts) & ((ping_median - block_median) < thr[0])
Expand Down
3 changes: 1 addition & 2 deletions echopype/tests/clean/test_signal_attenuation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
import numpy as np
import echopype.clean


DEFAULT_RYAN_PARAMS = {"r0": 180, "r1": 280, "n": 30, "thr": -6, "start": 0}
from echopype.clean.signal_attenuation import DEFAULT_RYAN_PARAMS

# commented ariza out since the current interpretation relies on a
# preexisting seabed mask, which is not available in this PR
Expand Down
23 changes: 23 additions & 0 deletions echopype/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,26 @@ def ed_ek_60_for_Sv():
def ek60_Sv(ed_ek_60_for_Sv):
sv_echopype_EK60 = ep.calibrate.compute_Sv(ed_ek_60_for_Sv).compute()
return sv_echopype_EK60


@pytest.fixture(scope="session")
def sv_ek80():
    """Session-scoped fixture yielding a computed EK80 Sv dataset.

    Prefers a locally cached copy of the raw file under TEST_DATA_FOLDER;
    otherwise streams it from the NOAA WCSD public S3 bucket over HTTPS.
    """
    base_url = "noaa-wcsd-pds.s3.amazonaws.com/"
    path = "data/raw/Sally_Ride/SR1611/EK80/"
    file_name = "D20161109-T163350.raw"

    local_path = os.path.join(TEST_DATA_FOLDER, file_name)
    if os.path.isfile(local_path):
        source = local_path
    else:
        # No local cache — fall back to the remote object.
        source = f"https://{base_url + path + file_name}"
    ed = ep.open_raw(
        source,
        sonar_model="EK80",
    )
    Sv = ep.calibrate.compute_Sv(ed, waveform_mode="CW", encode_mode="complex").compute()
    return Sv
22 changes: 21 additions & 1 deletion echopype/utils/mask_transformation_xr.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ def dask_nanmean(array, axis=None):
return da.nanmean(array, axis=axis)



def dask_nanpercentile(array, percentile, axis=None):
"""
Applies nanpercentile on a Dask array
Expand All @@ -145,3 +144,24 @@ def dask_nanpercentile(array, percentile, axis=None):
return np.nanpercentile(array, percentile, axis=axis)
return da.percentile(array, percentile, axis=axis, skipna=True)


def block_nanmedian(block, i, n, axis):
    """
    Return the NaN-aware median of a window of ``block`` centered at index
    ``i`` along ``axis``, spanning at most ``n`` positions on each side
    (clipped at the array edges).

    Since dask nanmedian doesn't work except when applied on a specific axis,
    this computes the median over the whole (flattened) window instead of
    materializing an array with an extra shifted dimension.

    Parameters
    ----------
    block : array-like (numpy or dask-backed)
        Input data.
    i : int
        Center index along ``axis``.
    n : int
        Half-width of the window.
    axis : int
        Axis along which the window is taken.

    Returns
    -------
    float
        NaN-aware median of the window.
    """
    start = max(0, i - n)
    end = min(block.shape[axis], i + n + 1)
    # Contiguous slice instead of da.take(arange(...)): avoids building a
    # fresh dask graph per output element and works for plain numpy input too.
    window = block[(slice(None),) * axis + (slice(start, end),)]
    if hasattr(window, "compute"):
        # Dask-backed: materialize the (small) window before the reduction,
        # matching the original behavior of computing per-window.
        window = window.compute()
    return np.nanmedian(window)


def rolling_median_block(block, window_half_size, axis):
    """
    Apply the block-median calculation as a centered rolling reduction
    along ``axis``, returning one median per position.
    """
    length = block.shape[axis]
    medians = []
    for position in range(length):
        medians.append(block_nanmedian(block, position, window_half_size, axis))
    return np.array(medians)

0 comments on commit 0788ef5

Please sign in to comment.