From f9a9f7a591d64977fd298cab32225da22e81bd56 Mon Sep 17 00:00:00 2001 From: wattai Date: Thu, 14 Nov 2024 02:01:14 +0900 Subject: [PATCH 1/8] Remove unnecessary comments --- src/sse/simulators/environments.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/sse/simulators/environments.py b/src/sse/simulators/environments.py index fd81231..8474192 100644 --- a/src/sse/simulators/environments.py +++ b/src/sse/simulators/environments.py @@ -216,19 +216,8 @@ def euclidean_distance(p1, p2): def delay(signal: Signal, distance: float, sound_speed: float) -> Signal: - # num_delay_points を整数にキャスト num_delay_points = int(round(signal.sampling_frequency * (distance / sound_speed))) - - # スライシングに整数を使用 return Signal( values=np.pad(signal.values[num_delay_points:], (0, num_delay_points)), sampling_frequency=signal.sampling_frequency, ) - - -# def delay(signal: Signal, distance: float, sound_speed: float) -> Signal: -# num_delay_points = signal.sampling_frequency * (distance / sound_speed) -# return Signal( -# values=np.pad(signal.values[num_delay_points:], ((0, num_delay_points))), -# sampling_frequency=signal.sampling_frequency, -# ) From 810e5e338eba646cfc88f45e1a916402a42bb932 Mon Sep 17 00:00:00 2001 From: wattai Date: Thu, 14 Nov 2024 02:03:59 +0900 Subject: [PATCH 2/8] Add a CI badge --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 7734141..31ec41e 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,5 @@ +[![CI](https://github.com/wattai/sound-source-position-estimation/actions/workflows/ci.yml/badge.svg)](https://github.com/wattai/sound-source-position-estimation/actions/workflows/ci.yml) + # WIP; Sound Source Estimator [![CI](https://github.com/wattai/sound-source-position-estimation/actions/workflows/ci.yml/badge.svg)](https://github.com/wattai/sound-source-position-estimation/actions/workflows/ci.yml) From 2065d46c497755f299627028c6179b33e5944184 Mon Sep 17 00:00:00 2001 From: wattai Date: Wed, 20 Nov 2024 23:21:03 +0900 Subject: [PATCH 3/8] Resolve conflict --- README.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/README.md b/README.md index 31ec41e..7734141 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,3 @@ -[![CI](https://github.com/wattai/sound-source-position-estimation/actions/workflows/ci.yml/badge.svg)](https://github.com/wattai/sound-source-position-estimation/actions/workflows/ci.yml) - # WIP; Sound Source Estimator [![CI](https://github.com/wattai/sound-source-position-estimation/actions/workflows/ci.yml/badge.svg)](https://github.com/wattai/sound-source-position-estimation/actions/workflows/ci.yml) From 928d7174f7095f969762020ba56dd5354bd27bc2 Mon Sep 17 00:00:00 2001 From: wattai Date: Sun, 17 Nov 2024 02:38:48 +0900 Subject: [PATCH 4/8] Add tests for CSP method --- src/sse/music_v2.py | 8 +-- tests/test_music.py | 171 ++++++++++++++++++++++++++++++++------------ 2 files changed, 129 insertions(+), 50 deletions(-) diff --git a/src/sse/music_v2.py b/src/sse/music_v2.py index 63f9500..f39fdca 100644 --- a/src/sse/music_v2.py +++ b/src/sse/music_v2.py @@ -93,13 +93,13 @@ def _calc_s_music(self, thetas: list[float]): outs = [] for theta in thetas: a = self._calc_alpha2(theta, freqs=freqs).reshape(-1, 1, self.N_ch) - print("a.shape", a.shape) - print("self.minvec.shape", self.minvec.shape) + # print("a.shape", a.shape) + # print("self.minvec.shape", self.minvec.shape) # print("np.linalg.vector_norm(a, axis=1, ord=2).shape", np.linalg.vector_norm(a, axis=1, ord=2).shape) upper = np.abs((a.conj() @ self.minvec))[:, 0, :] ** 2 lower = np.linalg.vector_norm(a, axis=2, ord=2) - print("upper.shape", upper.shape) - print("lower.shape", lower.shape) + # print("upper.shape", upper.shape) + # print("lower.shape", lower.shape) outs.append(1 / np.sum(upper / lower, axis=1)) return np.array(outs) diff --git a/tests/test_music.py b/tests/test_music.py index 1bbb095..735b81a 100644 --- a/tests/test_music.py +++ b/tests/test_music.py @@ -1,67 +1,146 @@ # -*- coding: utf-8 -*- +import pytest + import numpy as np from sse.music_v2 import MusicSoundSourceLocator +from sse.simulators.environments import ( + Observer, + Air, + Microphone, + Source, + Position3D, + SineSignalGenerator, +) + +SAMPLING_FREQUENCY = 16000 # [Hz] +SOUND_SPEED = Air().sound_speed # [m/s] +GAP_WIDTH_BETWEEN_MICS = 5.0 # [m] +SIGNAL_TIME_LENGTH = 5.0 # [sec.] def make_dummy_signals( - theta=15.0, - fs=16000, - c=340, - d=1.0, + theta: float, + fs: float, + d: float, + time_length: float, + medium=Air(), ) -> np.ndarray: - """_summary_ + """Return 2ch dummy signals. Args: - theta (float, optional): _description_. Defaults to 15.0. - fs (int, optional): _description_. Defaults to 16000. - N_fft (int, optional): _description_. Defaults to 128. - c: Sound speed [m/sec]. Defaults to 340. - d: Distance between mics [m]. Defaults to 1.0. + theta: Which direction the signal comes from [rad]. + fs: Sampling frequency [Hz]. + d: Distance between mics [m]. + medium: Medium which sounds pass through. Returns: - np.ndarray: _description_ + Sound waves shaped as: [num_samples, num_channels]. """ - tdoa = d * np.sin(np.deg2rad(theta)) / c - # tdoa = d * np.cos(np.deg2rad(theta)) / c - print("tdoa", tdoa) - - T = 0.2 # [sec] - # width = N_fft // 2 - num_points_of_tdoa_width = int(tdoa * fs) # point of TDOA. - # t = np.linspace(0, N_fft + 2 * width - 1, N_fft + 2 * width) / fs - t = np.linspace(0, int(fs * T - 1), int(fs * T)) / fs # base time. - # t1 = t[width + num_points_of_tdoa_width : width + N_fft + num_points_of_tdoa_width] - # t2 = t[width : width + N_fft] - t1 = t[num_points_of_tdoa_width:] - t2 = t[:-num_points_of_tdoa_width] - print("t1.shape", t1.shape) - print("t2.shape", t2.shape) - x1 = np.sin(2 * np.pi * 5000 * t1)[:, None] - x2 = np.sin(2 * np.pi * 5000 * t2)[:, None] - x = np.c_[x1, x2] - # x = np.c_[x1 + np.random.randn(*x1.shape) * 0.05, x2 + np.random.randn(*x2.shape) * 0.05] - - # xs = np.random.randn(len(t))[:, None] - # x1 = xs[num_points_of_tdoa_width:] - # t2 = xs[:-num_points_of_tdoa_width] - # x = np.c_[x1, x2] - return x + + obs = Observer( + sources=[ + Source( + position=Position3D(r=100, theta=theta, phi=0), + signal=SineSignalGenerator(frequency=3000.2).generate( + sampling_frequency=fs, + time_length=time_length, + ), + ) + ], + microphones=[ + Microphone( + position=Position3D(r=d / 2, theta=0, phi=0), + sampling_frequency=fs, + ), + Microphone( + position=Position3D(r=d / 2, theta=np.pi, phi=0), + sampling_frequency=fs, + ), + ], + medium=medium, + ) + outs = obs.ring_sources() + return np.c_[outs[0].values, outs[1].values] class TestMusicSoundSourceLocator: - def test_fit_transform(self): + @pytest.mark.parametrize("theta", [60]) + def test_fit_transform(self, theta: float): + x = make_dummy_signals( + theta=theta / 180 * np.pi, + fs=SAMPLING_FREQUENCY, + d=GAP_WIDTH_BETWEEN_MICS, + time_length=10.0, + ) self.locator = MusicSoundSourceLocator( - fs=16000, - d=0.1, + fs=SAMPLING_FREQUENCY, + d=GAP_WIDTH_BETWEEN_MICS, N_theta=180 + 1, ) - X = make_dummy_signals( - theta=40.0, - fs=16000, - d=0.1, + predicted_theta = self.locator.fit_transform(X=x) + print("predicted_theta (MUSIC): ", predicted_theta) + + +class TestCSPSoundSourceLocator: + @pytest.mark.parametrize("theta", [30, 60, 90, 120, 150]) + def test_accuracy( + self, + theta: float, + acceptable_error_in_deg: float = 5.0, + ): + x = make_dummy_signals( + theta=theta / 180 * np.pi, + fs=SAMPLING_FREQUENCY, + d=GAP_WIDTH_BETWEEN_MICS, + time_length=SIGNAL_TIME_LENGTH, + ) + predicted_theta = estimate_theta_by_csp( + x1=x[:, 0], + x2=x[:, 1], + fs=SAMPLING_FREQUENCY, + c=SOUND_SPEED, + d=GAP_WIDTH_BETWEEN_MICS, ) - predicted_theta = self.locator.fit_transform(X=X) - print("predicted_theta", predicted_theta) - # np.testing.assert_allclose(predicted_theta, 40.726257) + print("predicted_theta (CSP): ", predicted_theta) + assert (predicted_theta - theta) ** 2 < acceptable_error_in_deg + + +def estimate_theta_by_csp( + x1: np.ndarray, + x2: np.ndarray, + fs: float = 16000, + c: float = 343.3, + d: float = 0.1, +) -> float: + return tdoa2deg(calc_tdoa(x1, x2) / fs, c=c, d=d) + + +def calc_tdoa(x1: np.ndarray, x2: np.ndarray) -> float: + assert len(x1) == len(x2) + estimated_delay = calc_csp_coefs(x1=x1, x2=x2).argmax() - len(x1) + return estimated_delay + + +def calc_csp_coefs(x1: np.ndarray, x2: np.ndarray) -> np.ndarray: + phi = np.correlate(x2, x1, mode="full") + PHI = np.fft.fft(phi) + csp = np.fft.fft(PHI / np.abs(PHI)).real + return csp + + +def tdoa2deg( + tdoa: float, + c: float = 343.3, + d: float = 0.1, +) -> float: + return np.rad2deg(np.arccos(np.clip(tdoa * c / d, -1, 1))) + + +def deg2tdoa( + deg: float, + c: float = 343.3, + d: float = 0.1, +) -> float: + return d * np.cos(np.deg2rad(deg)) / c From 62f4b1b613b4a04e6b02821397e48891d97cf982 Mon Sep 17 00:00:00 2001 From: wattai Date: Sun, 17 Nov 2024 23:09:42 +0900 Subject: [PATCH 5/8] Fix docstring --- tests/test_music.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_music.py b/tests/test_music.py index 735b81a..8660b8b 100644 --- a/tests/test_music.py +++ b/tests/test_music.py @@ -33,6 +33,7 @@ def make_dummy_signals( theta: Which direction the signal comes from [rad]. fs: Sampling frequency [Hz]. d: Distance between mics [m]. + time_length: Time length of signals [sec.]. medium: Medium which sounds pass through. Returns: From 1932a5e155f5a399ab62c6d570d04cc804e27827 Mon Sep 17 00:00:00 2001 From: wattai Date: Thu, 21 Nov 2024 00:09:12 +0900 Subject: [PATCH 6/8] Tiny badge swaps --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 7734141..3c701e3 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # WIP; Sound Source Estimator [![CI](https://github.com/wattai/sound-source-position-estimation/actions/workflows/ci.yml/badge.svg)](https://github.com/wattai/sound-source-position-estimation/actions/workflows/ci.yml) -[![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff) [![codecov](https://codecov.io/gh/wattai/sound-source-position-estimation/branch/main/graph/badge.svg?token=NU4916R3R8)](https://codecov.io/gh/wattai/sound-source-position-estimation) +[![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff) Brief description of the project and what it aims to accomplish. From 6eed85c50d35a2965b39308329337761d2f72767 Mon Sep 17 00:00:00 2001 From: wattai Date: Fri, 22 Nov 2024 23:50:50 +0900 Subject: [PATCH 7/8] Update README.md --- README.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/README.md b/README.md index 3c701e3..04889ef 100644 --- a/README.md +++ b/README.md @@ -24,3 +24,9 @@ pip install -e ".[dev]" ```shell pre-commit install ``` + +Run tests + +```shell +pytest tests +``` From 6f4f8e937eee08b9ebe4c5a99d78c67670b11a57 Mon Sep 17 00:00:00 2001 From: wattai Date: Sat, 23 Nov 2024 16:14:41 +0900 Subject: [PATCH 8/8] [wip] Add CPS implementation --- src/sse/csp.py | 70 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 src/sse/csp.py diff --git a/src/sse/csp.py b/src/sse/csp.py new file mode 100644 index 0000000..ae324d7 --- /dev/null +++ b/src/sse/csp.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- + +import numpy as np + +from sse.base import SoundSourceLocatorBase + + +def calc_csp_coeffs(x): + phi = np.correlate(x[:, 0], x[:, 1], mode="full") + PHI = np.fft.fft(phi) + csp = np.fft.fft(PHI / np.abs(PHI)).real + return csp + + +def calc_tdoa(x): + estimated_delay = calc_csp_coeffs(x).argmax() - (len(x[:, 0])) + return estimated_delay + + +def tdoa2deg(tdoa, c=340, d=0.1): + return np.rad2deg(np.arccos(np.clip(tdoa * c / d, -1, 1))) + + +def deg2tdoa(deg, c=340, d=0.1): + return d * np.cos(np.deg2rad(deg)) / c + + +class CSPSoundSourceLocator(SoundSourceLocatorBase): + def __init__(self): + pass + + def fit_transform(self, X: np.ndarray, fs, c, d) -> float: + # X: input sound signal + # X.shape = (sample, N_ch) + theta_hat = tdoa2deg(calc_tdoa(X) / fs, c=c, d=d) + return theta_hat + + def calc_likehood_map(self): + pass + # csp = CSP(x) + # N_csp = len(csp) + # csp /= N_csp + # t_delay = np.linspace(-N_csp // 2, N_csp // 2, N_csp) / fs + # theta_delay = tdoa2deg(t_delay, c=c, d=d) + + # plt.figure() + # plt.subplot(211) + # plt.title("Based on CSP.") + # plt.plot(t1, x1, linestyle="-", label="1ch") + # plt.plot(t1, x2, linestyle="-.", label="2ch") + # plt.legend(loc="upper right") + # plt.xlabel("time [sec]") + # plt.ylabel("amp. [a.u.]") + # plt.xlim(t1.min(), t1.max()) + # plt.grid(linestyle="--") + + # plt.subplot(212) + # plt.plot( + # (90 - theta_delay), + # 10 * np.log10((csp**2) / (csp**2).max()), + # marker="D", + # markersize=5, + # ) + # plt.xlabel("theta delay [deg]") + # plt.ylabel("CSP log power [dB]") + # plt.xlim(90 - theta_delay.max(), 90 - theta_delay.min()) + # plt.grid(linestyle="--") + # plt.tight_layout() + # plt.show() + # print("theta: %.3f, theta_hat: %.3f" % (90 - theta, 90 - theta_hat))