From 8ba00383a76afcb9efc74f9b6515f7aaf434dcf0 Mon Sep 17 00:00:00 2001 From: aceglia Date: Tue, 30 May 2023 12:34:57 -0400 Subject: [PATCH 1/3] fix cadence calculation --- biosiglive/processing/data_processing.py | 4 +-- examples/cadence_from_treadmill.py | 38 +++++++++++++++++++----- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/biosiglive/processing/data_processing.py b/biosiglive/processing/data_processing.py index ce7d377..c4050fa 100644 --- a/biosiglive/processing/data_processing.py +++ b/biosiglive/processing/data_processing.py @@ -568,9 +568,9 @@ def get_peaks( self.processed_data_buffer = RealTimeProcessing._check_and_adjust_interval( self.processed_data_buffer, min_peaks_interval ) - if isinstance(nb_peaks, list): - nb_peaks.append(np.count_nonzero(self.processed_data_buffer)) + for i in range(self.processed_data_buffer.shape[0]): + nb_peaks.append(np.count_nonzero(self.processed_data_buffer[i, :])) self.process_time.append(time.time() - tic) return nb_peaks, self.processed_data_buffer diff --git a/examples/cadence_from_treadmill.py b/examples/cadence_from_treadmill.py index aedb722..ef34bfa 100644 --- a/examples/cadence_from_treadmill.py +++ b/examples/cadence_from_treadmill.py @@ -7,17 +7,31 @@ from biosiglive.interfaces.vicon_interface import ViconClient from time import time, sleep from custom_interface import MyInterface -from biosiglive import RealTimeProcessingMethod, InterfaceType, DeviceType +import numpy as np +from biosiglive import RealTimeProcessingMethod, InterfaceType, DeviceType, LivePlot, PlotType if __name__ == "__main__": interface = None + plot_curve = LivePlot( + name="curve", + plot_type=PlotType.Curve, + nb_subplots=4, + channel_names=["1", "2", "3", "4"], + ) + # plot_curve = LivePlot( + # name="strike", + # plot_type=PlotType.Curve, + # nb_subplots=2, + # channel_names=["1", "2"], + # ) + plot_curve.init(plot_windows=10000, y_labels=["Strikes", "Strikes", "Force (N)", "Force (N)"]) interface_type = InterfaceType.Custom if interface_type == InterfaceType.Custom: interface = MyInterface(system_rate=100, data_path="walk.bio") elif interface_type == InterfaceType.ViconClient: interface = ViconClient(system_rate=100) - nb_second = 30 + nb_second = 10 interface.add_device( 9, name="Treadmill", @@ -27,20 +41,30 @@ data_buffer_size=1000 * nb_second, processing_window=1000 * nb_second, device_data_file_key="treadmill", - threshold=0.6, - min_peaks_interval=1300, + threshold=0.2, + min_peaks_interval=800, ) print_every = 10 # seconds count = 0 tic_bis = time() while True: + tic = time() data = interface.get_device_data(device_name="Treadmill") force_z_tmp = data[[2, 8], :] - cadence, force_z_process = interface.get_device("Treadmill").process() + peaks, force_z_process = interface.get_device("Treadmill").process() + if peaks: + cadence = peaks[2] + peaks[8] + else: + cadence = 0 if count == print_every * interface.devices[-1].system_rate: + print(f"Loop time: {time() - tic_bis}") print(f"Mean cadence for the last {nb_second} s is :{cadence}") tic_bis = time() count = 0 count += 1 - if interface_type == InterfaceType.Custom: - sleep(1 / 100) + plot_curve.update(np.append(force_z_process[[2, 8], -10:], force_z_tmp, axis=0)) + loop_time = time() - tic + + # if interface_type == InterfaceType.Custom: + # if (1/100) - loop_time > 0: + # sleep((1/100) - loop_time - 0.002) From d0c7f91f7b8f34def21097a6eea61fe13f38ea30 Mon Sep 17 00:00:00 2001 From: aceglia Date: Wed, 5 Jul 2023 10:20:23 -0400 Subject: [PATCH 2/3] Fix streamdata for windows --- biosiglive/streaming/stream_data.py | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/biosiglive/streaming/stream_data.py b/biosiglive/streaming/stream_data.py index ebd4e41..a53100b 100644 --- a/biosiglive/streaming/stream_data.py +++ b/biosiglive/streaming/stream_data.py @@ -40,16 +40,14 @@ def __init__(self, stream_rate: int = 100): # Multiprocessing stuff manager = mp.Manager() - self.queue = manager.Queue - self.event = manager.Event self.device_queue_in = [] self.device_queue_out = [] self.kin_queue_in = [] self.kin_queue_out = [] self.plots_queue = [] self.device_event = [] - self.is_device_data = self.event() - self.is_kin_data = self.event() + self.is_device_data = [] + self.is_kin_data = [] self.interface_event = [] self.kin_event = [] self.custom_processes = [] @@ -87,9 +85,9 @@ def _add_device(self, device: Device): Device to add. """ self.devices.append(device) - self.device_queue_in.append(self.queue()) - self.device_queue_out.append(self.queue()) - self.device_event.append(self.event()) + self.device_queue_in.append(mp.Manager().Queue()) + self.device_queue_out.append(mp.Manager().Queue()) + self.device_event.append(mp.Manager().Event()) def add_interface(self, interface: GenericInterface()): """ @@ -104,11 +102,14 @@ def add_interface(self, interface: GenericInterface()): raise Exception("Cannot add interface after the stream has started.") self.interfaces.append(interface) self.interfaces_type.append(interface.interface_type) - self.interface_event.append(self.event()) + self.interface_event.append(mp.Manager().Event()) + self.is_kin_data = mp.Manager().Event() + self.is_device_data = mp.Manager().Event() for device in interface.devices: self._add_device(device) for marker in interface.marker_sets: self._add_marker_set(marker) + if len(self.interfaces) > 1: raise ValueError("Only one interface can be added for now.") @@ -144,7 +145,7 @@ def add_server( self.ports = [self.ports] for p in range(len(self.ports)): - self.server_queue.append(self.queue()) + self.server_queue.append(mp.Manager().Queue()) self.client_type = client_type if not device_buffer_size: @@ -166,6 +167,8 @@ def add_server( self.marker_set_buffer_size = marker_set_buffer_size elif isinstance(marker_set_buffer_size, int): self.marker_set_buffer_size = [marker_set_buffer_size] * len(self.marker_sets) + if len(self.ports) > 1: + raise ValueError("Only one server can be added for now.") def start(self, save_streamed_data: bool = False, save_path: str = None, save_frequency: int = None): """ @@ -195,9 +198,9 @@ def _add_marker_set(self, marker: MarkerSet): Marker set to add from given interface. """ self.marker_sets.append(marker) - self.kin_queue_in.append(self.queue()) - self.kin_queue_out.append(self.queue()) - self.kin_event.append(self.event()) + self.kin_queue_in.append(mp.Manager().Queue()) + self.kin_queue_out.append(mp.Manager().Queue()) + self.kin_event.append(mp.Manager().Event()) # TODO : add buffer directly in the server def device_processing(self, device_idx: int): From 1f09785605ed5e4eaa78bd203ec118c3b2794732 Mon Sep 17 00:00:00 2001 From: aceglia Date: Wed, 5 Jul 2023 10:37:57 -0400 Subject: [PATCH 3/3] Update tests for the last comit --- tests/test_data_processing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_data_processing.py b/tests/test_data_processing.py index cf02e58..d491f7f 100644 --- a/tests/test_data_processing.py +++ b/tests/test_data_processing.py @@ -62,7 +62,7 @@ def test_real_time_processing(method): np.testing.assert_almost_equal(processed_data[:, -1], [0.156469, 0.1365517]) elif method == RealTimeProcessingMethod.GetPeaks: np.testing.assert_almost_equal(processed_data[:, -1], [0.0, 0.0]) - np.testing.assert_almost_equal(nb_peaks, 100) + np.testing.assert_almost_equal(nb_peaks[-1], 100) elif method == RealTimeProcessingMethod.ProcessGenericSignal: np.testing.assert_almost_equal(processed_data[:, -1], [0.0164153, 0.0218168]) elif method == RealTimeProcessingMethod.Custom: